package io.intino.konos.datalake.jms;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.ReflowConfiguration;
import io.intino.konos.datalake.ReflowDispatcher;
import io.intino.konos.datalake.fs.FSDatalake;
import io.intino.konos.jms.Consumer;
import io.intino.konos.jms.MessageFactory;
import io.intino.konos.jms.TopicConsumer;
import io.intino.konos.jms.TopicProducer;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/intino/konos/datalake/jms/JMSDatalake.class */
public class JMSDatalake implements Datalake {
    private static Logger logger = LoggerFactory.getLogger(JMSDatalake.class);
    private final String url;
    private final String user;
    private final String password;
    private final String clientID;
    private Session session;
    private Connection connection;
    private List<String> registeredTanks;
    private final String ADMIN_PATH = "service.ness.admin";
    private Map<String, TopicProducer> topicProducers = new HashMap();

    public JMSDatalake(String str, String str2, String str3, String str4) {
        this.url = str;
        this.user = str2;
        this.password = str3;
        this.clientID = str4;
    }

    @Override // io.intino.konos.datalake.Datalake
    public void connect(String... strArr) {
        try {
            if (this.session == null || this.session.isClosed()) {
                createConnection();
                boolean z = strArr.length > 0 && strArr[0].equalsIgnoreCase("transacted");
                this.session = this.connection.createSession(z, z ? 0 : 1);
                this.registeredTanks = registeredTanks();
            }
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    private List<String> registeredTanks() {
        return Arrays.asList(requestResponseWithTimeout(newProducer("service.ness.admin"), MessageFactory.createMessageFor("tanks"), 1000).split(";"));
    }

    public void batch(String str, int i) {
        newProducer("service.ness.admin").produce(MessageFactory.createMessageFor("batch:" + str + ":" + i));
    }

    public void endBatch(String str) {
        newProducer("service.ness.admin").produce(MessageFactory.createMessageFor("endBatch:" + str));
    }

    public Session session() {
        if (this.session == null || this.session.isClosed()) {
            connect(new String[0]);
        }
        return this.session;
    }

    @Override // io.intino.konos.datalake.Datalake
    public Datalake.ReflowSession reflow(ReflowConfiguration reflowConfiguration, ReflowDispatcher reflowDispatcher) {
        TopicProducer newProducer = newProducer(Datalake.REFLOW_PATH);
        String tryWithQuickReflow = tryWithQuickReflow(newProducer);
        if (tryWithQuickReflow == null || !new File(tryWithQuickReflow.replace("file://", "")).exists()) {
            return reflow(reflowConfiguration, reflowDispatcher, newProducer);
        }
        if (requestResponse(newProducer, (Message) Objects.requireNonNull(MessageFactory.createMessageFor("startQuickReflow"))).equalsIgnoreCase("ack")) {
            return fsReflow(reflowConfiguration, reflowDispatcher, newProducer, tryWithQuickReflow);
        }
        return null;
    }

    private Datalake.ReflowSession fsReflow(ReflowConfiguration reflowConfiguration, ReflowDispatcher reflowDispatcher, TopicProducer topicProducer, String str) {
        FSDatalake fSDatalake = new FSDatalake(str);
        reflowDispatcher.tanks().forEach(tank -> {
            fSDatalake.add(tank.name());
        });
        return fSDatalake.reflow(reflowConfiguration, reflowDispatcher, () -> {
            topicProducer.produce(MessageFactory.createMessageFor("finish"));
        });
    }

    private Datalake.ReflowSession reflow(ReflowConfiguration reflowConfiguration, final ReflowDispatcher reflowDispatcher, TopicProducer topicProducer) {
        topicProducer.produce(MessageFactory.createMessageFor(reflowConfiguration));
        waitUntilReflowSession();
        final TopicConsumer topicConsumer = new TopicConsumer(this.session, Datalake.FLOW_PATH);
        topicConsumer.listen(message -> {
            consume(reflowDispatcher, message);
        }, "consumer-flow.ness.reflow");
        return new Datalake.ReflowSession() { // from class: io.intino.konos.datalake.jms.JMSDatalake.1
            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void next() {
                TopicProducer newProducer = JMSDatalake.this.newProducer(Datalake.REFLOW_PATH);
                newProducer.produce(MessageFactory.createMessageFor("next"));
                newProducer.close();
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void finish() {
                TopicProducer newProducer = JMSDatalake.this.newProducer(Datalake.REFLOW_PATH);
                newProducer.produce(MessageFactory.createMessageFor("finish"));
                newProducer.close();
                topicConsumer.stop();
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void play() {
                TopicConsumer topicConsumer2 = topicConsumer;
                ReflowDispatcher reflowDispatcher2 = reflowDispatcher;
                topicConsumer2.listen(message2 -> {
                    JMSDatalake.this.consume(reflowDispatcher2, message2);
                });
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void pause() {
                topicConsumer.stop();
            }
        };
    }

    private String tryWithQuickReflow(TopicProducer topicProducer) {
        return requestResponse(topicProducer, MessageFactory.createMessageFor("quickReflow"));
    }

    @Override // io.intino.konos.datalake.Datalake
    public void commit() {
        try {
            this.session.commit();
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    private String requestResponse(TopicProducer topicProducer, Message message) {
        try {
            message.setJMSReplyTo(this.session.createTemporaryQueue());
            topicProducer.produce(message);
            String textFrom = Consumer.textFrom(this.session.createConsumer(message.getJMSReplyTo()).receive(1000L));
            topicProducer.close();
            return textFrom;
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
            return "";
        }
    }

    private String requestResponseWithTimeout(TopicProducer topicProducer, Message message, int i) {
        try {
            message.setJMSReplyTo(this.session.createTemporaryQueue());
            topicProducer.produce(message);
            if (this.session.getTransacted()) {
                this.session.commit();
            }
            String textFrom = Consumer.textFrom(this.session.createConsumer(message.getJMSReplyTo()).receive(i));
            topicProducer.close();
            return textFrom;
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
            return "";
        }
    }

    @Override // io.intino.konos.datalake.Datalake
    public void add(String str) {
        if (this.registeredTanks.contains(str)) {
            return;
        }
        logger.warn("Tank " + str + " is not registered in datalake");
    }

    @Override // io.intino.konos.datalake.Datalake
    public void disconnect() {
        try {
            Iterator<TopicProducer> it = this.topicProducers.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            if (this.session != null) {
                this.session.close();
                this.session = null;
            }
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    public Connection connection() {
        return this.connection;
    }

    public void closeSession() {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TopicProducer newProducer(String str) {
        if (session() == null) {
            logger.error("Session is null");
            return null;
        }
        try {
            if (!this.topicProducers.containsKey(str) || this.topicProducers.get(str).isClosed()) {
                this.topicProducers.put(str, new TopicProducer(this.session, str));
            }
            return this.topicProducers.get(str);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    private void waitUntilReflowSession() {
        try {
            disconnect();
            Thread.sleep(40000L);
            connect(new String[0]);
        } catch (InterruptedException e) {
        }
    }

    private void createConnection() throws JMSException {
        this.connection = new ActiveMQConnectionFactory(this.url).createConnection(this.user, this.password);
        if (this.clientID != null && !this.clientID.isEmpty()) {
            this.connection.setClientID(this.clientID);
        }
        this.connection.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void consume(ReflowDispatcher reflowDispatcher, Message message) {
        reflowDispatcher.dispatch(io.intino.ness.inl.Message.load(Consumer.textFrom(message)));
    }
}
