package io.intino.datahub.box.service.jms;

import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.broker.BrokerManager;
import io.intino.datahub.broker.jms.MessageTranslator;
import java.util.stream.Stream;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQTempQueue;

/* loaded from: input_file:io/intino/datahub/box/service/jms/NessService.class */
public class NessService {
    public NessService(DataHubBox dataHubBox) {
        BrokerManager manager = dataHubBox.brokerService().manager();
        manager.registerQueueConsumer("service.ness.seal", message -> {
            response(manager, message, new SealRequest(dataHubBox).accept(MessageReader.textFrom(message)));
        });
        manager.registerQueueConsumer("service.ness.seal.last", message2 -> {
            response(manager, message2, new LastSealRequest(dataHubBox).accept(MessageReader.textFrom(message2)));
        });
        manager.registerQueueConsumer("service.ness.backup", message3 -> {
            response(manager, message3, new BackupRequest(dataHubBox).accept(MessageReader.textFrom(message3)));
        });
        manager.registerQueueConsumer("service.ness.datalake.messagestore", message4 -> {
            response(manager, message4, new MessageStoreRequest(dataHubBox).accept(message4));
        });
    }

    private void response(BrokerManager brokerManager, Message message, String str) {
        response(brokerManager, message, MessageTranslator.toJmsMessage(str));
    }

    private void response(BrokerManager brokerManager, Message message, Message message2) {
        new Thread(() -> {
            if (message2 == null) {
                return;
            }
            try {
                QueueProducer producer = producer(brokerManager, message);
                if (producer == null) {
                    return;
                }
                message2.setJMSCorrelationID(message.getJMSCorrelationID());
                producer.produce(message2);
            } catch (Throwable th) {
                Logger.error("Error while handling response: " + th.getMessage(), th);
            }
        }).start();
    }

    private void response(BrokerManager brokerManager, Message message, Stream<Message> stream) {
        QueueProducer producer;
        if (stream == null || (producer = producer(brokerManager, message)) == null) {
            return;
        }
        new Thread(() -> {
            handleResponse(message, stream, producer);
        }).start();
    }

    private void handleResponse(Message message, Stream<Message> stream, QueueProducer queueProducer) {
        stream.forEach(message2 -> {
            try {
                message2.setJMSCorrelationID(message.getJMSCorrelationID());
                queueProducer.produce(message2);
            } catch (Throwable th) {
                Logger.error("Error while handling response: " + th.getMessage(), th);
            }
        });
    }

    private static QueueProducer producer(BrokerManager brokerManager, Message message) {
        try {
            ActiveMQTempQueue jMSReplyTo = message.getJMSReplyTo();
            return brokerManager.queueProducerOf(jMSReplyTo instanceof ActiveMQTempQueue ? jMSReplyTo.getQueueName() : jMSReplyTo.toString());
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }
}
