/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.box.service.jms;

import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.box.service.jms.BackupRequest;
import io.intino.datahub.box.service.jms.DatalakeRequest;
import io.intino.datahub.box.service.jms.DatamartsRequest;
import io.intino.datahub.box.service.jms.LastSealRequest;
import io.intino.datahub.box.service.jms.SealRequest;
import io.intino.datahub.broker.BrokerManager;
import io.intino.datahub.broker.jms.JmsMessageTranslator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQTempQueue;

/**
 * Wires the {@code service.ness.*} request queues of the broker to their request handlers
 * and sends each handler's result back to the reply destination of the incoming message.
 *
 * <p>A handler result can be a text, a single JMS message or a stream of JMS messages.
 * Single replies are sent from a dedicated short-lived thread; streamed replies are sent
 * from {@link #dispatcherService}, a fixed pool of 16 threads.
 */
public class NessService {
    private final BrokerManager manager;
    private final ExecutorService dispatcherService = Executors.newFixedThreadPool(16, r -> new Thread(r, "Ness Datamarts Service"));

    /**
     * Registers one queue consumer per service queue and installs a JVM shutdown hook that
     * stops the dispatcher pool.
     *
     * @param box the box providing the broker manager and handed to every request handler
     */
    public NessService(DataHubBox box) {
        this.manager = box.brokerService().manager();
        this.manager.registerQueueConsumer("service.ness.seal", m -> this.response(this.manager, (Message)m, new SealRequest(box).accept(MessageReader.textFrom((Message)m))));
        this.manager.registerQueueConsumer("service.ness.seal.last", m -> this.response(this.manager, (Message)m, new LastSealRequest(box).accept(MessageReader.textFrom((Message)m))));
        this.manager.registerQueueConsumer("service.ness.backup", m -> this.response(this.manager, (Message)m, new BackupRequest(box).accept(MessageReader.textFrom((Message)m))));
        this.manager.registerQueueConsumer("service.ness.datalake", m -> this.response(this.manager, (Message)m, new DatalakeRequest(box).accept((Message)m)));
        this.manager.registerQueueConsumer("service.ness.datamarts", m -> this.response(this.manager, (Message)m, new DatamartsRequest(box).accept((Message)m)));
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownDispatcher));
    }

    /**
     * Stops accepting new streamed replies and waits up to one minute for the running ones.
     */
    private void shutdownDispatcher() {
        try {
            this.dispatcherService.shutdown();
            this.dispatcherService.awaitTermination(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            Logger.error(e);
            // Restore the flag so whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Translates a textual handler result into a JMS message and sends it as the reply.
     */
    private void response(BrokerManager manager, Message requestMessage, String response) {
        this.response(manager, requestMessage, JmsMessageTranslator.toJmsMessage(response));
    }

    /**
     * Sends a single reply message to the reply destination of {@code request}.
     * Nothing is sent when {@code response} is {@code null} or no producer can be obtained.
     *
     * @param manager  broker manager used to obtain the reply-queue producer
     * @param request  the incoming request; supplies reply destination and correlation id
     * @param response the reply to send, may be {@code null}
     */
    private void response(BrokerManager manager, Message request, Message response) {
        if (response == null) {
            // Nothing to send: do not pay for a thread just to find that out.
            return;
        }
        // NOTE(review): single replies keep their own thread instead of sharing dispatcherService,
        // presumably so they are never queued behind long streamed replies - confirm before changing.
        new Thread(() -> {
            try {
                QueueProducer queueProducer = NessService.producer(manager, request);
                if (queueProducer == null) {
                    return;
                }
                response.setJMSCorrelationID(request.getJMSCorrelationID());
                queueProducer.produce(response);
            }
            catch (Throwable e) {
                Logger.error("Error while handling response: " + e.getMessage(), e);
            }
        }).start();
    }

    /**
     * Sends a stream of reply messages to the reply destination of {@code request} on the
     * dispatcher pool. The stream is always closed once the task finishes, whether or not
     * anything could be sent.
     *
     * @param manager  broker manager used to obtain the reply-queue producer
     * @param request  the incoming request; supplies reply destination and correlation id
     * @param response the replies to send, may be {@code null}
     */
    private void response(BrokerManager manager, Message request, Stream<Message> response) {
        if (response == null) {
            return;
        }
        this.dispatcherService.execute(() -> {
            // try-with-resources: release whatever backs the stream on every exit path.
            try (Stream<Message> messages = response) {
                QueueProducer producer = NessService.producer(manager, request);
                if (producer == null) {
                    return;
                }
                this.publishResponse(request, messages, producer);
            }
            catch (Throwable e) {
                // A failure while pulling from the stream must be logged here; otherwise it
                // would escape the task and terminate the worker thread unreported.
                Logger.error("Error while handling response: " + e.getMessage(), e);
            }
        });
    }

    /**
     * Produces every message of {@code response} with the correlation id of {@code request}
     * and then unregisters the reply-queue producer, even if iterating the stream fails.
     * A failure on one message is logged and does not stop the remaining ones.
     */
    private void publishResponse(Message request, Stream<Message> response, QueueProducer producer) {
        try {
            response.forEach(m -> {
                try {
                    m.setJMSCorrelationID(request.getJMSCorrelationID());
                    producer.produce(m);
                }
                catch (Throwable e) {
                    Logger.error("Error while handling response: " + e.getMessage(), e);
                }
            });
        }
        finally {
            try {
                this.manager.unregisterQueueProducer(NessService.replyQueue(request));
            }
            catch (JMSException e) {
                Logger.error(e);
            }
        }
    }

    /**
     * Returns the producer for the reply queue of {@code request}, or {@code null} (after
     * logging the cause) when the reply queue cannot be determined.
     */
    private static QueueProducer producer(BrokerManager manager, Message request) {
        try {
            String queue = NessService.replyQueue(request);
            return manager.queueProducerOf(queue);
        }
        catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    /**
     * Resolves the name of the queue the reply must be sent to.
     *
     * @param request the incoming request
     * @return the queue name of an {@link ActiveMQTempQueue} reply destination, or the
     *         {@code toString()} of any other reply destination
     * @throws JMSException if the request carries no reply destination or it cannot be read
     */
    private static String replyQueue(Message request) throws JMSException {
        Destination reply = request.getJMSReplyTo();
        if (reply == null) {
            // Reported as a JMSException so callers take their existing failure path, not an NPE.
            throw new JMSException("Request has no JMSReplyTo destination; response cannot be delivered");
        }
        if (reply instanceof ActiveMQTempQueue) {
            return ((ActiveMQTempQueue)reply).getQueueName();
        }
        return reply.toString();
    }
}

