package io.intino.datahub.box.service.jms;

import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.broker.BrokerManager;
import io.intino.datahub.broker.jms.JmsMessageTranslator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQTempQueue;

/* loaded from: input_file:io/intino/datahub/box/service/jms/NessService.class */
public class NessService {
    private final BrokerManager manager;
    private final ExecutorService dispatcherService = Executors.newFixedThreadPool(16, runnable -> {
        return new Thread(runnable, "Ness Datamarts Service");
    });

    public NessService(DataHubBox dataHubBox) {
        this.manager = dataHubBox.brokerService().manager();
        this.manager.registerQueueConsumer("service.ness.seal", message -> {
            response(this.manager, message, new SealRequest(dataHubBox).accept(MessageReader.textFrom(message)));
        });
        this.manager.registerQueueConsumer("service.ness.seal.last", message2 -> {
            response(this.manager, message2, new LastSealRequest(dataHubBox).accept(MessageReader.textFrom(message2)));
        });
        this.manager.registerQueueConsumer("service.ness.backup", message3 -> {
            response(this.manager, message3, new BackupRequest(dataHubBox).accept(MessageReader.textFrom(message3)));
        });
        this.manager.registerQueueConsumer("service.ness.datalake", message4 -> {
            response(this.manager, message4, new DatalakeRequest(dataHubBox).accept(message4));
        });
        this.manager.registerQueueConsumer("service.ness.datamarts", message5 -> {
            response(this.manager, message5, new DatamartsRequest(dataHubBox).accept(message5));
        });
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.dispatcherService.shutdown();
                this.dispatcherService.awaitTermination(1L, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Logger.error(e);
            }
        }));
    }

    private void response(BrokerManager brokerManager, Message message, String str) {
        response(brokerManager, message, JmsMessageTranslator.toJmsMessage(str));
    }

    private void response(BrokerManager brokerManager, Message message, Message message2) {
        new Thread(() -> {
            if (message2 == null) {
                return;
            }
            try {
                QueueProducer producer = producer(brokerManager, message);
                if (producer == null) {
                    return;
                }
                message2.setJMSCorrelationID(message.getJMSCorrelationID());
                producer.produce(message2);
            } catch (Throwable th) {
                Logger.error("Error while handling response: " + th.getMessage(), th);
            }
        }).start();
    }

    private void response(BrokerManager brokerManager, Message message, Stream<Message> stream) {
        this.dispatcherService.execute(() -> {
            QueueProducer producer;
            if (stream == null || (producer = producer(brokerManager, message)) == null) {
                return;
            }
            publishResponse(message, stream, producer);
        });
    }

    private void publishResponse(Message message, Stream<Message> stream, QueueProducer queueProducer) {
        stream.forEach(message2 -> {
            try {
                message2.setJMSCorrelationID(message.getJMSCorrelationID());
                queueProducer.produce(message2);
            } catch (Throwable th) {
                Logger.error("Error while handling response: " + th.getMessage(), th);
            }
        });
        try {
            this.manager.unregisterQueueProducer(replyQueue(message));
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    private static QueueProducer producer(BrokerManager brokerManager, Message message) {
        try {
            return brokerManager.queueProducerOf(replyQueue(message));
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    private static String replyQueue(Message message) throws JMSException {
        ActiveMQTempQueue jMSReplyTo = message.getJMSReplyTo();
        return jMSReplyTo instanceof ActiveMQTempQueue ? jMSReplyTo.getQueueName() : jMSReplyTo.toString();
    }
}
