/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.box.service.jms;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.intino.alexandria.Json;
import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.MessageWriter;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.box.service.jms.BackupRequest;
import io.intino.datahub.box.service.jms.DatalakeRequest;
import io.intino.datahub.box.service.jms.DatamartsRequest;
import io.intino.datahub.box.service.jms.LastSealRequest;
import io.intino.datahub.box.service.jms.SealRequest;
import io.intino.datahub.broker.BrokerManager;
import io.intino.datahub.broker.jms.JmsMessageTranslator;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.activemq.command.ActiveMQTempQueue;

public class NessService {
    public static final String SERVICE_NESS_DATAMARTS = "service.ness.datamarts";
    public static final String SERVICE_NESS_DATAMARTS_NOTIFICATIONS = "service.ness.datamarts.notifications";
    private BrokerManager manager;
    private ExecutorService dispatcherService;
    private TopicProducer notifier;
    private final DataHubBox box;

    public NessService(DataHubBox box) {
        this.box = box;
    }

    public void start() {
        this.dispatcherService = Executors.newFixedThreadPool(1, r -> new Thread(r, "Ness Datamarts Service"));
        this.manager = this.box.brokerService().manager();
        this.notifier = this.manager.topicProducerOf(SERVICE_NESS_DATAMARTS_NOTIFICATIONS);
        this.manager.registerQueueConsumer("service.ness.seal", m -> this.response(this.manager, (Message)m, new SealRequest(this.box).accept(MessageReader.textFrom((Message)m))));
        this.manager.registerQueueConsumer("service.ness.seal.last", m -> this.response(this.manager, (Message)m, new LastSealRequest(this.box).accept(MessageReader.textFrom((Message)m))));
        this.manager.registerQueueConsumer("service.ness.backup", m -> this.response(this.manager, (Message)m, new BackupRequest(this.box).accept(MessageReader.textFrom((Message)m))));
        this.manager.registerQueueConsumer("service.ness.datalake", m -> this.response(this.manager, (Message)m, new DatalakeRequest(this.box).accept((Message)m)));
        this.manager.registerQueueConsumer(SERVICE_NESS_DATAMARTS, m -> this.response(this.manager, (Message)m, new DatamartsRequest(this.box).accept((Message)m)));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.dispatcherService.shutdown();
                this.dispatcherService.awaitTermination(1L, TimeUnit.MINUTES);
            }
            catch (InterruptedException e) {
                Logger.error((Throwable)e);
            }
        }));
    }

    public void notifyDatamartChange(List<String> sourcesChanged) {
        JsonObject notification = new JsonObject();
        notification.addProperty("operation", "refresh");
        JsonArray jsonElements = new JsonArray();
        sourcesChanged.forEach(arg_0 -> ((JsonArray)jsonElements).add(arg_0));
        notification.add("changes", (JsonElement)jsonElements);
        try {
            this.notifier.produce(MessageWriter.write((String)Json.toJson((Object)notification)));
        }
        catch (JMSException e) {
            Logger.error((Throwable)e);
        }
    }

    private void response(BrokerManager manager, Message requestMessage, String response) {
        this.response(manager, requestMessage, JmsMessageTranslator.toJmsMessage(response));
    }

    private void response(BrokerManager manager, Message request, Message response) {
        this.dispatcherService.execute(() -> {
            try {
                if (response == null) {
                    return;
                }
                QueueProducer queueProducer = NessService.producer(manager, request);
                if (queueProducer == null) {
                    return;
                }
                response.setJMSCorrelationID(request.getJMSCorrelationID());
                queueProducer.produce(response);
            }
            catch (Throwable e) {
                Logger.error((String)("Error while handling response: " + e.getMessage()), (Throwable)e);
            }
        });
    }

    private void response(BrokerManager manager, Message request, Stream<Message> response) {
        this.dispatcherService.execute(() -> {
            if (response == null) {
                return;
            }
            QueueProducer producer = NessService.producer(manager, request);
            if (producer == null) {
                return;
            }
            this.publishResponse(request, response, producer);
        });
    }

    private void publishResponse(Message request, Stream<Message> response, QueueProducer producer) {
        response.forEach(m -> {
            try {
                m.setJMSCorrelationID(request.getJMSCorrelationID());
                producer.produce(m);
            }
            catch (Throwable e) {
                Logger.error((String)("Error while handling response: " + e.getMessage()), (Throwable)e);
            }
        });
        try {
            this.manager.unregisterQueueProducer(NessService.replyQueue(request));
        }
        catch (JMSException e) {
            Logger.error((Throwable)e);
        }
    }

    private static QueueProducer producer(BrokerManager manager, Message request) {
        try {
            String queue = NessService.replyQueue(request);
            return manager.queueProducerOf(queue);
        }
        catch (JMSException e) {
            Logger.error((Throwable)e);
            return null;
        }
    }

    private static String replyQueue(Message request) throws JMSException {
        String string;
        Destination reply = request.getJMSReplyTo();
        if (reply instanceof ActiveMQTempQueue) {
            ActiveMQTempQueue r = (ActiveMQTempQueue)reply;
            string = r.getQueueName();
        } else {
            string = reply.toString();
        }
        return string;
    }
}

