/*
 * Decompiled with CFR 0.152.
 */
package io.intino.alexandria.nessaccessor.tcp;

import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.nessaccessor.tcp.TCPEventStore;
import io.intino.alexandria.zim.ZimReader;
import io.intino.alexandria.zim.ZimStream;
import io.intino.ness.core.Blob;
import io.intino.ness.core.Datalake;
import java.util.stream.Stream;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;

public class TcpDatalake
implements Datalake {
    private final Connection connection;
    private TCPEventStore tcpEventStore;

    public TcpDatalake(String uri, String username, String password, String clientId) {
        this.connection = new Connection(uri, username, password, clientId, this.onClose());
        this.tcpEventStore = new TCPEventStore(this.connection.session);
    }

    public Datalake.Connection connection() {
        return this.connection;
    }

    public Datalake.EventStore eventStore() {
        return this.tcpEventStore;
    }

    public Datalake.SetStore setStore() {
        return null;
    }

    public void seal() {
        this.tcpEventStore.seal();
    }

    public void push(Stream<Blob> blobs) {
        blobs.filter(b -> b.type().equals((Object)Blob.Type.event)).forEach(b -> this.tcpEventStore.put(this.read((Blob)b), b.name()));
    }

    private ZimStream read(Blob b) {
        return new ZimReader(b.inputStream());
    }

    private CallBack onClose() {
        return () -> this.tcpEventStore.producers().forEach(p -> {
            if (!p.isClosed()) {
                p.close();
            }
        });
    }

    private static interface CallBack {
        public void execute();
    }

    public static class Connection
    implements Datalake.Connection {
        private final String uri;
        private final String username;
        private final String password;
        private final String clientId;
        private Session session;
        private final CallBack onClose;

        Connection(String uri, String username, String password, String clientId, CallBack onClose) {
            this.uri = uri;
            this.username = username;
            this.password = password;
            this.clientId = clientId;
            this.onClose = onClose;
        }

        public void connect(String ... args) {
            this.session = this.createSession(args[0]);
        }

        private Session createSession(String arg) {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.uri);
                javax.jms.Connection connection = connectionFactory.createConnection(this.username, this.password);
                connection.start();
                return connection.createSession(arg != null && arg.equals("Transacted"), 1);
            }
            catch (JMSException e) {
                Logger.error((Throwable)e);
                return null;
            }
        }

        public Session session() {
            return this.session;
        }

        public void disconnect() {
            if (this.session != null && !((ActiveMQSession)this.session).isClosed()) {
                try {
                    this.onClose.execute();
                    this.session.close();
                }
                catch (JMSException e) {
                    Logger.error((Throwable)e);
                }
            }
        }
    }
}

