package io.intino.alexandria.event;

import io.intino.alexandria.event.EventHub;
import io.intino.alexandria.jms.BusConnector;
import io.intino.alexandria.jms.ConnectionListener;
import io.intino.alexandria.jms.JmsConsumer;
import io.intino.alexandria.jms.JmsProducer;
import io.intino.alexandria.jms.QueueConsumer;
import io.intino.alexandria.jms.QueueProducer;
import io.intino.alexandria.jms.TopicConsumer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.message.MessageReader;
import io.intino.alexandria.message.MessageWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

/* loaded from: input_file:io/intino/alexandria/event/JmsEventHub.class */
public class JmsEventHub implements EventHub {
    private final Map<String, JmsProducer> producers;
    private final Map<String, JmsConsumer> consumers;
    private final Map<String, List<Consumer<Event>>> eventConsumers;
    private final Map<Consumer<Message>, Integer> jmsConsumers;
    private final EventOutBox eventOutBox;
    private Connection connection;
    private Session session;
    private AtomicBoolean connected;
    private AtomicBoolean recoveringEvents;
    private ScheduledExecutorService scheduler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/intino/alexandria/event/JmsEventHub$MessageDeserializer.class */
    public static class MessageDeserializer {
        private MessageDeserializer() {
        }

        static io.intino.alexandria.message.Message deserialize(Message message) {
            return new MessageReader(io.intino.alexandria.jms.MessageReader.textFrom(message)).next();
        }
    }

    public JmsEventHub(String str, String str2, String str3, String str4, File file) {
        this(str, str2, str3, str4, false, file);
    }

    public JmsEventHub(String str, String str2, String str3, String str4, boolean z, File file) {
        this.connected = new AtomicBoolean(false);
        this.recoveringEvents = new AtomicBoolean(false);
        this.producers = new HashMap();
        this.consumers = new HashMap();
        this.jmsConsumers = new HashMap();
        this.eventConsumers = new HashMap();
        this.eventOutBox = new EventOutBox(file);
        if (str == null || str.isEmpty()) {
            Logger.warn("Broker url is null");
        } else {
            Thread currentThread = Thread.currentThread();
            new Thread(() -> {
                initConnection(str, str2, str3, str4);
                currentThread.interrupt();
            }).start();
            try {
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                }
                if (this.connection != null && this.connection.isStarted()) {
                    this.session = createSession(z);
                    if (this.session != null && this.session.isRunning()) {
                        this.connected.set(true);
                        Logger.info("Connection with Data Hub stablished!");
                    }
                }
            } catch (JMSException e2) {
                Logger.error(e2);
            }
        }
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduler.scheduleAtFixedRate(this::recoverEvents, 0L, 1L, TimeUnit.HOURS);
    }

    private void initConnection(String str, String str2, String str3, String str4) {
        try {
            this.connection = BusConnector.createConnection(str, str2, str3, connectionListener());
            if (str4 != null && !str4.isEmpty()) {
                this.connection.setClientID(str4);
            }
            this.connection.start();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    public synchronized void sendEvent(String str, Event event) {
        new ArrayList(this.eventConsumers.getOrDefault(str, Collections.emptyList())).forEach(consumer -> {
            consumer.accept(event);
        });
        new Thread(() -> {
            if (this.connected.get() && !this.eventOutBox.isEmpty() && !this.recoveringEvents.get()) {
                recoverEvents();
            }
            if (doSendEvent(str, event)) {
                return;
            }
            this.eventOutBox.push(str, event);
        }).start();
    }

    public void requestResponse(String str, String str2, Consumer<String> consumer) {
        if (this.session == null) {
            Logger.error("Session is null");
            return;
        }
        try {
            QueueProducer queueProducer = new QueueProducer(this.session, str);
            TemporaryQueue createTemporaryQueue = this.session.createTemporaryQueue();
            MessageConsumer createConsumer = this.session.createConsumer(createTemporaryQueue);
            createConsumer.setMessageListener(message -> {
                acceptMessage(consumer, createConsumer, (TextMessage) message);
            });
            TextMessage createTextMessage = this.session.createTextMessage();
            createTextMessage.setText(str2);
            createTextMessage.setJMSReplyTo(createTemporaryQueue);
            createTextMessage.setJMSCorrelationID(createRandomString());
            queueProducer.produce(createTextMessage);
            queueProducer.close();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    public void attachListener(String str, Consumer<Event> consumer) {
        registerConsumer(str, consumer);
        JmsConsumer jmsConsumer = this.consumers.get(str);
        if (jmsConsumer == null) {
            return;
        }
        Consumer<Message> consumer2 = message -> {
            consumer.accept(new Event(MessageDeserializer.deserialize(message)));
        };
        this.jmsConsumers.put(consumer2, Integer.valueOf(consumer2.hashCode()));
        jmsConsumer.listen(consumer2);
    }

    public void attachListener(String str, String str2, Consumer<Event> consumer) {
        registerConsumer(str, consumer);
        TopicConsumer topicConsumer = this.consumers.get(str);
        if (topicConsumer == null) {
            return;
        }
        Consumer<Message> consumer2 = message -> {
            consumer.accept(new Event(MessageDeserializer.deserialize(message)));
        };
        this.jmsConsumers.put(consumer2, Integer.valueOf(consumer2.hashCode()));
        topicConsumer.listen(consumer2, str2);
    }

    public void detachListeners(String str) {
        if (this.consumers.containsKey(str)) {
            this.consumers.get(str).close();
            this.consumers.remove(str);
            this.eventConsumers.get(str).clear();
        }
    }

    public void detachListeners(Consumer<Event> consumer) {
        Integer num = this.jmsConsumers.get(consumer);
        if (num == null) {
            return;
        }
        this.eventConsumers.values().forEach(list -> {
            list.remove(consumer);
        });
        for (JmsConsumer jmsConsumer : this.consumers.values()) {
            List list2 = (List) jmsConsumer.listeners().stream().filter(consumer2 -> {
                return consumer2.hashCode() == num.intValue();
            }).collect(Collectors.toList());
            Objects.requireNonNull(jmsConsumer);
            list2.forEach(jmsConsumer::removeListener);
        }
    }

    public void attachRequestListener(String str, EventHub.RequestConsumer requestConsumer) {
        if (this.session == null) {
            return;
        }
        if (!this.consumers.containsKey(str)) {
            this.consumers.put(str, queueConsumer(str));
        }
        JmsConsumer jmsConsumer = this.consumers.get(str);
        if (jmsConsumer == null) {
            return;
        }
        if (jmsConsumer instanceof QueueConsumer) {
            jmsConsumer.listen(message -> {
                new Thread(() -> {
                    try {
                        String accept = requestConsumer.accept(io.intino.alexandria.jms.MessageReader.textFrom(message));
                        if (accept == null) {
                            return;
                        }
                        TextMessage createTextMessage = this.session.createTextMessage();
                        createTextMessage.setText(accept);
                        createTextMessage.setJMSCorrelationID(message.getJMSCorrelationID());
                        QueueProducer queueProducer = new QueueProducer(this.session, message.getJMSReplyTo());
                        queueProducer.produce(createTextMessage);
                        queueProducer.close();
                    } catch (JMSException e) {
                        Logger.error(e);
                    }
                }).start();
            });
        } else {
            Logger.error("Already exists a topic and queue with this path " + str);
        }
    }

    public Connection connection() {
        return this.connection;
    }

    public Session session() {
        return this.session;
    }

    public void stop() {
        this.consumers.values().forEach((v0) -> {
            v0.close();
        });
        this.consumers.clear();
        this.producers.values().forEach((v0) -> {
            v0.close();
        });
        try {
            this.session.close();
            this.connection.close();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    private Session createSession(boolean z) throws JMSException {
        return this.connection.createSession(z, z ? 0 : 1);
    }

    private void registerConsumer(String str, Consumer<Event> consumer) {
        this.eventConsumers.putIfAbsent(str, new CopyOnWriteArrayList());
        this.eventConsumers.get(str).add(consumer);
        if (this.consumers.containsKey(str) || this.session == null) {
            return;
        }
        this.consumers.put(str, topicConsumer(str));
    }

    private boolean doSendEvent(String str, Event event) {
        if (this.session == null || !this.connected.get()) {
            return false;
        }
        try {
            if (!this.producers.containsKey(str)) {
                this.producers.put(str, new TopicProducer(this.session, str));
            }
            JmsProducer jmsProducer = this.producers.get(str);
            if (jmsProducer == null) {
                return false;
            }
            return jmsProducer.produce(serialize(event));
        } catch (JMSException | IOException e) {
            Logger.error(e);
            return false;
        }
    }

    private void acceptMessage(Consumer<String> consumer, MessageConsumer messageConsumer, TextMessage textMessage) {
        try {
            consumer.accept(textMessage.getText());
            messageConsumer.close();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    private ConnectionListener connectionListener() {
        return new ConnectionListener() { // from class: io.intino.alexandria.event.JmsEventHub.1
            public void transportInterupted() {
                JmsEventHub.this.connected.set(false);
            }

            public void transportResumed() {
                Logger.info("Connection with Data Hub resumed!");
                JmsEventHub.this.connected.set(true);
                if (JmsEventHub.this.eventConsumers.isEmpty() || !JmsEventHub.this.consumers.isEmpty()) {
                    return;
                }
                try {
                    JmsEventHub.this.session = JmsEventHub.this.createSession(false);
                    for (String str : JmsEventHub.this.eventConsumers.keySet()) {
                        JmsEventHub.this.consumers.put(str, JmsEventHub.this.topicConsumer(str));
                    }
                } catch (JMSException e) {
                    Logger.error(e);
                }
            }
        };
    }

    private TopicConsumer topicConsumer(String str) {
        try {
            return new TopicConsumer(this.session, str);
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    private QueueConsumer queueConsumer(String str) {
        try {
            return new QueueConsumer(this.session, str);
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    private void recoverEvents() {
        this.recoveringEvents.set(true);
        if (!this.eventOutBox.isEmpty()) {
            while (!this.eventOutBox.isEmpty()) {
                Map.Entry<String, Event> entry = this.eventOutBox.get();
                if (entry != null) {
                    if (!doSendEvent(entry.getKey(), entry.getValue())) {
                        break;
                    } else {
                        this.eventOutBox.pop();
                    }
                }
            }
        }
        this.recoveringEvents.set(false);
    }

    private static String createRandomString() {
        return Long.toHexString(new Random(System.currentTimeMillis()).nextLong());
    }

    private static Message serialize(Event event) throws IOException, JMSException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        MessageWriter messageWriter = new MessageWriter(byteArrayOutputStream);
        messageWriter.write(event.toMessage());
        messageWriter.close();
        return io.intino.alexandria.jms.MessageWriter.write(byteArrayOutputStream.toString());
    }
}
