/*
 * Decompiled with CFR 0.152.
 */
package io.intino.konos.datalake.jms;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.ReflowConfiguration;
import io.intino.konos.datalake.ReflowDispatcher;
import io.intino.konos.datalake.fs.FSDatalake;
import io.intino.konos.jms.Consumer;
import io.intino.konos.jms.MessageFactory;
import io.intino.konos.jms.TopicConsumer;
import io.intino.konos.jms.TopicProducer;
import io.intino.ness.inl.Message;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JMSDatalake
implements Datalake {
    private static Logger logger = LoggerFactory.getLogger(JMSDatalake.class);
    private final String url;
    private final String user;
    private final String password;
    private final String clientID;
    private Session session;
    private Connection connection;
    private Map<String, TopicProducer> topicProducers = new HashMap<String, TopicProducer>();
    private List<String> registeredTanks;

    public JMSDatalake(String url, String user, String password, String clientID) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.clientID = clientID;
    }

    @Override
    public void connect(String ... args) {
        try {
            if (this.session != null && !((ActiveMQSession)this.session).isClosed()) {
                return;
            }
            this.createConnection();
            boolean transacted = args.length > 0 && args[0].equalsIgnoreCase("transacted");
            this.session = this.connection.createSession(transacted, transacted ? 0 : 1);
            this.registeredTanks = this.registeredTanks();
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
    }

    private List<String> registeredTanks() {
        TopicProducer producer = this.newProducer("service.ness.admin");
        String tanks = this.requestResponseWithTimeout(producer, MessageFactory.createMessageFor((Object)"tanks"), 1000);
        return Arrays.asList(tanks.split(";"));
    }

    public Session session() {
        if (this.session == null || ((ActiveMQSession)this.session).isClosed()) {
            this.connect(new String[0]);
        }
        return this.session;
    }

    @Override
    public Datalake.ReflowSession reflow(ReflowConfiguration reflow, ReflowDispatcher dispatcher) {
        TopicProducer producer = this.newProducer("service.ness.reflow");
        String quickURL = this.tryWithQuickReflow(producer);
        if (quickURL != null && new File(quickURL.replace("file://", "")).exists()) {
            return this.requestResponse(producer, Objects.requireNonNull(MessageFactory.createMessageFor((Object)"startQuickReflow"))).equalsIgnoreCase("ack") ? this.fsReflow(reflow, dispatcher, producer, quickURL) : null;
        }
        return this.reflow(reflow, dispatcher, producer);
    }

    private Datalake.ReflowSession fsReflow(ReflowConfiguration reflow, ReflowDispatcher dispatcher, TopicProducer producer, String quickURL) {
        FSDatalake fsDatalake = new FSDatalake(quickURL);
        dispatcher.tanks().forEach(t -> fsDatalake.add(t.name()));
        return fsDatalake.reflow(reflow, dispatcher, () -> producer.produce(MessageFactory.createMessageFor((Object)"finish")));
    }

    private Datalake.ReflowSession reflow(ReflowConfiguration reflow, final ReflowDispatcher dispatcher, TopicProducer producer) {
        producer.produce(MessageFactory.createMessageFor((Object)reflow));
        this.waitUntilReflowSession();
        final TopicConsumer topicConsumer = new TopicConsumer(this.session, "flow.ness.reflow");
        topicConsumer.listen(m -> this.consume(dispatcher, m), "consumer-flow.ness.reflow");
        return new Datalake.ReflowSession(){

            @Override
            public void next() {
                TopicProducer topicProducer = JMSDatalake.this.newProducer("service.ness.reflow");
                topicProducer.produce(MessageFactory.createMessageFor((Object)"next"));
                topicProducer.close();
            }

            @Override
            public void finish() {
                TopicProducer topicProducer = JMSDatalake.this.newProducer("service.ness.reflow");
                topicProducer.produce(MessageFactory.createMessageFor((Object)"finish"));
                topicProducer.close();
                topicConsumer.stop();
            }

            @Override
            public void play() {
                topicConsumer.listen(m -> JMSDatalake.this.consume(dispatcher, m));
            }

            @Override
            public void pause() {
                topicConsumer.stop();
            }
        };
    }

    private String tryWithQuickReflow(TopicProducer producer) {
        return this.requestResponse(producer, MessageFactory.createMessageFor((Object)"quickReflow"));
    }

    @Override
    public void commit() {
        try {
            this.session.commit();
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
    }

    private String requestResponse(TopicProducer producer, javax.jms.Message message) {
        try {
            message.setJMSReplyTo((Destination)this.session.createTemporaryQueue());
            producer.produce(message);
            String response = Consumer.textFrom((javax.jms.Message)this.session.createConsumer(message.getJMSReplyTo()).receive(1000L));
            producer.close();
            return response;
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
            return "";
        }
    }

    private String requestResponseWithTimeout(TopicProducer producer, javax.jms.Message message, int timeout) {
        try {
            message.setJMSReplyTo((Destination)this.session.createTemporaryQueue());
            producer.produce(message);
            if (this.session.getTransacted()) {
                this.session.commit();
            }
            String response = Consumer.textFrom((javax.jms.Message)this.session.createConsumer(message.getJMSReplyTo()).receive((long)timeout));
            producer.close();
            return response;
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
            return "";
        }
    }

    @Override
    public void add(String tank) {
        if (!this.registeredTanks.contains(tank)) {
            logger.warn("Tank " + tank + " is not registered in datalake");
        }
    }

    @Override
    public void disconnect() {
        try {
            for (TopicProducer topicProducer : this.topicProducers.values()) {
                topicProducer.close();
            }
            if (this.session != null) {
                this.session.close();
                this.session = null;
            }
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
    }

    public Connection connection() {
        return this.connection;
    }

    public void closeSession() {
        try {
            if (this.session != null) {
                this.session.close();
            }
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
    }

    TopicProducer newProducer(String tank) {
        if (this.session() == null) {
            logger.error("Session is null");
            return null;
        }
        try {
            if (!this.topicProducers.containsKey(tank) || this.topicProducers.get(tank).isClosed()) {
                this.topicProducers.put(tank, new TopicProducer(this.session, tank));
            }
            return this.topicProducers.get(tank);
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), (Throwable)e);
            return null;
        }
    }

    private void waitUntilReflowSession() {
        try {
            this.disconnect();
            Thread.sleep(40000L);
            this.connect(new String[0]);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void createConnection() throws JMSException {
        this.connection = new ActiveMQConnectionFactory(this.url).createConnection(this.user, this.password);
        if (this.clientID != null && !this.clientID.isEmpty()) {
            this.connection.setClientID(this.clientID);
        }
        this.connection.start();
    }

    private void consume(ReflowDispatcher dispatcher, javax.jms.Message m) {
        dispatcher.dispatch(Message.load((String)Consumer.textFrom((javax.jms.Message)m)));
    }
}

