package io.intino.datahub.broker.jms;

import io.intino.alexandria.Scale;
import io.intino.alexandria.jms.JmsConsumer;
import io.intino.alexandria.jms.JmsProducer;
import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.QueueConsumer;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.jms.TopicConsumer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.broker.BrokerService;
import io.intino.datahub.model.Broker;
import io.intino.datahub.model.Datalake;
import io.intino.datahub.model.NessGraph;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageNotWriteableException;
import javax.jms.Session;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.jms.InboundTopicBridge;
import org.apache.activemq.network.jms.SimpleJmsTopicConnector;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

/* loaded from: input_file:io/intino/datahub/broker/jms/JmsBrokerService.class */
public class JmsBrokerService implements BrokerService {
    private static final String NESS = "ness";
    private final DataHubBox box;
    private final File root;
    private final File brokerStage;
    private final SSLConfiguration sslConfiguration;
    private final BrokerManager brokerManager;
    private final PipeManager pipeManager;
    private final Map<String, VirtualDestinationInterceptor> pipes;
    private org.apache.activemq.broker.BrokerService service;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/intino/datahub/broker/jms/JmsBrokerService$BrokerManager.class */
    public final class BrokerManager implements io.intino.datahub.broker.BrokerManager {
        private final Map<String, JmsProducer> producers = new HashMap();
        private final Map<String, List<JmsConsumer>> consumers = new HashMap();
        private final NessGraph graph;
        private final AdvisoryManager advisoryManager;
        private Connection connection;
        private Session session;

        /**
         * Stores the graph and the advisory manager this broker manager works with.
         * No JMS connection or session is created here.
         *
         * @param nessGraph       model graph queried for the datalake definition
         * @param advisoryManager advisory manager started together with the internal session
         */
        BrokerManager(NessGraph nessGraph, AdvisoryManager advisoryManager) {
            this.advisoryManager = advisoryManager;
            this.graph = nessGraph;
        }

        /**
         * Opens the internal session, registers one consumer per datalake tank and,
         * when the datalake declares a process status, registers its consumer too.
         *
         * <p>A graph without datalake is tolerated: nothing beyond the session is set up.
         */
        void start() {
            startNessSession();
            initTankConsumers();
            Datalake datalake = this.graph.datalake();
            if (datalake == null) {
                // initTankConsumers() already accepts a missing datalake; dereferencing it
                // here would throw and abort the whole broker start.
                return;
            }
            Datalake.ProcessStatus processStatus = datalake.processStatus();
            if (processStatus != null) {
                registerProcessStatus(datalakeScale(), processStatus);
            }
        }

        /**
         * Closes every registered consumer and producer, then the session and its
         * connection, and finally stops the enclosing broker service.
         *
         * <p>Any failure is logged and aborts the remaining steps; nothing is rethrown.
         */
        void stop() {
            try {
                Logger.info("Stopping bus");
                for (List<JmsConsumer> group : this.consumers.values()) {
                    group.forEach(JmsConsumer::close);
                }
                this.consumers.clear();
                this.producers.values().forEach(JmsProducer::close);
                this.producers.clear();
                // The session may never have been created if startNessSession() failed.
                if (this.session != null) {
                    this.session.close();
                    this.session = null;
                }
                // Close the connection instead of just dropping the reference, otherwise
                // it stays open until the broker itself goes down.
                if (this.connection != null) {
                    this.connection.close();
                    this.connection = null;
                }
                JmsBrokerService.this.service.stop();
                Logger.info("bus stopped");
            } catch (Throwable th) {
                Logger.error(th);
            }
        }

        /**
         * Returns the session currently held by this manager.
         *
         * @return the current session; {@code null} until one has been created and after {@code stop()}
         */
        @Override // io.intino.datahub.broker.BrokerManager
        public Session session() {
            return session;
        }

        /**
         * Creates a consumer on the given topic, attaches the listener and records the
         * consumer under the topic name so it can be closed later.
         *
         * @param str      topic name
         * @param consumer listener invoked for the messages received on the topic
         * @return the new consumer, or {@code null} when it could not be created (the error is logged)
         */
        @Override // io.intino.datahub.broker.BrokerManager
        public TopicConsumer registerTopicConsumer(String str, Consumer<Message> consumer) {
            // The list is registered even if creating the consumer fails below.
            List<JmsConsumer> registered = this.consumers.computeIfAbsent(str, key -> new ArrayList<>());
            try {
                TopicConsumer topicConsumer = new TopicConsumer(nessSession(), str);
                topicConsumer.listen(consumer);
                registered.add(topicConsumer);
                return topicConsumer;
            } catch (JMSException e) {
                Logger.error(e);
                return null;
            }
        }

        /**
         * Creates a consumer on the given queue, attaches the listener and records the
         * consumer under the queue name so it can be closed later.
         *
         * @param str      queue name
         * @param consumer listener invoked for the messages received on the queue
         * @return the new consumer, or {@code null} when it could not be created (the error is logged)
         */
        @Override // io.intino.datahub.broker.BrokerManager
        public QueueConsumer registerQueueConsumer(String str, Consumer<Message> consumer) {
            // The list is registered even if creating the consumer fails below.
            List<JmsConsumer> registered = this.consumers.computeIfAbsent(str, key -> new ArrayList<>());
            try {
                QueueConsumer queueConsumer = new QueueConsumer(nessSession(), str);
                queueConsumer.listen(consumer);
                registered.add(queueConsumer);
                return queueConsumer;
            } catch (JMSException e) {
                Logger.error(e);
                return null;
            }
        }

        /**
         * Closes the given consumer and removes it from every list of registered consumers.
         *
         * @param topicConsumer consumer to close and forget
         */
        @Override // io.intino.datahub.broker.BrokerManager
        public void unregisterConsumer(TopicConsumer topicConsumer) {
            topicConsumer.close();
            for (List<JmsConsumer> group : this.consumers.values()) {
                group.remove(topicConsumer);
            }
        }

        /**
         * Returns the cached producer for the given queue, creating it on first use.
         *
         * <p>Queue and topic producers share one cache keyed by destination name, so a
         * name first obtained through {@code topicProducerOf} makes the cast below fail
         * with a {@link ClassCastException}.
         *
         * @param str queue name
         * @return the producer, or {@code null} when it could not be created (the error is logged)
         */
        @Override // io.intino.datahub.broker.BrokerManager
        public QueueProducer queueProducerOf(String str) {
            try {
                JmsProducer producer = this.producers.get(str);
                if (producer == null) {
                    producer = new QueueProducer(nessSession(), str);
                    this.producers.put(str, producer);
                }
                // The cache is typed as JmsProducer, so the narrowing must be explicit.
                return (QueueProducer) producer;
            } catch (JMSException e) {
                Logger.error(e.getMessage(), e);
                return null;
            }
        }

        /**
         * Returns the cached producer for the given topic, creating it on first use.
         *
         * <p>Queue and topic producers share one cache keyed by destination name, so a
         * name first obtained through {@code queueProducerOf} makes the cast below fail
         * with a {@link ClassCastException}.
         *
         * @param str topic name
         * @return the producer, or {@code null} when it could not be created (the error is logged)
         */
        @Override // io.intino.datahub.broker.BrokerManager
        public TopicProducer topicProducerOf(String str) {
            try {
                JmsProducer producer = this.producers.get(str);
                if (producer == null) {
                    producer = new TopicProducer(nessSession(), str);
                    this.producers.put(str, producer);
                }
                // The cache is typed as JmsProducer, so the narrowing must be explicit.
                return (TopicProducer) producer;
            } catch (JMSException e) {
                Logger.error(e.getMessage(), e);
                return null;
            }
        }

        /**
         * Closes and forgets every consumer registered under the given destination name.
         * Unknown names are ignored.
         *
         * @param str destination name the consumers were registered with
         */
        void stopConsumersOf(String str) {
            List<JmsConsumer> group = this.consumers.get(str);
            if (group == null) {
                return;
            }
            group.forEach(JmsConsumer::close);
            group.clear();
        }

        /**
         * Opens an in-VM connection to the broker with the built-in credentials, creates
         * a non-transacted, auto-acknowledge session on it, starts the advisory manager
         * on that session and starts the connection. JMS failures are logged, not thrown.
         */
        private void startNessSession() {
            try {
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://ness");
                this.connection = factory.createConnection(JmsBrokerService.NESS, JmsBrokerService.NESS);
                this.connection.setClientID(JmsBrokerService.NESS);
                this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                this.advisoryManager.start(this.session);
                this.connection.start();
            } catch (JMSException jmsFailure) {
                Logger.error(jmsFailure.getMessage(), jmsFailure);
            }
        }

        /**
         * Registers a consumer for every tank of the datalake after making sure the
         * broker stage directory exists. Does nothing when the graph has no datalake.
         */
        private void initTankConsumers() {
            if (this.graph.datalake() != null) {
                File stage = JmsBrokerService.this.brokerStage;
                stage.mkdirs();
                this.graph.datalake().tankList().forEach(tank -> registerTankConsumer(tank));
                Logger.info("Tanks ignited!");
            }
        }

        /**
         * Subscribes to the tank's topic (its qualified name) with a handler built by
         * {@code JmsMessageSerializer} for the broker stage directory.
         *
         * @param tank tank whose topic is consumed
         */
        private void registerTankConsumer(Datalake.Tank tank) {
            Consumer<Message> handler = new JmsMessageSerializer(
                    JmsBrokerService.this.brokerStage,
                    tank,
                    scale(tank),
                    JmsBrokerService.this.box.datamarts()).create();
            JmsBrokerService.this.brokerManager.registerTopicConsumer(tank.qn(), handler);
        }

        /**
         * Subscribes to the topic named after the process status with a handler built
         * by {@code ProcessStatusSerializer} for the broker stage directory.
         *
         * @param scale         scale handed to the serializer
         * @param processStatus process status definition whose name is used as topic
         */
        private void registerProcessStatus(Scale scale, Datalake.ProcessStatus processStatus) {
            Consumer<Message> handler = new ProcessStatusSerializer(
                    JmsBrokerService.this.brokerStage,
                    processStatus.name(),
                    scale).create();
            JmsBrokerService.this.brokerManager.registerTopicConsumer(processStatus.name(), handler);
        }

        /**
         * Resolves the scale of a tank, falling back to the datalake scale when the
         * tank does not declare one.
         *
         * @param tank tank to inspect
         * @return the tank's own scale if set, otherwise the datalake scale
         */
        private Scale scale(Datalake.Tank tank) {
            if (tank.scale() == null) {
                return datalakeScale();
            }
            return Scale.valueOf(tank.scale().name());
        }

        /**
         * Maps the scale declared by the datalake to the {@link Scale} constant of the same name.
         *
         * @return the datalake scale
         */
        private Scale datalakeScale() {
            String scaleName = this.graph.datalake().scale().name();
            return Scale.valueOf(scaleName);
        }

        /**
         * Returns the internal session, first trying to (re)create it when it is
         * missing or closed.
         *
         * @return the session; may still be {@code null} if it could not be created
         */
        private Session nessSession() {
            boolean usable = this.session != null && !closedSession();
            if (!usable) {
                startNessSession();
            }
            return this.session;
        }

        /**
         * Tells whether the current session has been closed.
         *
         * <p>{@code javax.jms.Session} exposes no closed-state query, so the session is
         * narrowed to the ActiveMQ implementation that the connection factory used by
         * this class is expected to produce. Callers must ensure the session is non-null.
         *
         * @return {@code true} if the ActiveMQ session reports itself closed
         */
        private boolean closedSession() {
            return ((org.apache.activemq.ActiveMQSession) this.session).isClosed();
        }
    }

    /* loaded from: input_file:io/intino/datahub/broker/jms/JmsBrokerService$PipeManager.class */
    /**
     * Forwards the text of every message received on a pipe's origin topic to the
     * pipe's destination topic, using the consumers and producers of the broker manager.
     */
    private class PipeManager {
        private final List<Broker.Pipe> pipes;
        private final BrokerManager brokerManager;

        /**
         * @param brokerManager manager used to register consumers and obtain producers
         * @param list          pipes to establish on {@link #start()}
         */
        PipeManager(BrokerManager brokerManager, List<Broker.Pipe> list) {
            this.brokerManager = brokerManager;
            this.pipes = list;
        }

        /** Registers one topic consumer per pipe that relays incoming text to the destination. */
        void start() {
            for (Broker.Pipe pipe : this.pipes) {
                this.brokerManager.registerTopicConsumer(pipe.origin(), message -> {
                    send(pipe.destination(), MessageReader.textFrom(message));
                });
                Logger.info("Pipe " + pipe.origin() + " -> " + pipe.destination() + " established");
            }
        }

        /**
         * Stops the consumers registered on every pipe origin. This closes all consumers
         * registered under that topic name, not only the one created by the pipe.
         */
        void stop() {
            for (Broker.Pipe pipe : this.pipes) {
                this.brokerManager.stopConsumersOf(pipe.origin());
                // Previously logged "established" here, which made shutdown look like start-up.
                Logger.info("Pipe " + pipe.origin() + " -> " + pipe.destination() + " stopped");
            }
        }

        /**
         * Sends the text to the destination topic on a new thread so the consumer
         * callback is not blocked by the send.
         */
        private void send(String str, String str2) {
            TopicProducer topicProducer = this.brokerManager.topicProducerOf(str);
            if (topicProducer == null) {
                // Producer creation failed and was already logged; no point in spawning a thread.
                return;
            }
            new Thread(() -> {
                send(topicProducer, str2);
            }).start();
        }

        /** Wraps the text in a text message and produces it; a null producer is ignored. */
        private void send(TopicProducer topicProducer, String str) {
            if (topicProducer != null) {
                try {
                    topicProducer.produce(JmsBrokerService.this.createMessage(str));
                } catch (MessageNotWriteableException e) {
                    Logger.error(e);
                }
            }
        }
    }

    /**
     * Creates a broker service without SSL configuration.
     *
     * @param dataHubBox box providing the model graph and datamarts
     * @param file       broker stage directory
     */
    public JmsBrokerService(DataHubBox dataHubBox, File file) {
        this(dataHubBox, file, (SSLConfiguration) null);
    }

    public JmsBrokerService(DataHubBox dataHubBox, File file, SSLConfiguration sSLConfiguration) {
        this.pipes = new HashMap();
        this.box = dataHubBox;
        this.root = new File(dataHubBox.graph().broker().path());
        this.brokerStage = file;
        this.sslConfiguration = sSLConfiguration;
        configure();
        this.brokerManager = new BrokerManager(dataHubBox.graph(), new AdvisoryManager(jmsBroker()));
        this.pipeManager = new PipeManager(this.brokerManager, dataHubBox.graph().broker().pipeList());
    }

    /**
     * Starts the embedded broker, waits until it is up, then starts the broker
     * manager and the pipes. Failures are logged and not propagated.
     */
    @Override // io.intino.datahub.broker.BrokerService
    public void start() {
        try {
            Logger.info("Starting broker...");
            org.apache.activemq.broker.BrokerService broker = this.service;
            broker.start();
            broker.waitUntilStarted();
            this.brokerManager.start();
            this.pipeManager.start();
            Logger.info("Broker started in port " + graph().broker().port());
        } catch (Exception startFailure) {
            Logger.error(startFailure.getMessage(), startFailure);
        }
    }

    /**
     * Stops pipes, the broker manager and the embedded broker, then waits until the
     * broker has stopped. Does nothing when no broker service exists; failures are
     * logged and not propagated.
     */
    @Override // io.intino.datahub.broker.BrokerService
    public void stop() {
        if (this.service == null) {
            return;
        }
        try {
            this.pipeManager.stop();
            this.brokerManager.stop();
            this.service.stop();
            this.service.waitUntilStopped();
        } catch (Exception stopFailure) {
            Logger.error(stopFailure.getMessage(), stopFailure);
        }
    }

    /**
     * Returns the manager that owns the internal session, consumers and producers.
     *
     * @return the broker manager created by the constructor
     */
    @Override // io.intino.datahub.broker.BrokerService
    public io.intino.datahub.broker.BrokerManager manager() {
        return brokerManager;
    }

    /** Shortcut to the model graph held by the box. */
    private NessGraph graph() {
        return box.graph();
    }

    /**
     * Creates and configures the embedded ActiveMQ broker: persistence, plugins,
     * composite-destination interceptors, destination policies, transport connectors
     * (SSL when an SSL configuration was supplied, otherwise TCP plus optional MQTT)
     * and the JMS bridges declared in the graph.
     *
     * <p>Any failure is logged; the service may then be left partially configured.
     */
    private void configure() {
        try {
            this.service = new org.apache.activemq.broker.BrokerService();
            this.service.setBrokerName(NESS);
            this.service.setPersistent(true);
            // 86400000 = 24 h and 259200000 = 72 h if read as milliseconds — TODO confirm unit against ActiveMQ docs.
            this.service.setOfflineDurableSubscriberTaskSchedule(86400000L);
            this.service.setOfflineDurableSubscriberTimeout(259200000L);
            this.service.setPersistenceAdapter(persistenceAdapter());
            this.service.setDataDirectory(new File(this.root, "activemq-data").getAbsolutePath());
            this.service.setRestartAllowed(true);
            this.service.setUseJmx(true);
            this.service.setUseShutdownHook(true);
            this.service.setAdvisorySupport(true);
            this.service.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(registerUsers()), new JavaRuntimeConfigurationPlugin(), new TimeStampingBrokerPlugin()});
            this.service.setDestinationInterceptors(compositeInterceptors());
            addPolicies();
            if (this.sslConfiguration != null) {
                addSSLConnector();
            } else {
                addTCPConnector();
                addMQTTConnector();
            }
            graph().broker().bridgeList().forEach(this::addJmsBridge);
        } catch (Exception e) {
            Logger.error("Error configuring: " + e.getMessage(), e);
        }
    }

    /**
     * Builds one interceptor per composite destination declared in the graph; each
     * forwards to topics or queues according to the declared type.
     */
    private DestinationInterceptor[] compositeInterceptors() {
        List<DestinationInterceptor> interceptors = new ArrayList<>();
        for (Broker.CompositeDestination definition : this.box.graph().broker().compositeDestinationList()) {
            boolean topic = definition.type().equals(Broker.CompositeDestination.Type.Topic);
            // CompositeTopic and CompositeQueue are siblings, so the local must use their
            // common ActiveMQ supertype (fully qualified: the simple name clashes with
            // the model's Broker.CompositeDestination).
            org.apache.activemq.broker.region.virtual.CompositeDestination composite =
                    topic ? new CompositeTopic() : new CompositeQueue();
            composite.setForwardTo(definition.forwardTo().stream()
                    .map(name -> topic ? new ActiveMQTopic(name) : new ActiveMQQueue(name))
                    .toList());
            interceptors.add(new CompositeDestinationInterceptor(new DestinationInterceptor[]{composite}));
        }
        return interceptors.toArray(new DestinationInterceptor[0]);
    }

    /**
     * Builds the user list for the authentication plugin: the built-in admin user
     * followed by every user declared in the graph (group "users").
     *
     * <p>NOTE(review): the built-in admin user uses the constant {@code NESS} as both
     * name and password; the internal connections of this class rely on it.
     *
     * @return the authentication users, admin first
     */
    private List<AuthenticationUser> registerUsers() {
        List<AuthenticationUser> users = new ArrayList<>();
        users.add(new AuthenticationUser(NESS, NESS, "admin"));
        graph().broker().userList().forEach(declared ->
                users.add(new AuthenticationUser(declared.name(), declared.password(), "users")));
        return users;
    }

    /**
     * Returns the map of pipe interceptors held by this service. The live internal
     * map is returned, not a copy.
     *
     * @return the internal pipes map
     */
    public Map<String, VirtualDestinationInterceptor> pipes() {
        return pipes;
    }

    /**
     * Obtains the underlying broker of the configured service.
     *
     * @return the broker, or {@code null} when it could not be obtained (the error is logged)
     */
    private org.apache.activemq.broker.Broker jmsBroker() {
        org.apache.activemq.broker.Broker broker = null;
        try {
            broker = this.service.getBroker();
        } catch (Exception failure) {
            Logger.error(failure.getMessage(), failure);
        }
        return broker;
    }

    /**
     * Creates one inbound topic bridge per topic name, using the same name for the
     * remote and the local topic.
     *
     * @param list topic names to bridge
     * @return the bridges, in the order of the given names
     */
    private InboundTopicBridge[] toInboundBridges(List<String> list) {
        InboundTopicBridge[] bridges = new InboundTopicBridge[list.size()];
        int index = 0;
        for (String topic : list) {
            InboundTopicBridge bridge = new InboundTopicBridge(topic);
            bridge.setLocalTopicName(topic);
            bridges[index++] = bridge;
        }
        return bridges;
    }

    /**
     * Adds a JMS topic connector that bridges the bridge's topics from its external
     * bus into this broker. Failures are logged and do not stop configuration.
     *
     * @param bridge bridge definition (name, external bus and topics)
     */
    private void addJmsBridge(Broker.Bridge bridge) {
        try {
            SimpleJmsTopicConnector connector = new SimpleJmsTopicConnector();
            connector.setName(bridge.name$());
            ActiveMQConnectionFactory localFactory =
                    new ActiveMQConnectionFactory(NESS, NESS, "vm://ness?waitForStart=1000&create=false");
            connector.setLocalTopicConnectionFactory(localFactory);
            ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(
                    bridge.externalBus().user(), bridge.externalBus().password(), bridge.externalBus().url());
            connector.setOutboundTopicConnectionFactory(remoteFactory);
            String remoteUser = bridge.externalBus().user();
            connector.setOutboundUsername(remoteUser);
            String remotePassword = bridge.externalBus().password();
            connector.setOutboundPassword(remotePassword);
            InboundTopicBridge[] inbound = toInboundBridges(bridge.topics());
            connector.setInboundTopicBridges(inbound);
            this.service.addJmsConnector(connector);
            Logger.info("Connector with " + bridge.externalBus().url() + " started");
        } catch (Exception failure) {
            Logger.error(failure.getMessage(), failure);
        }
    }

    /**
     * Installs the destination policy of the broker: discard advisories enabled, a
     * topic prefetch of 1 and a constant pending-message limit of 1,000,000.
     *
     * <p>The entry is installed as the policy map's default entry so it applies to
     * every destination. Previously it was configured but never added to the map,
     * which left the broker running with no policy at all.
     */
    private void addPolicies() {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setAdvisoryForDiscardingMessages(true);
        policyEntry.setTopicPrefetch(1);
        ConstantPendingMessageLimitStrategy pendingLimit = new ConstantPendingMessageLimitStrategy();
        pendingLimit.setLimit(1000000);
        policyEntry.setPendingMessageLimitStrategy(pendingLimit);
        PolicyMap policyMap = new PolicyMap();
        // The entry has no destination of its own, so it is registered as the default
        // rather than through setPolicyEntries(), which keys entries by destination.
        policyMap.setDefaultEntry(policyEntry);
        this.service.setDestinationPolicy(policyMap);
    }

    /**
     * Adds the plain TCP connector, listening on all interfaces at the broker port
     * declared in the graph, with transport keep-alive enabled.
     *
     * @throws Exception if the URI is invalid or the connector cannot be added
     */
    private void addTCPConnector() throws Exception {
        String location = "tcp://0.0.0.0:" + graph().broker().port() + "?transport.useKeepAlive=true";
        TransportConnector connector = new TransportConnector();
        connector.setName("OWireConn");
        connector.setUri(new URI(location));
        this.service.addConnector(connector);
    }

    /**
     * Adds the SSL connector, listening on all interfaces at the broker port declared
     * in the graph, with keep-alive enabled and client certificates required, after
     * installing the SSL context built from the SSL configuration.
     *
     * @throws Exception if the URI is invalid, the key material cannot be loaded or
     *                   the connector cannot be added
     */
    private void addSSLConnector() throws Exception {
        TransportConnector transportConnector = new TransportConnector();
        // Options are separated by a plain '&'. The XML-escaped "&amp;" used before
        // turned the second option into "amp;needClientAuth", so client authentication
        // was never actually requested.
        transportConnector.setUri(new URI("ssl://0.0.0.0:" + graph().broker().port() + "?transport.useKeepAlive=true&needClientAuth=true"));
        transportConnector.setName("ssl");
        configureSSL();
        this.service.addConnector(transportConnector);
    }

    /**
     * Loads the JKS key store and trust store named by the SSL configuration and
     * installs the resulting SSL context on the broker service.
     *
     * @throws KeyStoreException         if the JKS key store type is unavailable or a store is unusable
     * @throws IOException               if a store file cannot be read or its password is wrong
     * @throws NoSuchAlgorithmException  if a required algorithm is unavailable
     * @throws CertificateException      if a certificate in a store cannot be loaded
     * @throws UnrecoverableKeyException if a key cannot be recovered with the key store password
     */
    private void configureSSL() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        // try-with-resources: the store streams were previously left open.
        try (FileInputStream keyStoreStream = new FileInputStream(this.sslConfiguration.keyStore())) {
            keyStore.load(keyStoreStream, this.sslConfiguration.keyStorePassword());
        }
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (FileInputStream trustStoreStream = new FileInputStream(this.sslConfiguration.trustStore())) {
            trustStore.load(trustStoreStream, this.sslConfiguration.trustStorePassword());
        }
        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
        keyManagerFactory.init(keyStore, this.sslConfiguration.keyStorePassword());
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
        trustManagerFactory.init(trustStore);
        this.service.setSslContext(new SslContext(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), (SecureRandom) null));
    }

    /**
     * Adds the MQTT connector on the secondary port declared in the graph, listening
     * on all interfaces. A secondary port of 0 disables the connector.
     *
     * @throws Exception if the URI is invalid or the connector cannot be added
     */
    private void addMQTTConnector() throws Exception {
        if (graph().broker().secondaryPort() != 0) {
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI("mqtt://0.0.0.0:" + graph().broker().secondaryPort()));
            connector.setName("MQTTConn");
            this.service.addConnector(connector);
        }
    }

    /**
     * Builds the KahaDB persistence adapter rooted at the broker path, with the
     * "archive" and "entries" sub-directories for archived data and the index.
     *
     * @return the adapter, bound to the current broker service
     */
    private KahaDBPersistenceAdapter persistenceAdapter() {
        File archiveDirectory = new File(this.root, "archive");
        File indexDirectory = new File(this.root, "entries");
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectoryArchive(archiveDirectory);
        adapter.setIndexDirectory(indexDirectory);
        adapter.setDirectory(this.root);
        adapter.setBrokerName(NESS);
        adapter.setBrokerService(this.service);
        return adapter;
    }

    /**
     * Wraps the given text in a new ActiveMQ text message.
     *
     * @param str message body
     * @return a text message carrying {@code str}
     * @throws MessageNotWriteableException if the body cannot be set on the message
     */
    private ActiveMQTextMessage createMessage(String str) throws MessageNotWriteableException {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(str);
        return message;
    }
}
