/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.broker.jms;

import io.intino.alexandria.Scale;
import io.intino.alexandria.jms.DurableTopicConsumer;
import io.intino.alexandria.jms.JmsConsumer;
import io.intino.alexandria.jms.JmsProducer;
import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.QueueConsumer;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.jms.TopicConsumer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.broker.BrokerService;
import io.intino.datahub.broker.jms.AdvisoryManager;
import io.intino.datahub.broker.jms.JmsMessageSerializer;
import io.intino.datahub.broker.jms.ProcessStatusSerializer;
import io.intino.datahub.broker.jms.SSLConfiguration;
import io.intino.datahub.model.Broker;
import io.intino.datahub.model.Datalake;
import io.intino.datahub.model.NessGraph;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageNotWriteableException;
import jakarta.jms.Session;
import jakarta.jms.TopicConnectionFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.jms.InboundTopicBridge;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.network.jms.SimpleJmsTopicConnector;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class JmsBrokerService
implements BrokerService {
    private static final String NESS = "ness";
    private final DataHubBox box;
    private final File root;
    private final File brokerStage;
    private final SSLConfiguration sslConfiguration;
    private final BrokerManager brokerManager;
    private final PipeManager pipeManager;
    private final Map<String, VirtualDestinationInterceptor> pipes = new HashMap<String, VirtualDestinationInterceptor>();
    private static ExecutorService mounterService = JmsBrokerService.jmsMounterService();
    private org.apache.activemq.broker.BrokerService service;

    public JmsBrokerService(DataHubBox box, File brokerStage) {
        this(box, brokerStage, null);
    }

    public JmsBrokerService(DataHubBox box, File brokerStage, SSLConfiguration sslConfiguration) {
        this.box = box;
        this.root = new File(box.graph().broker().path());
        this.brokerStage = brokerStage;
        this.sslConfiguration = sslConfiguration;
        this.configure();
        this.brokerManager = new BrokerManager(box.graph(), new AdvisoryManager(this.jmsBroker()));
        this.pipeManager = new PipeManager(this.brokerManager, box.graph().broker().pipeList());
    }

    @Override
    public void start() {
        try {
            Logger.info((String)"Starting broker...");
            this.service.start();
            this.service.waitUntilStarted();
            this.brokerManager.start();
            this.pipeManager.start();
            Logger.info((String)("Broker started in port " + this.graph().broker().port()));
        }
        catch (Exception e) {
            Logger.error((String)e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void stop() {
        try {
            if (this.service != null) {
                this.pipeManager.stop();
                this.brokerManager.stop();
                this.service.stop();
                this.service.waitUntilStopped();
            }
        }
        catch (Exception e) {
            Logger.error((String)e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public io.intino.datahub.broker.BrokerManager manager() {
        return this.brokerManager;
    }

    private NessGraph graph() {
        return this.box.graph();
    }

    private void configure() {
        try {
            this.service = new org.apache.activemq.broker.BrokerService();
            this.service.setBrokerName(NESS);
            this.service.setPersistent(true);
            this.service.setOfflineDurableSubscriberTaskSchedule(86400000L);
            this.service.setOfflineDurableSubscriberTimeout(259200000L);
            this.service.setPersistenceAdapter((PersistenceAdapter)this.persistenceAdapter());
            this.service.setDataDirectory(new File(this.root, "activemq-data").getAbsolutePath());
            this.service.setRestartAllowed(true);
            this.service.setUseJmx(true);
            this.service.setUseShutdownHook(true);
            this.service.setAdvisorySupport(true);
            this.service.setSchedulePeriodForDestinationPurge(86400000);
            this.service.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(this.registerUsers()), new JavaRuntimeConfigurationPlugin(), new TimeStampingBrokerPlugin()});
            ArrayList<CompositeDestinationInterceptor> destinationInterceptors = new ArrayList<CompositeDestinationInterceptor>();
            for (Broker.CompositeDestination o : this.box.graph().broker().compositeDestinationList()) {
                CompositeTopic composite = o.type().equals((Object)Broker.CompositeDestination.Type.Topic) ? new CompositeTopic() : new CompositeQueue();
                composite.setForwardTo(o.forwardTo().stream().map(f -> o.type().equals((Object)Broker.CompositeDestination.Type.Topic) ? new ActiveMQTopic(f) : new ActiveMQQueue(f)).toList());
                destinationInterceptors.add(new CompositeDestinationInterceptor(new DestinationInterceptor[]{composite}));
            }
            this.service.setDestinationInterceptors((DestinationInterceptor[])destinationInterceptors.toArray(DestinationInterceptor[]::new));
            this.addPolicies();
            if (this.sslConfiguration != null) {
                this.addSSLConnector();
            } else {
                this.addTCPConnector();
                this.addMQTTConnector();
            }
            this.graph().broker().bridgeList().forEach(this::addJmsBridge);
        }
        catch (Exception e) {
            Logger.error((String)("Error configuring: " + e.getMessage()), (Throwable)e);
        }
    }

    private List<AuthenticationUser> registerUsers() {
        ArrayList<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
        users.add(new AuthenticationUser(NESS, NESS, "admin"));
        for (Broker.User user : this.graph().broker().userList()) {
            users.add(new AuthenticationUser(user.name(), user.password(), "users"));
        }
        return users;
    }

    public Map<String, VirtualDestinationInterceptor> pipes() {
        return this.pipes;
    }

    private Broker jmsBroker() {
        try {
            return this.service.getBroker();
        }
        catch (Exception e) {
            Logger.error((String)e.getMessage(), (Throwable)e);
            return null;
        }
    }

    private InboundTopicBridge[] toInboundBridges(List<String> inboundTopics) {
        return (InboundTopicBridge[])inboundTopics.stream().map(topicName -> {
            InboundTopicBridge bridge = new InboundTopicBridge(topicName);
            bridge.setLocalTopicName(topicName);
            return bridge;
        }).toArray(InboundTopicBridge[]::new);
    }

    private void addJmsBridge(Broker.Bridge c) {
        try {
            SimpleJmsTopicConnector connector = new SimpleJmsTopicConnector();
            connector.setName(c.name$());
            connector.setLocalTopicConnectionFactory((TopicConnectionFactory)new ActiveMQConnectionFactory(NESS, NESS, "vm://ness?waitForStart=1000&create=false"));
            connector.setOutboundTopicConnectionFactory((TopicConnectionFactory)new ActiveMQConnectionFactory(c.externalBus().user(), c.externalBus().password(), c.externalBus().url()));
            connector.setOutboundUsername(c.externalBus().user());
            connector.setOutboundPassword(c.externalBus().password());
            connector.setInboundTopicBridges(this.toInboundBridges(c.topics()));
            this.service.addJmsConnector((JmsConnector)connector);
            Logger.info((String)("Connector with " + c.externalBus().url() + " started"));
        }
        catch (Exception e) {
            Logger.error((String)e.getMessage(), (Throwable)e);
        }
    }

    private void addPolicies() {
        ArrayList<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
        policyEntries.add(JmsBrokerService.pendingMessagesPolicy());
        policyEntries.add(this.gcOldQueues());
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        this.service.setDestinationPolicy(policyMap);
    }

    private PolicyEntry gcOldQueues() {
        PolicyEntry entry = new PolicyEntry();
        entry.setQueue(">");
        entry.setGcInactiveDestinations(true);
        entry.setInactiveTimeoutBeforeGC(3600000L);
        return entry;
    }

    private static PolicyEntry pendingMessagesPolicy() {
        PolicyEntry entry = new PolicyEntry();
        entry.setAdvisoryForDiscardingMessages(true);
        entry.setTopicPrefetch(1);
        entry.setTopic(">");
        ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
        pendingMessageLimitStrategy.setLimit(1000000);
        entry.setPendingMessageLimitStrategy((PendingMessageLimitStrategy)pendingMessageLimitStrategy);
        return entry;
    }

    private void addTCPConnector() throws Exception {
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("tcp://0.0.0.0:" + this.graph().broker().port() + "?transport.useKeepAlive=true"));
        connector.setName("OWireConn");
        this.service.addConnector(connector);
    }

    private void addSSLConnector() throws Exception {
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("ssl://0.0.0.0:" + this.graph().broker().port() + "?transport.useKeepAlive=true&amp;needClientAuth=true"));
        connector.setName("ssl");
        this.configureSSL();
        this.service.addConnector(connector);
    }

    private void configureSSL() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        keyStore.load(new FileInputStream(this.sslConfiguration.keyStore()), this.sslConfiguration.keyStorePassword());
        KeyStore trustStore = KeyStore.getInstance("JKS");
        trustStore.load(new FileInputStream(this.sslConfiguration.trustStore()), this.sslConfiguration.trustStorePassword());
        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
        keyManagerFactory.init(keyStore, this.sslConfiguration.keyStorePassword());
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
        trustManagerFactory.init(trustStore);
        this.service.setSslContext(new SslContext(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null));
    }

    private void addMQTTConnector() throws Exception {
        if (this.graph().broker().secondaryPort() == 0) {
            return;
        }
        TransportConnector mqtt = new TransportConnector();
        mqtt.setUri(new URI("mqtt://0.0.0.0:" + this.graph().broker().secondaryPort()));
        mqtt.setName("MQTTConn");
        this.service.addConnector(mqtt);
    }

    private KahaDBPersistenceAdapter persistenceAdapter() {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectoryArchive(new File(this.root, "archive"));
        adapter.setIndexDirectory(new File(this.root, "entries"));
        adapter.setDirectory(this.root);
        adapter.setBrokerName(NESS);
        adapter.setBrokerService(this.service);
        return adapter;
    }

    private ActiveMQTextMessage createMessage(String message) throws MessageNotWriteableException {
        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText(message);
        return textMessage;
    }

    private static ExecutorService jmsMounterService() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "MessageMounter"));
    }

    final class BrokerManager
    implements io.intino.datahub.broker.BrokerManager {
        private final Map<String, JmsProducer> producers = new ConcurrentHashMap<String, JmsProducer>();
        private final Map<String, List<JmsConsumer>> consumers = new HashMap<String, List<JmsConsumer>>();
        private final NessGraph graph;
        private final AdvisoryManager advisoryManager;
        private Connection connection;
        private Session session;

        BrokerManager(NessGraph graph, AdvisoryManager advisoryManager) {
            this.graph = graph;
            this.advisoryManager = advisoryManager;
        }

        void start() {
            this.startNessSession();
            this.startTankConsumers();
            Datalake.ProcessStatus processStatus = this.graph.datalake().processStatus();
            if (processStatus != null) {
                this.registerProcessStatus(this.datalakeScale(), processStatus);
            }
        }

        void stop() {
            try {
                Logger.info((String)"Stopping bus");
                this.consumers.values().forEach(c -> c.forEach(JmsConsumer::close));
                this.consumers.clear();
                this.producers.values().forEach(JmsProducer::close);
                this.producers.clear();
                this.session.close();
                mounterService.shutdownNow();
                this.session = null;
                this.connection = null;
                JmsBrokerService.this.service.stop();
                Logger.info((String)"bus stopped");
            }
            catch (Throwable e) {
                Logger.error((Throwable)e);
            }
        }

        @Override
        public Session session() {
            return this.session;
        }

        @Override
        public void registerTopicConsumer(String topic, Consumer<Message> consumer) {
            List<Object> list = new ArrayList();
            if (!this.consumers.containsKey(topic)) {
                this.consumers.putIfAbsent(topic, list);
            } else {
                list = this.consumers.get(topic);
            }
            try {
                DurableTopicConsumer topicConsumer = new DurableTopicConsumer(this.nessSession(), topic, "ness-" + topic);
                topicConsumer.listen(consumer);
                list.add(topicConsumer);
            }
            catch (JMSException e) {
                Logger.error((Throwable)e);
            }
        }

        @Override
        public void registerQueueConsumer(String topic, Consumer<Message> consumer) {
            List<Object> list = new ArrayList();
            if (!this.consumers.containsKey(topic)) {
                this.consumers.putIfAbsent(topic, list);
            } else {
                list = this.consumers.get(topic);
            }
            try {
                QueueConsumer queueConsumer = new QueueConsumer(this.nessSession(), topic);
                queueConsumer.listen(consumer);
                list.add(queueConsumer);
            }
            catch (JMSException e) {
                Logger.error((Throwable)e);
            }
        }

        @Override
        public void unregisterConsumer(String topic) {
            Optional.ofNullable(this.consumers.remove(topic)).ifPresent(c -> c.forEach(JmsConsumer::close));
        }

        @Override
        public void unregisterConsumer(TopicConsumer consumer) {
            consumer.close();
            this.consumers.values().forEach(list -> list.remove(consumer));
        }

        @Override
        public void unregisterQueueProducer(String destination) {
            JmsProducer producer = this.producers.get(destination);
            if (producer != null && !producer.isClosed()) {
                producer.close();
            }
            this.producers.remove(destination);
        }

        @Override
        public QueueProducer queueProducerOf(String queue) {
            try {
                if (!this.producers.containsKey(queue) || this.producers.get(queue).isClosed()) {
                    this.producers.put(queue, (JmsProducer)new QueueProducer(this.nessSession(), queue));
                }
                return (QueueProducer)this.producers.get(queue);
            }
            catch (JMSException e) {
                Logger.error((String)e.getMessage(), (Throwable)e);
                return null;
            }
        }

        @Override
        public TopicProducer topicProducerOf(String topic) {
            try {
                if (!this.producers.containsKey(topic)) {
                    this.producers.put(topic, (JmsProducer)new TopicProducer(this.nessSession(), topic));
                }
                return (TopicProducer)this.producers.get(topic);
            }
            catch (JMSException e) {
                Logger.error((String)e.getMessage(), (Throwable)e);
                return null;
            }
        }

        void stopConsumersOf(String topic) {
            if (!this.consumers.containsKey(topic)) {
                return;
            }
            this.consumers.get(topic).forEach(JmsConsumer::close);
            this.consumers.get(topic).clear();
        }

        private void startNessSession() {
            try {
                this.connection = new ActiveMQConnectionFactory("vm://ness").createConnection(JmsBrokerService.NESS, JmsBrokerService.NESS);
                this.connection.setClientID(JmsBrokerService.NESS);
                this.session = this.connection.createSession(false, 1);
                this.advisoryManager.start(this.session);
                this.connection.start();
            }
            catch (JMSException e) {
                Logger.error((String)e.getMessage(), (Throwable)e);
            }
        }

        @Override
        public void startTankConsumers() {
            if (this.graph.datalake() == null) {
                return;
            }
            JmsBrokerService.this.brokerStage.mkdirs();
            mounterService = JmsBrokerService.jmsMounterService();
            this.graph.datalake().tankList().forEach(this::registerTankConsumer);
            Logger.info((String)"Tanks ignited!");
        }

        @Override
        public void pauseTankConsumers() {
            if (this.graph.datalake() == null) {
                return;
            }
            JmsBrokerService.this.brokerStage.mkdirs();
            this.graph.datalake().tankList().forEach(this::unregisterTankConsumer);
            BrokerManager.finishConsumerService();
            Logger.info((String)"Tanks paused!");
        }

        private static void finishConsumerService() {
            try {
                mounterService.shutdownNow();
                mounterService.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        private void unregisterTankConsumer(Datalake.Tank t) {
            JmsBrokerService.this.brokerManager.unregisterConsumer(t.qn());
        }

        private void registerTankConsumer(Datalake.Tank t) {
            JmsBrokerService.this.brokerManager.registerTopicConsumer(t.qn(), new JmsMessageSerializer(JmsBrokerService.this.brokerStage, t, this.scale(t), JmsBrokerService.this.box.datamarts(), JmsBrokerService.this.box.nessService(), mounterService).create());
        }

        private void registerProcessStatus(Scale scale, Datalake.ProcessStatus ps) {
            JmsBrokerService.this.brokerManager.registerTopicConsumer(ps.name(), new ProcessStatusSerializer(JmsBrokerService.this.brokerStage, ps.name(), scale).create());
        }

        private Scale scale(Datalake.Tank t) {
            return t.scale() != null ? Scale.valueOf((String)t.scale().name()) : this.datalakeScale();
        }

        private Scale datalakeScale() {
            return Scale.valueOf((String)this.graph.datalake().scale().name());
        }

        private Session nessSession() {
            if (this.session == null || this.closedSession()) {
                this.startNessSession();
            }
            return this.session;
        }

        private boolean closedSession() {
            return ((ActiveMQSession)this.session).isClosed();
        }
    }

    private class PipeManager {
        private final List<Broker.Pipe> pipes;
        private final BrokerManager brokerManager;

        PipeManager(BrokerManager manager, List<Broker.Pipe> pipes) {
            this.brokerManager = manager;
            this.pipes = pipes;
        }

        void start() {
            for (Broker.Pipe pipe : this.pipes) {
                this.brokerManager.registerTopicConsumer(pipe.origin(), message -> this.send(pipe.destination(), MessageReader.textFrom((Message)message)));
                Logger.info((String)("Pipe " + pipe.origin() + " -> " + pipe.destination() + " established"));
            }
        }

        void stop() {
            for (Broker.Pipe pipe : this.pipes) {
                this.brokerManager.stopConsumersOf(pipe.origin());
                Logger.info((String)("Pipe " + pipe.origin() + " -> " + pipe.destination() + " established"));
            }
        }

        private void send(String destination, String message) {
            TopicProducer producer = this.brokerManager.topicProducerOf(destination);
            new Thread(() -> this.send(producer, message)).start();
        }

        private void send(TopicProducer producer, String message) {
            if (producer != null) {
                try {
                    producer.produce((Message)JmsBrokerService.this.createMessage(message));
                }
                catch (MessageNotWriteableException e) {
                    Logger.error((Throwable)e);
                }
            }
        }
    }
}

