package io.intino.alexandria.terminal;

import io.intino.alexandria.event.Event;
import io.intino.alexandria.jms.BusConnector;
import io.intino.alexandria.jms.ConnectionListener;
import io.intino.alexandria.jms.JmsConsumer;
import io.intino.alexandria.jms.JmsProducer;
import io.intino.alexandria.jms.MessageWriter;
import io.intino.alexandria.jms.QueueConsumer;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.jms.TopicConsumer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.message.Message;
import io.intino.alexandria.message.MessageReader;
import io.intino.alexandria.terminal.Connector;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQDestination;

/* loaded from: input_file:io/intino/alexandria/terminal/JmsConnector.class */
public class JmsConnector implements Connector {
    // Producers keyed by destination path; created on first send for that path.
    private final Map<String, JmsProducer> producers;
    // JMS consumers keyed by destination path (topic or queue consumers).
    private final Map<String, JmsConsumer> consumers;
    // Locally registered event listeners per path; also invoked directly by sendEvent.
    private final Map<String, List<Consumer<Event>>> eventConsumers;
    // Locally registered message listeners per path.
    private final Map<String, List<Connector.MessageConsumer>> messageConsumers;
    // Maps an event listener to the hashCode of the JMS-level wrapper that was attached for it.
    private final Map<Consumer<Event>, Integer> jmsEventConsumers;
    // Maps a message listener to the hashCode of the JMS-level wrapper that was attached for it.
    private final Map<Connector.MessageConsumer, Integer> jmsMessageConsumers;
    private final String brokerUrl;
    private final String user;
    private final String password;
    // Optional JMS client id; applied to the connection only when non-empty.
    private final String clientId;
    private final boolean transactedSession;
    // True while the connector considers the broker reachable; sends are refused otherwise.
    private final AtomicBoolean connected;
    // Set once start() has run past its connection attempt; gates recoverConsumers().
    private final AtomicBoolean started;
    // Outboxes are only created when a directory is passed to the constructor; may be null.
    private EventOutBox eventOutBox;
    private MessageOutBox messageOutBox;
    private Connection connection;
    private Session session;
    // Lazily created in start(); periodically runs checkConnection().
    private ScheduledExecutorService scheduler;
    // Single-threaded executor on which events are handed to the broker.
    private final ExecutorService eventDispatcher;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/intino/alexandria/terminal/JmsConnector$MessageDeserializer.class */
    /** Static helper that turns a JMS message into an Alexandria {@link Message}. */
    public static class MessageDeserializer {
        private MessageDeserializer() {
        }

        /**
         * Extracts the text carried by the JMS message and parses the first Alexandria message from it.
         *
         * @param message the JMS message to read
         * @return the first message produced by the reader
         */
        static Message deserialize(javax.jms.Message message) {
            String text = io.intino.alexandria.jms.MessageReader.textFrom(message);
            MessageReader reader = new MessageReader(text);
            return reader.next();
        }
    }

    /* loaded from: input_file:io/intino/alexandria/terminal/JmsConnector$NamedThreadFactory.class */
    /**
     * Thread factory producing daemon threads named after a fixed prefix.
     * The first thread is named exactly {@code prefix}; later ones get {@code prefix-N} with N starting at 2.
     */
    public static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger sequence = new AtomicInteger(1);
        private final String prefix;

        /**
         * @param str prefix used for the names of the created threads
         */
        public NamedThreadFactory(String str) {
            prefix = str;
        }

        /**
         * Creates a daemon thread for the given task.
         *
         * @param runnable task the new thread will run
         * @return the new, not yet started, daemon thread
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread worker = new Thread(runnable);
            int index = sequence.getAndIncrement();
            String suffix = index > 1 ? "-" + index : "";
            worker.setName(prefix + suffix);
            worker.setDaemon(true);
            return worker;
        }
    }

    /**
     * Creates a connector with a non-transacted session.
     *
     * @param str  broker URL
     * @param str2 user
     * @param str3 password
     * @param str4 JMS client id (may be null or empty)
     * @param file directory for the outboxes, or null to disable them
     */
    public JmsConnector(String str, String str2, String str3, String str4, File file) {
        this(str, str2, str3, str4, false, file);
    }

    /**
     * Creates a connector. No connection is opened here; see {@link #start()}.
     *
     * @param str  broker URL
     * @param str2 user
     * @param str3 password
     * @param str4 JMS client id (may be null or empty)
     * @param z    whether the JMS session is transacted
     * @param file directory under which the "events" and "requests" outboxes are created, or null for none
     */
    public JmsConnector(String str, String str2, String str3, String str4, boolean z, File file) {
        brokerUrl = str;
        user = str2;
        password = str3;
        clientId = str4;
        transactedSession = z;

        connected = new AtomicBoolean(false);
        started = new AtomicBoolean(false);

        producers = new HashMap<>();
        consumers = new HashMap<>();
        jmsEventConsumers = new HashMap<>();
        jmsMessageConsumers = new HashMap<>();
        eventConsumers = new HashMap<>();
        messageConsumers = new HashMap<>();

        // Outboxes are optional: without a directory, undelivered items are simply not stored.
        if (file != null) {
            eventOutBox = new EventOutBox(new File(file, "events"));
            messageOutBox = new MessageOutBox(new File(file, "requests"));
        }
        eventDispatcher = Executors.newSingleThreadExecutor(new NamedThreadFactory("jms-connector"));
    }

    /**
     * Attempts to connect to the broker and schedules the periodic connection check.
     * Does nothing but log a warning when the broker URL is null or empty.
     */
    public void start() {
        boolean missingUrl = brokerUrl == null || brokerUrl.isEmpty();
        if (missingUrl) {
            Logger.warn("Invalid broker URL. Connection aborted");
            return;
        }
        try {
            connect();
        } catch (JMSException e) {
            Logger.error(e);
        }
        // Marked as started even when the first connection attempt failed.
        started.set(true);
        if (scheduler != null) {
            return;
        }
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::checkConnection, 15L, 10L, TimeUnit.MINUTES);
    }

    /**
     * Opens the connection and session and, if both are up, flags the connector as connected
     * and flushes the outboxes. Returns silently at the first step that does not succeed.
     *
     * @throws JMSException if the session cannot be created
     */
    private void connect() throws JMSException {
        if (!Broker.isRunning(brokerUrl)) {
            Logger.warn("Broker Unreachable. Connection aborted");
            return;
        }
        initConnection();
        boolean connectionStarted = connection != null && ((ActiveMQConnection) connection).isStarted();
        if (!connectionStarted) {
            return;
        }
        // Existing producers are closed and dropped before the new session is created.
        clearProducers();
        session = createSession(transactedSession);
        boolean sessionRunning = session != null && ((ActiveMQSession) session).isRunning();
        if (sessionRunning) {
            connected.set(true);
            recoverEventsAndMessages();
        }
    }

    /**
     * Delivers the event to the listeners registered locally for the path and then hands it
     * to the dispatcher thread for sending; if sending fails and an outbox exists, the event is stored there.
     *
     * @param str   destination path
     * @param event event to send
     */
    @Override // io.intino.alexandria.terminal.Connector
    public synchronized void sendEvent(String str, Event event) {
        // Iterate over a snapshot of the listeners registered for this path.
        List<Consumer<Event>> localListeners = new ArrayList<>(eventConsumers.getOrDefault(str, Collections.emptyList()));
        for (Consumer<Event> listener : localListeners) {
            listener.accept(event);
        }
        eventDispatcher.execute(() -> {
            boolean delivered = doSendEvent(str, event);
            if (!delivered && eventOutBox != null) {
                eventOutBox.push(str, event);
            }
        });
    }

    /**
     * Same as {@link #sendEvent(String, Event)} but forwards an extra integer to the producer.
     *
     * @param str   destination path
     * @param event event to send
     * @param i     value passed through to the producer's {@code produce} call
     */
    @Override // io.intino.alexandria.terminal.Connector
    public synchronized void sendEvent(String str, Event event, int i) {
        // Iterate over a snapshot of the listeners registered for this path.
        List<Consumer<Event>> localListeners = new ArrayList<>(eventConsumers.getOrDefault(str, Collections.emptyList()));
        for (Consumer<Event> listener : localListeners) {
            listener.accept(event);
        }
        eventDispatcher.execute(() -> {
            boolean delivered = doSendEvent(str, event, i);
            if (!delivered && eventOutBox != null) {
                eventOutBox.push(str, event);
            }
        });
    }

    /**
     * Registers an event listener for the path and, when a JMS consumer exists for it,
     * attaches a wrapper that deserializes incoming JMS messages into events.
     *
     * @param str      destination path
     * @param consumer listener to notify
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void attachListener(String str, Consumer<Event> consumer) {
        registerEventConsumer(str, consumer);
        JmsConsumer jmsConsumer = consumers.get(str);
        if (jmsConsumer != null) {
            Consumer<javax.jms.Message> bridge = jmsMessage -> consumer.accept(new Event(MessageDeserializer.deserialize(jmsMessage)));
            // The wrapper's hashCode is remembered so the wrapper can be located when detaching.
            jmsEventConsumers.put(consumer, bridge.hashCode());
            jmsConsumer.listen(bridge);
        }
    }

    /**
     * Flushes the outboxes, then sends the message; if sending fails and a message outbox exists,
     * the message is stored there.
     *
     * @param str  destination path
     * @param str2 message text
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void sendMessage(String str, String str2) {
        recoverEventsAndMessages();
        boolean sent = doSendMessage(str, str2);
        if (!sent && messageOutBox != null) {
            messageOutBox.push(str, str2);
        }
    }

    /**
     * Registers an event listener for the path and, when a topic consumer exists for it,
     * attaches a deserializing wrapper, passing {@code str2} along to the topic consumer's {@code listen} call.
     *
     * @param str      destination path
     * @param str2     second argument forwarded to {@code TopicConsumer.listen}
     * @param consumer listener to notify
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void attachListener(String str, String str2, Consumer<Event> consumer) {
        registerEventConsumer(str, consumer);
        TopicConsumer topic = (TopicConsumer) consumers.get(str);
        if (topic != null) {
            Consumer<javax.jms.Message> bridge = jmsMessage -> consumer.accept(new Event(MessageDeserializer.deserialize(jmsMessage)));
            // The wrapper's hashCode is remembered so the wrapper can be located when detaching.
            jmsEventConsumers.put(consumer, bridge.hashCode());
            topic.listen(bridge, str2);
        }
    }

    /**
     * Detaches an event listener from every path.
     * <p>
     * The listener is always removed from the local registry, even when no JMS-level wrapper was
     * ever attached for it (for example because it was registered while no JMS consumer existed).
     * Previously such a listener stayed registered and kept receiving locally dispatched events.
     * The hash bookkeeping entry is also dropped so the listener is no longer retained by this connector.
     *
     * @param consumer listener to detach
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void detachListeners(Consumer<Event> consumer) {
        Integer wrapperHash = jmsEventConsumers.remove(consumer);
        if (wrapperHash != null) {
            for (JmsConsumer jmsConsumer : consumers.values()) {
                // The consumers map can hold null when creating the JMS consumer failed.
                if (jmsConsumer == null) {
                    continue;
                }
                // Iterate over a copy: removeListener may modify the underlying listener list.
                for (Consumer<javax.jms.Message> listener : new ArrayList<>(jmsConsumer.listeners())) {
                    if (listener.hashCode() == wrapperHash.intValue()) {
                        jmsConsumer.removeListener(listener);
                    }
                }
            }
        }
        eventConsumers.values().forEach(registered -> registered.remove(consumer));
    }

    /**
     * Registers a message listener for the path and, when a JMS consumer exists for it,
     * attaches a wrapper that passes the message text and the reply destination name to the listener.
     *
     * @param str             destination path
     * @param messageConsumer listener to notify
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void attachListener(String str, Connector.MessageConsumer messageConsumer) {
        registerMessageConsumer(str, messageConsumer);
        JmsConsumer jmsConsumer = consumers.get(str);
        if (jmsConsumer != null) {
            Consumer<javax.jms.Message> bridge = jmsMessage -> messageConsumer.accept(io.intino.alexandria.jms.MessageReader.textFrom(jmsMessage), callback(jmsMessage));
            // The wrapper's hashCode is remembered so the wrapper can be located when detaching.
            jmsMessageConsumers.put(messageConsumer, bridge.hashCode());
            jmsConsumer.listen(bridge);
        }
    }

    /**
     * Detaches a message listener from every path.
     * <p>
     * The listener is always removed from the local registry, even when no JMS-level wrapper was
     * ever attached for it (for example because it was registered while no JMS consumer existed).
     * Previously such a listener was left registered. The hash bookkeeping entry is also dropped
     * so the listener is no longer retained by this connector.
     *
     * @param messageConsumer listener to detach
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void detachListeners(Connector.MessageConsumer messageConsumer) {
        Integer wrapperHash = jmsMessageConsumers.remove(messageConsumer);
        if (wrapperHash != null) {
            for (JmsConsumer jmsConsumer : consumers.values()) {
                // The consumers map can hold null when creating the JMS consumer failed.
                if (jmsConsumer == null) {
                    continue;
                }
                // Iterate over a copy: removeListener may modify the underlying listener list.
                for (Consumer<javax.jms.Message> listener : new ArrayList<>(jmsConsumer.listeners())) {
                    if (listener.hashCode() == wrapperHash.intValue()) {
                        jmsConsumer.removeListener(listener);
                    }
                }
            }
        }
        messageConsumers.values().forEach(registered -> registered.remove(messageConsumer));
    }

    /**
     * Closes and forgets the JMS consumer of a path and clears the listeners registered for it.
     * <p>
     * A path normally has listeners of only one kind (events or messages), so each listener list
     * is cleared only if it exists; the stored consumer is also null-checked because the map can
     * hold null when consumer creation failed. Does nothing when no JMS consumer is known for the path.
     *
     * @param str destination path
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void detachListeners(String str) {
        if (!consumers.containsKey(str)) {
            return;
        }
        JmsConsumer jmsConsumer = consumers.remove(str);
        if (jmsConsumer != null) {
            jmsConsumer.close();
        }
        List<Consumer<Event>> events = eventConsumers.get(str);
        if (events != null) {
            events.clear();
        }
        List<Connector.MessageConsumer> messages = messageConsumers.get(str);
        if (messages != null) {
            messages.clear();
        }
    }

    /**
     * Sends a request to a queue and delivers the text of the reply to the given callback.
     * A temporary queue is created as reply destination; its consumer is closed once a reply has been handled.
     * <p>
     * The producer is closed in a {@code finally} block so it is released even when building or
     * sending the request fails, and the session is read once so that a concurrent {@link #stop()}
     * cannot null it out halfway through.
     *
     * @param str      destination queue
     * @param str2     request text
     * @param consumer callback receiving the reply text
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void requestResponse(String str, String str2, Consumer<String> consumer) {
        Session current = session;
        if (current == null) {
            Logger.error("Session is null");
            return;
        }
        JmsProducer producer = null;
        try {
            producer = new QueueProducer(current, str);
            TemporaryQueue replyQueue = current.createTemporaryQueue();
            MessageConsumer replyConsumer = current.createConsumer(replyQueue);
            replyConsumer.setMessageListener(reply -> acceptMessage(consumer, replyConsumer, (TextMessage) reply));
            TextMessage request = current.createTextMessage();
            request.setText(str2);
            request.setJMSReplyTo(replyQueue);
            request.setJMSCorrelationID(createRandomString());
            sendMessage(producer, request, 100);
        } catch (JMSException e) {
            Logger.error(e);
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /**
     * Sends a request whose reply is expected on the named queue. When sending fails, the request
     * is stored in the message outbox if one exists.
     * <p>
     * The outbox is only created when the connector is built with a directory, so it is null-checked here,
     * as {@link #sendMessage(String, String)} does.
     *
     * @param str  destination queue
     * @param str2 request text
     * @param str3 name of the queue set as reply destination
     */
    @Override // io.intino.alexandria.terminal.Connector
    public void requestResponse(String str, String str2, String str3) {
        if (doSendMessage(str, str2, str3) || messageOutBox == null) {
            return;
        }
        // NOTE(review): only path and text are stored; the reply queue name is not kept in the outbox.
        messageOutBox.push(str, str2);
    }

    /**
     * @return the current JMS connection, or null when none is held
     */
    public Connection connection() {
        return connection;
    }

    /**
     * @return the current JMS session, or null when none is held
     */
    public Session session() {
        return session;
    }

    /**
     * Closes all consumers and producers, then the session and the connection, and forgets them.
     * Any exception raised on the way is logged and aborts the remaining steps.
     */
    public void stop() {
        try {
            clearConsumers();
            clearProducers();
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
            session = null;
            connection = null;
        } catch (Exception e) {
            Logger.error(e);
        }
    }

    /**
     * Creates a session on the current connection.
     *
     * @param z true for a transacted session, false for an auto-acknowledged one
     * @return the new session
     * @throws JMSException if the connection fails to create the session
     */
    private Session createSession(boolean z) throws JMSException {
        int acknowledgeMode = z ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;
        return connection.createSession(z, acknowledgeMode);
    }

    /**
     * Adds the listener to the local registry of the path and, when a session exists and the path
     * has no JMS consumer yet, creates a topic consumer for it.
     *
     * @param str      destination path
     * @param consumer listener to register
     */
    private void registerEventConsumer(String str, Consumer<Event> consumer) {
        eventConsumers.computeIfAbsent(str, path -> new CopyOnWriteArrayList<>()).add(consumer);
        boolean needsJmsConsumer = session != null && !consumers.containsKey(str);
        if (needsJmsConsumer) {
            consumers.put(str, topicConsumer(str));
        }
    }

    /**
     * Adds the listener to the local registry of the path and, when a session exists and the path
     * has no JMS consumer yet, creates a queue consumer for it.
     *
     * @param str             destination path
     * @param messageConsumer listener to register
     */
    private void registerMessageConsumer(String str, Connector.MessageConsumer messageConsumer) {
        messageConsumers.computeIfAbsent(str, path -> new CopyOnWriteArrayList<>()).add(messageConsumer);
        boolean needsJmsConsumer = session != null && !consumers.containsKey(str);
        if (needsJmsConsumer) {
            consumers.put(str, queueConsumer(str));
        }
    }

    /**
     * Sends the event with 0 as the extra producer argument.
     *
     * @return true when the producer reported success
     */
    private boolean doSendEvent(String str, Event event) {
        return doSendEvent(str, event, 0);
    }

    /**
     * Serializes the event and publishes it through the topic producer of the path, creating the producer if needed.
     *
     * @param str   destination path
     * @param event event to send
     * @param i     value passed through to the producer's {@code produce} call
     * @return true when the producer reported success; false when not connected or on error
     */
    private boolean doSendEvent(String str, Event event, int i) {
        if (cannotSendMessage()) {
            return false;
        }
        boolean sent;
        try {
            topicProducer(str);
            JmsProducer producer = producers.get(str);
            sent = sendMessage(producer, serialize(event), i);
        } catch (IOException | JMSException e) {
            Logger.error(e);
            sent = false;
        }
        return sent;
    }

    /**
     * Sends a text message through the queue producer of the path, creating the producer if needed.
     *
     * @param str  destination path
     * @param str2 message text
     * @return true when the producer reported success; false when not connected or on error
     */
    private boolean doSendMessage(String str, String str2) {
        if (cannotSendMessage()) {
            return false;
        }
        boolean sent;
        try {
            queueProducer(str);
            JmsProducer producer = producers.get(str);
            sent = sendMessage(producer, serialize(str2));
        } catch (IOException | JMSException e) {
            Logger.error(e);
            sent = false;
        }
        return sent;
    }

    /**
     * Sends a text message carrying a reply destination through the queue producer of the path.
     *
     * @param str  destination path
     * @param str2 message text
     * @param str3 name of the queue set as reply destination
     * @return true when the producer reported success; false when not connected or on error
     */
    private boolean doSendMessage(String str, String str2, String str3) {
        if (cannotSendMessage()) {
            return false;
        }
        boolean sent;
        try {
            queueProducer(str);
            JmsProducer producer = producers.get(str);
            sent = sendMessage(producer, serialize(str2, str3));
        } catch (IOException | JMSException e) {
            Logger.error(e);
            sent = false;
        }
        return sent;
    }

    /**
     * Ensures a producer is registered for the path, creating a topic producer when none exists.
     *
     * @param str destination path
     * @throws JMSException if the producer cannot be created
     */
    private void topicProducer(String str) throws JMSException {
        if (!producers.containsKey(str)) {
            producers.put(str, new TopicProducer(session, str));
        }
    }

    /**
     * Ensures a producer is registered for the path, creating a queue producer when none exists.
     *
     * @param str destination path
     * @throws JMSException if the producer cannot be created
     */
    private void queueProducer(String str) throws JMSException {
        if (!producers.containsKey(str)) {
            producers.put(str, new QueueProducer(session, str));
        }
    }

    /**
     * @return true when there is no session or the connector is flagged as disconnected
     */
    private boolean cannotSendMessage() {
        boolean ready = session != null && connected.get();
        return !ready;
    }

    /**
     * Sends a message with 0 as the extra producer argument.
     *
     * @return true when the producer reported success within the time limit
     */
    private boolean sendMessage(JmsProducer jmsProducer, javax.jms.Message message) {
        return sendMessage(jmsProducer, message, 0);
    }

    /**
     * Runs {@code jmsProducer.produce(message, i)} on a helper thread and waits at most one second for it.
     * <p>
     * The outcome is published through an {@link AtomicBoolean} so the caller reliably sees it, the
     * helper thread is interrupted on every exit path, and an interruption of the calling thread is
     * restored on its interrupt flag instead of being swallowed.
     *
     * @param jmsProducer producer used to send
     * @param message     JMS message to send
     * @param i           value passed through to {@code produce}
     * @return true only when {@code produce} returned true before the wait ended
     */
    private boolean sendMessage(JmsProducer jmsProducer, javax.jms.Message message, int i) {
        AtomicBoolean produced = new AtomicBoolean(false);
        Thread worker = new Thread(() -> produced.set(jmsProducer.produce(message, i)));
        worker.start();
        try {
            worker.join(1000L);
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            // No effect when the worker already finished; otherwise asks it to give up.
            worker.interrupt();
        }
        return produced.get();
    }

    /**
     * Passes the reply text to the callback and then closes the consumer that received it.
     *
     * @param consumer        callback receiving the reply text
     * @param messageConsumer JMS consumer to close afterwards
     * @param textMessage     reply message
     */
    private void acceptMessage(Consumer<String> consumer, MessageConsumer messageConsumer, TextMessage textMessage) {
        try {
            String body = textMessage.getText();
            consumer.accept(body);
            messageConsumer.close();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    /**
     * Builds the listener that keeps the {@code connected} flag in step with the transport state
     * and re-creates consumers when the transport resumes.
     *
     * @return a new connection listener bound to this connector
     */
    private ConnectionListener connectionListener() {
        ConnectionListener listener = new ConnectionListener() { // from class: io.intino.alexandria.terminal.JmsConnector.1
            @Override // org.apache.activemq.transport.TransportListener
            public void transportInterupted() {
                Logger.warn("Connection with Data Hub interrupted!");
                JmsConnector.this.connected.set(false);
            }

            @Override // org.apache.activemq.transport.TransportListener
            public void transportResumed() {
                Logger.info("Connection with Data Hub established!");
                JmsConnector.this.connected.set(true);
                JmsConnector.this.recoverConsumers();
            }
        };
        return listener;
    }

    /** Closes every registered producer and empties the producer map. */
    private void clearProducers() {
        for (JmsProducer producer : producers.values()) {
            producer.close();
        }
        producers.clear();
    }

    /** Closes every registered consumer and empties the consumer map. */
    private void clearConsumers() {
        for (JmsConsumer consumer : consumers.values()) {
            consumer.close();
        }
        consumers.clear();
    }

    /**
     * Creates a topic consumer on the current session.
     *
     * @param str topic path
     * @return the consumer, or null when creation failed (the error is logged)
     */
    private TopicConsumer topicConsumer(String str) {
        TopicConsumer created = null;
        try {
            created = new TopicConsumer(session, str);
        } catch (JMSException e) {
            Logger.error(e);
        }
        return created;
    }

    /**
     * Creates a queue consumer on the current session.
     *
     * @param str queue path
     * @return the consumer, or null when creation failed (the error is logged)
     */
    private QueueConsumer queueConsumer(String str) {
        QueueConsumer created = null;
        try {
            created = new QueueConsumer(session, str);
        } catch (JMSException e) {
            Logger.error(e);
        }
        return created;
    }

    /**
     * Re-creates JMS consumers for the registered listeners; invoked when the transport resumes.
     * Does nothing until {@link #start()} has flagged the connector as started, and only acts
     * while the consumer map is empty.
     */
    private void recoverConsumers() {
        if (this.started.get()) {
            // Event paths: a topic consumer is created per path, but no listener wrapper is attached here.
            // NOTE(review): registered event listeners do not appear to be re-attached to the new consumers - confirm.
            if (!this.eventConsumers.isEmpty() && this.consumers.isEmpty()) {
                for (String str : this.eventConsumers.keySet()) {
                    this.consumers.put(str, topicConsumer(str));
                }
            }
            // NOTE(review): if the loop above added anything, consumers is no longer empty and the
            // message paths below are skipped - verify this is intended.
            if (this.messageConsumers.isEmpty() || !this.consumers.isEmpty()) {
                return;
            }
            // Message paths: create a queue consumer when possible and re-attach every registered listener.
            for (String str2 : this.messageConsumers.keySet()) {
                if (!this.consumers.containsKey(str2) && this.session != null) {
                    this.consumers.put(str2, queueConsumer(str2));
                }
                for (Connector.MessageConsumer messageConsumer : this.messageConsumers.get(str2)) {
                    Consumer<javax.jms.Message> consumer = message -> {
                        messageConsumer.accept(io.intino.alexandria.jms.MessageReader.textFrom(message), callback(message));
                    };
                    // Remember the wrapper's hashCode so detachListeners can find it later.
                    this.jmsMessageConsumers.put(messageConsumer, Integer.valueOf(consumer.hashCode()));
                    // NOTE(review): get(str2) is null when the session is null or queueConsumer failed - would throw here.
                    this.consumers.get(str2).listen(consumer);
                }
            }
        }
    }

    /** Tries to deliver everything stored in the event outbox and then in the message outbox. */
    private synchronized void recoverEventsAndMessages() {
        recoverEvents();
        recoverMessages();
    }

    /**
     * Sends the events stored in the outbox in order, removing each one after a successful send
     * and stopping at the first failure. No-op when there is no event outbox.
     */
    private void recoverEvents() {
        if (eventOutBox == null) {
            return;
        }
        synchronized (eventOutBox) {
            while (!eventOutBox.isEmpty()) {
                Map.Entry<String, Event> pending = eventOutBox.get();
                // NOTE(review): a null entry is retried without popping; presumably get() makes progress on its own - confirm.
                if (pending == null) {
                    continue;
                }
                if (!doSendEvent(pending.getKey(), pending.getValue())) {
                    break;
                }
                eventOutBox.pop();
            }
        }
    }

    /**
     * Sends the messages stored in the outbox in order, removing each one after a successful send
     * and stopping at the first failure. No-op when there is no message outbox.
     */
    private void recoverMessages() {
        if (messageOutBox == null) {
            return;
        }
        synchronized (messageOutBox) {
            while (!messageOutBox.isEmpty()) {
                Map.Entry<String, String> pending = messageOutBox.get();
                // NOTE(review): a null entry is retried without popping; presumably get() makes progress on its own - confirm.
                if (pending == null) {
                    continue;
                }
                if (!doSendMessage(pending.getKey(), pending.getValue())) {
                    break;
                }
                messageOutBox.pop();
            }
        }
    }

    /**
     * Periodic health check run by the scheduler created in {@link #start()}.
     * <ul>
     * <li>With a failover URL, an existing session and a disconnected flag, it only logs and waits.</li>
     * <li>When connection and session are both up, it flags the connector as connected.</li>
     * <li>Otherwise it stops and reconnects.</li>
     * </ul>
     * A failure while reconnecting is now logged instead of being silently discarded.
     */
    private void checkConnection() {
        if (session != null && brokerUrl.startsWith("failover") && !connected.get()) {
            Logger.debug("Data-hub currently disconnected. Waiting for reconnection...");
            return;
        }
        boolean connectionStarted = connection != null && ((ActiveMQConnection) connection).isStarted();
        boolean sessionRunning = session != null && ((ActiveMQSession) session).isRunning();
        if (connectionStarted && sessionRunning) {
            connected.set(true);
            return;
        }
        Logger.debug("Restarting data-hub connection...");
        stop();
        try {
            connect();
        } catch (JMSException e) {
            Logger.error(e);
        }
        // NOTE(review): the flag is raised even when connect() did not succeed; kept as-is because the
        // first branch above would otherwise stop retrying for failover URLs - confirm intent.
        connected.set(true);
    }

    /**
     * Creates the JMS connection, applies the client id when one is configured, and starts it.
     * JMS errors are logged, not propagated.
     */
    private void initConnection() {
        try {
            connection = BusConnector.createConnection(brokerUrl, user, password, connectionListener());
            if (connection == null) {
                return;
            }
            boolean hasClientId = clientId != null && !clientId.isEmpty();
            if (hasClientId) {
                connection.setClientID(clientId);
            }
            connection.start();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    /**
     * Resolves the physical name of the message's reply destination.
     *
     * @param message incoming JMS message
     * @return the reply destination's physical name, or null when there is none or it cannot be read
     */
    private String callback(javax.jms.Message message) {
        try {
            ActiveMQDestination replyTo = (ActiveMQDestination) message.getJMSReplyTo();
            return replyTo != null ? replyTo.getPhysicalName() : null;
        } catch (JMSException e) {
            return null;
        }
    }

    /**
     * Builds a JMS message from the text, with a reply queue and a random correlation id.
     *
     * @param str  message text
     * @param str2 name of the queue to set as reply destination
     * @return the prepared JMS message
     */
    private javax.jms.Message serialize(String str, String str2) throws IOException, JMSException {
        javax.jms.Message request = MessageWriter.write(str);
        request.setJMSReplyTo(session.createQueue(str2));
        request.setJMSCorrelationID(createRandomString());
        return request;
    }

    /**
     * Builds a JMS message from the text.
     *
     * @param str message text
     * @return the JMS message produced by {@code MessageWriter.write}
     */
    private static javax.jms.Message serialize(String str) throws IOException, JMSException {
        javax.jms.Message written = MessageWriter.write(str);
        return written;
    }

    /**
     * Generates a random hexadecimal string, used as JMS correlation id.
     * <p>
     * Uses the thread-local generator rather than a new {@code Random} seeded with the current
     * millisecond, which returned the same value for every call made within one millisecond.
     *
     * @return hexadecimal representation of a random long
     */
    private static String createRandomString() {
        return Long.toHexString(java.util.concurrent.ThreadLocalRandom.current().nextLong());
    }

    /**
     * Writes the event's message form into an in-memory buffer and wraps the resulting text in a JMS message.
     *
     * @param event event to serialize
     * @return the JMS message carrying the serialized event
     */
    private static javax.jms.Message serialize(Event event) throws IOException, JMSException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        io.intino.alexandria.message.MessageWriter writer = new io.intino.alexandria.message.MessageWriter(buffer);
        writer.write(event.toMessage());
        writer.close();
        // NOTE(review): toString() decodes with the platform default charset - confirm it matches the writer's encoding.
        String text = buffer.toString();
        return serialize(text);
    }
}
