/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.broker.jms;

import io.intino.alexandria.Scale;
import io.intino.alexandria.jms.JmsConsumer;
import io.intino.alexandria.jms.JmsProducer;
import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.jms.QueueConsumer;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.jms.TopicConsumer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.broker.BrokerService;
import io.intino.datahub.broker.jms.AdvisoryManager;
import io.intino.datahub.broker.jms.TopicSaver;
import io.intino.datahub.graph.Broker;
import io.intino.datahub.graph.Datalake;
import io.intino.datahub.graph.NessGraph;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageNotWriteableException;
import javax.jms.Session;
import javax.jms.TopicConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.jms.InboundTopicBridge;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.network.jms.SimpleJmsTopicConnector;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class JmsBrokerService
implements BrokerService {
    private static final String NESS = "ness";
    private final File root;
    private final NessGraph graph;
    private final File brokerStage;
    private final BrokerManager brokerManager;
    private final PipeManager pipeManager;
    private final Map<String, VirtualDestinationInterceptor> pipes = new HashMap<String, VirtualDestinationInterceptor>();
    private org.apache.activemq.broker.BrokerService service;

    public JmsBrokerService(NessGraph graph, File brokerStage) {
        this.root = new File(graph.broker().path());
        this.graph = graph;
        this.brokerStage = brokerStage;
        this.configure();
        this.brokerManager = new BrokerManager(graph, new AdvisoryManager(this.jmsBroker()));
        this.pipeManager = new PipeManager(this.brokerManager, graph.broker().pipeList());
    }

    /**
     * Starts the embedded broker, waits until it reports started, then starts the broker manager
     * and the pipes. Any exception is logged and not propagated.
     */
    @Override
    public void start() {
        try {
            Logger.info("Starting broker...");
            service.start();
            service.waitUntilStarted();
            brokerManager.start();
            pipeManager.start();
            Logger.info("Broker started!");
        } catch (Exception failure) {
            Logger.error(failure.getMessage(), failure);
        }
    }

    /**
     * Stops the pipes, the broker manager and then the embedded broker, waiting until it is down.
     * Does nothing when no broker service instance exists; exceptions are logged, not propagated.
     */
    @Override
    public void stop() {
        try {
            if (service == null) {
                return;
            }
            pipeManager.stop();
            brokerManager.stop();
            service.stop();
            service.waitUntilStopped();
        } catch (Exception failure) {
            Logger.error(failure.getMessage(), failure);
        }
    }

    /**
     * @return the broker manager created by this service's constructor
     */
    @Override
    public io.intino.datahub.broker.BrokerManager manager() {
        return brokerManager;
    }

    /**
     * Creates the ActiveMQ broker service and applies persistence, plugins, destination policies,
     * transport connectors and the JMS bridges declared in the graph.
     * Any exception is logged with an "Error configuring: " prefix and not propagated.
     */
    private void configure() {
        try {
            // Assigned first: persistenceAdapter() and the add* helpers read the service field.
            service = new org.apache.activemq.broker.BrokerService();
            service.setBrokerName(NESS);
            service.setPersistent(true);
            service.setPersistenceAdapter(persistenceAdapter());
            File dataDirectory = new File(root, "activemq-data");
            service.setDataDirectory(dataDirectory.getAbsolutePath());
            service.setRestartAllowed(true);
            service.setUseJmx(true);
            service.setUseShutdownHook(true);
            service.setAdvisorySupport(true);
            BrokerPlugin authentication = new SimpleAuthenticationPlugin(registerUsers());
            BrokerPlugin runtimeConfiguration = new JavaRuntimeConfigurationPlugin();
            service.setPlugins(new BrokerPlugin[]{authentication, runtimeConfiguration});
            addPolicies();
            addTCPConnector();
            addMQTTConnector();
            for (Broker.Bridge bridge : graph.broker().bridgeList()) {
                addJmsBridge(bridge);
            }
        } catch (Exception failure) {
            Logger.error("Error configuring: " + failure.getMessage(), failure);
        }
    }

    /**
     * Builds the user list for the authentication plugin: a built-in account whose name and
     * password are both {@code NESS} in group "admin", followed by every user declared in the
     * graph in group "users".
     *
     * @return a new mutable list of authentication users
     */
    private List<AuthenticationUser> registerUsers() {
        List<AuthenticationUser> registered = new ArrayList<>();
        registered.add(new AuthenticationUser(NESS, NESS, "admin"));
        graph.broker().userList().forEach(declared ->
                registered.add(new AuthenticationUser(declared.name(), declared.password(), "users")));
        return registered;
    }

    /**
     * @return the internal interceptor map itself (not a copy), so callers see and can make changes
     */
    public Map<String, VirtualDestinationInterceptor> pipes() {
        return pipes;
    }

    /**
     * Asks the broker service for its underlying broker.
     *
     * @return the broker, or {@code null} when obtaining it throws (the exception is logged)
     */
    private Broker jmsBroker() {
        try {
            return service.getBroker();
        } catch (Exception failure) {
            Logger.error(failure.getMessage(), failure);
            return null;
        }
    }

    /**
     * Creates one inbound bridge per topic name, using the same name for the local topic.
     *
     * @param inboundTopics topic names to bridge
     * @return bridges in the same order as {@code inboundTopics}
     */
    private InboundTopicBridge[] toInboundBridges(List<String> inboundTopics) {
        InboundTopicBridge[] bridges = new InboundTopicBridge[inboundTopics.size()];
        int index = 0;
        for (String topic : inboundTopics) {
            InboundTopicBridge bridge = new InboundTopicBridge(topic);
            bridge.setLocalTopicName(topic);
            bridges[index++] = bridge;
        }
        return bridges;
    }

    /**
     * Registers a JMS topic connector that bridges the bridge's topics from its external bus
     * into the local "ness" broker. Exceptions are logged and not propagated.
     *
     * @param c bridge definition supplying the connector name, external bus credentials/URL and topics
     */
    private void addJmsBridge(Broker.Bridge c) {
        try {
            SimpleJmsTopicConnector connector = new SimpleJmsTopicConnector();
            connector.setName(c.name$());
            TopicConnectionFactory localFactory = new ActiveMQConnectionFactory(NESS, NESS, "vm://ness?waitForStart=1000&create=false");
            connector.setLocalTopicConnectionFactory(localFactory);
            TopicConnectionFactory outboundFactory = new ActiveMQConnectionFactory(c.externalBus().user(), c.externalBus().password(), c.externalBus().url());
            connector.setOutboundTopicConnectionFactory(outboundFactory);
            connector.setOutboundUsername(c.externalBus().user());
            connector.setOutboundPassword(c.externalBus().password());
            InboundTopicBridge[] inbound = toInboundBridges(c.topics());
            connector.setInboundTopicBridges(inbound);
            service.addJmsConnector(connector);
            Logger.info("Connector with " + c.externalBus().url() + " started");
        } catch (Exception failure) {
            Logger.error(failure.getMessage(), failure);
        }
    }

    /**
     * Installs the destination policy of the broker: for every topic, advisories for discarded
     * messages are enabled, topic prefetch is 1 and the pending message limit is 1,000,000.
     *
     * <p>The policy entry is bound to the {@code ">"} wildcard and added to the policy map.
     * Previously the entry was configured but never added, so the installed map was empty and
     * none of these settings took effect.
     */
    private void addPolicies() {
        PolicyEntry entry = new PolicyEntry();
        // ">" is the ActiveMQ wildcard for "any name, any depth"; every setting below is topic-scoped.
        entry.setTopic(">");
        entry.setAdvisoryForDiscardingMessages(true);
        entry.setTopicPrefetch(1);
        ConstantPendingMessageLimitStrategy pendingLimit = new ConstantPendingMessageLimitStrategy();
        pendingLimit.setLimit(1000000);
        entry.setPendingMessageLimitStrategy(pendingLimit);

        List<PolicyEntry> policyEntries = new ArrayList<>();
        policyEntries.add(entry);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        service.setDestinationPolicy(policyMap);
    }

    /**
     * Adds the "OWireConn" TCP transport connector, listening on all interfaces at the broker
     * port declared in the graph, with keep-alive enabled.
     *
     * @throws Exception if the URI is invalid or the connector cannot be added
     */
    private void addTCPConnector() throws Exception {
        String address = "tcp://0.0.0.0:" + graph.broker().port() + "?transport.useKeepAlive=true";
        TransportConnector tcp = new TransportConnector();
        tcp.setUri(new URI(address));
        tcp.setName("OWireConn");
        service.addConnector(tcp);
    }

    /**
     * Adds the "MQTTConn" MQTT transport connector on the graph's secondary port, listening on all
     * interfaces. A secondary port of 0 means no MQTT connector is added.
     *
     * @throws Exception if the URI is invalid or the connector cannot be added
     */
    private void addMQTTConnector() throws Exception {
        if (graph.broker().secondaryPort() != 0) {
            String address = "mqtt://0.0.0.0:" + graph.broker().secondaryPort();
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(address));
            connector.setName("MQTTConn");
            service.addConnector(connector);
        }
    }

    /**
     * Builds the KahaDB persistence adapter rooted at the broker path, with its archive under
     * "archive" and its index under "entries", bound to the current broker service.
     *
     * @return a newly configured adapter
     */
    private KahaDBPersistenceAdapter persistenceAdapter() {
        File archiveDirectory = new File(root, "archive");
        File indexDirectory = new File(root, "entries");
        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setDirectoryArchive(archiveDirectory);
        kahaDb.setIndexDirectory(indexDirectory);
        kahaDb.setDirectory(root);
        kahaDb.setBrokerName(NESS);
        kahaDb.setBrokerService(service);
        return kahaDb;
    }

    /**
     * Wraps the given text in a new ActiveMQ text message.
     *
     * @param message text payload
     * @return a new text message carrying {@code message}
     * @throws MessageNotWriteableException if the text cannot be set on the message
     */
    private ActiveMQTextMessage createMessage(String message) throws MessageNotWriteableException {
        ActiveMQTextMessage wrapped = new ActiveMQTextMessage();
        wrapped.setText(message);
        return wrapped;
    }

    /**
     * Builds the topic name of an event tank within a context: the context's qualified name and a
     * dot, followed by the event name. The prefix is omitted when the qualified name is empty.
     *
     * @param t event tank
     * @param c context supplying the qualified-name prefix
     * @return the qualified topic name
     */
    private String tankQn(Datalake.Tank.Event t, Datalake.Context c) {
        String prefix;
        if (c.qn().isEmpty()) {
            prefix = "";
        } else {
            prefix = c.qn() + ".";
        }
        return prefix + t.event().name$();
    }

    private class PipeManager {
        private final List<Broker.Pipe> pipes;
        private BrokerManager brokerManager;

        PipeManager(BrokerManager manager, List<Broker.Pipe> pipes) {
            this.brokerManager = manager;
            this.pipes = pipes;
        }

        void start() {
            for (Broker.Pipe pipe : this.pipes) {
                this.brokerManager.registerTopicConsumer(pipe.origin(), message -> this.send(pipe.destination(), MessageReader.textFrom((Message)message)));
                Logger.info((String)("Pipe " + pipe.origin() + " -> " + pipe.destination() + " established"));
            }
        }

        void stop() {
            for (Broker.Pipe pipe : this.pipes) {
                this.brokerManager.stopConsumersOf(pipe.origin());
                Logger.info((String)("Pipe " + pipe.origin() + " -> " + pipe.destination() + " established"));
            }
        }

        private void send(String destination, String message) {
            TopicProducer producer = this.brokerManager.topicProducerOf(destination);
            new Thread(() -> this.send(producer, message)).start();
        }

        private void send(TopicProducer producer, String message) {
            if (producer != null) {
                try {
                    producer.produce((Message)JmsBrokerService.this.createMessage(message));
                }
                catch (MessageNotWriteableException e) {
                    Logger.error((Throwable)e);
                }
            }
        }
    }

    final class BrokerManager
    implements io.intino.datahub.broker.BrokerManager {
        private final Map<String, JmsProducer> producers = new HashMap<String, JmsProducer>();
        private final Map<String, List<JmsConsumer>> consumers = new HashMap<String, List<JmsConsumer>>();
        private final NessGraph graph;
        private final AdvisoryManager advisoryManager;
        private Connection connection;
        private Session session;

        /**
         * @param graph           model whose datalake definition is read when the tanks are started
         * @param advisoryManager advisory manager started on each new session
         */
        BrokerManager(NessGraph graph, AdvisoryManager advisoryManager) {
            this.advisoryManager = advisoryManager;
            this.graph = graph;
        }

        /** Opens the JMS session against the local broker and then registers the tank consumers. */
        void start() {
            startNessSession();
            startTanks();
        }

        /**
         * Closes every registered consumer and producer, closes the JMS session and its connection,
         * and stops the broker service. Failures are logged and not propagated.
         *
         * <p>The connection is now closed explicitly (it used to be dropped without closing) and a
         * missing session or connection is tolerated, so calling this before {@code start()} or
         * twice no longer aborts the shutdown with a {@code NullPointerException}.
         */
        void stop() {
            try {
                Logger.info("Stopping bus");
                this.consumers.values().forEach(registered -> registered.forEach(JmsConsumer::close));
                this.consumers.clear();
                this.producers.values().forEach(JmsProducer::close);
                this.producers.clear();
                try {
                    if (this.session != null) {
                        this.session.close();
                    }
                } finally {
                    // Closed even when closing the session fails, so the connection is not leaked.
                    if (this.connection != null) {
                        this.connection.close();
                    }
                }
                JmsBrokerService.this.service.stop();
                Logger.info("bus stopped");
            } catch (Throwable e) {
                Logger.error(e);
            } finally {
                this.session = null;
                this.connection = null;
            }
        }

        /**
         * Opens a connection to the in-VM "ness" broker with the built-in credentials, creates a
         * non-transacted auto-acknowledge session, starts the advisory manager on it and starts the
         * connection. JMS failures are logged and not propagated.
         *
         * <p>When a previous connection is still referenced (this method is also used to recover
         * from a closed session), it is closed first: otherwise it would be leaked while still
         * holding the client id that the new connection sets.
         */
        private void startNessSession() {
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (JMSException e) {
                    Logger.error(e.getMessage(), e);
                }
                this.connection = null;
            }
            try {
                this.connection = new ActiveMQConnectionFactory("vm://ness").createConnection(JmsBrokerService.NESS, JmsBrokerService.NESS);
                this.connection.setClientID(JmsBrokerService.NESS);
                this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                this.advisoryManager.start(this.session);
                this.connection.start();
            } catch (JMSException e) {
                Logger.error(e.getMessage(), e);
            }
        }

        /**
         * Registers a topic consumer (built by a {@link TopicSaver} on the broker stage directory)
         * for every event tank of the datalake, for the process status if one is declared, and for
         * the "Session" topic. Does nothing when the graph has no datalake.
         *
         * <p>Non-contextual tanks and tanks whose context is a leaf are registered under the tank's
         * own qualified name; other contextual tanks are registered once for their context and once
         * per leaf of that context.
         */
        private void startTanks() {
            if (this.graph.datalake() == null) {
                return;
            }
            final File stage = JmsBrokerService.this.brokerStage;
            final BrokerManager manager = JmsBrokerService.this.brokerManager;
            stage.mkdirs();
            this.graph.datalake().tankList().stream()
                    .filter(Datalake.Tank::isEvent)
                    .map(Datalake.Tank::asEvent)
                    .forEach(t -> {
                        // Cast kept from the decompiled code so the element type is explicit for scale/register.
                        Datalake.Tank.Event event = (Datalake.Tank.Event) ((Object) t);
                        if (t.asTank().isContextual() && !t.asTank().asContextual().context().isLeaf().booleanValue()) {
                            Datalake.Context context = t.asTank().asContextual().context();
                            this.register(event, this.scale(event), context);
                            context.leafs().forEach(leaf -> this.register(event, this.scale(event), (Datalake.Context) ((Object) leaf)));
                        } else {
                            manager.registerTopicConsumer(t.qn(), new TopicSaver(stage, t.qn(), this.scale(event)).create());
                        }
                    });
            Datalake.ProcessStatus status = this.graph.datalake().processStatus();
            if (status != null) {
                this.registerProcessStatus(this.datalakeScale(), status);
            }
            manager.registerTopicConsumer("Session", new TopicSaver(stage, "Session", this.datalakeScale()).create());
            Logger.info("Tanks started!");
        }

        /**
         * @param t event tank
         * @return the tank's own scale when it declares one, otherwise the datalake scale
         */
        private Scale scale(Datalake.Tank.Event t) {
            if (t.scale() == null) {
                return this.datalakeScale();
            }
            return Scale.valueOf(t.scale().name());
        }

        /**
         * @return the {@link Scale} constant with the same name as the datalake's declared scale
         */
        private Scale datalakeScale() {
            String scaleName = this.graph.datalake().scale().name();
            return Scale.valueOf(scaleName);
        }

        /**
         * Registers the consumers for the process status topic(s): one per leaf of the status
         * context when a context is declared, otherwise a single un-prefixed topic.
         *
         * @param scale scale passed to each {@link TopicSaver}
         * @param ps    process status definition
         */
        private void registerProcessStatus(Scale scale, Datalake.ProcessStatus ps) {
            final File stage = JmsBrokerService.this.brokerStage;
            if (ps.context() != null) {
                ps.context().leafs().forEach(leaf -> {
                    String leafTopic = this.processStatusQn(ps, (Datalake.Context) ((Object) leaf));
                    JmsBrokerService.this.brokerManager.registerTopicConsumer(leafTopic, new TopicSaver(stage, leafTopic, scale).create());
                });
                return;
            }
            String topic = this.processStatusQn(ps, ps.context());
            JmsBrokerService.this.brokerManager.registerTopicConsumer(topic, new TopicSaver(stage, topic, scale).create());
        }

        /**
         * Registers a {@link TopicSaver}-built consumer on the topic named after the tank within
         * the given context.
         *
         * @param t     event tank
         * @param scale scale passed to the {@link TopicSaver}
         * @param c     context used to qualify the topic name
         */
        private void register(Datalake.Tank.Event t, Scale scale, Datalake.Context c) {
            String topic = JmsBrokerService.this.tankQn(t, c);
            TopicSaver saver = new TopicSaver(JmsBrokerService.this.brokerStage, topic, scale);
            JmsBrokerService.this.brokerManager.registerTopicConsumer(topic, saver.create());
        }

        /**
         * @param ps process status definition
         * @param c  context whose qualified name prefixes the topic, or {@code null} for no prefix
         * @return the process status name, preceded by the context's qualified name and a dot when
         *         a context is given
         */
        private String processStatusQn(Datalake.ProcessStatus ps, Datalake.Context c) {
            if (c == null) {
                return "" + ps.name();
            }
            return c.qn() + "." + ps.name();
        }

        /**
         * @return the current session, after trying to open a new one when there is none or the
         *         current one is closed; may still be {@code null} if opening failed
         */
        private Session nessSession() {
            boolean unusable = this.session == null || this.closedSession();
            if (unusable) {
                this.startNessSession();
            }
            return this.session;
        }

        /**
         * Creates a consumer on the given topic, attaches the callback and tracks the consumer
         * under the topic name.
         *
         * @param topic    topic name
         * @param consumer callback invoked for each received message
         * @return the new consumer, or {@code null} when a {@link JMSException} occurs (it is logged)
         */
        @Override
        public TopicConsumer registerTopicConsumer(String topic, Consumer<Message> consumer) {
            // The per-topic list is created up front, so the topic stays tracked even if creation fails below.
            List<JmsConsumer> registered = this.consumers.computeIfAbsent(topic, key -> new ArrayList<>());
            try {
                TopicConsumer created = new TopicConsumer(this.nessSession(), topic);
                created.listen(consumer);
                registered.add(created);
                return created;
            } catch (JMSException e) {
                Logger.error(e);
                return null;
            }
        }

        /**
         * Creates a consumer on the given queue, attaches the callback and tracks the consumer
         * under the queue name (in the same map used for topic consumers).
         *
         * @param topic    queue name
         * @param consumer callback invoked for each received message
         * @return the new consumer, or {@code null} when a {@link JMSException} occurs (it is logged)
         */
        @Override
        public QueueConsumer registerQueueConsumer(String topic, Consumer<Message> consumer) {
            // The per-name list is created up front, so the name stays tracked even if creation fails below.
            List<JmsConsumer> registered = this.consumers.computeIfAbsent(topic, key -> new ArrayList<>());
            try {
                QueueConsumer created = new QueueConsumer(this.nessSession(), topic);
                created.listen(consumer);
                registered.add(created);
                return created;
            } catch (JMSException e) {
                Logger.error(e);
                return null;
            }
        }

        /**
         * Closes the consumer and removes it from every tracked consumer list.
         *
         * @param consumer consumer to close and forget
         */
        @Override
        public void unregisterConsumer(TopicConsumer consumer) {
            consumer.close();
            for (List<JmsConsumer> registered : this.consumers.values()) {
                registered.remove(consumer);
            }
        }

        /**
         * Returns the cached producer for the queue, creating and caching it on first use.
         * Producers for queues and topics share one cache keyed by name only.
         *
         * @param queue queue name
         * @return the queue producer, or {@code null} when creating it raises a {@link JMSException}
         */
        @Override
        public QueueProducer queueProducerOf(String queue) {
            try {
                JmsProducer cached = this.producers.get(queue);
                if (cached == null) {
                    cached = new QueueProducer(this.nessSession(), queue);
                    this.producers.put(queue, cached);
                }
                return (QueueProducer) cached;
            } catch (JMSException e) {
                Logger.error(e.getMessage(), e);
                return null;
            }
        }

        /**
         * Returns the cached producer for the topic, creating and caching it on first use.
         * Producers for queues and topics share one cache keyed by name only.
         *
         * @param topic topic name
         * @return the topic producer, or {@code null} when creating it raises a {@link JMSException}
         */
        @Override
        public TopicProducer topicProducerOf(String topic) {
            try {
                JmsProducer cached = this.producers.get(topic);
                if (cached == null) {
                    cached = new TopicProducer(this.nessSession(), topic);
                    this.producers.put(topic, cached);
                }
                return (TopicProducer) cached;
            } catch (JMSException e) {
                Logger.error(e.getMessage(), e);
                return null;
            }
        }

        /**
         * Closes and forgets every consumer tracked under the given name; the (now empty) list
         * stays in the map. Unknown names are ignored.
         *
         * @param topic topic or queue name the consumers were registered under
         */
        void stopConsumersOf(String topic) {
            List<JmsConsumer> registered = this.consumers.get(topic);
            if (registered == null) {
                return;
            }
            registered.forEach(JmsConsumer::close);
            registered.clear();
        }

        /**
         * @return whether the current session, viewed as an {@link ActiveMQSession}, reports closed
         */
        private boolean closedSession() {
            ActiveMQSession activeSession = (ActiveMQSession) this.session;
            return activeSession.isClosed();
        }
    }
}

