/*
 * Decompiled with CFR 0.152.
 */
package io.intino.alexandria.nessaccessor.tcp;

import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.nessaccessor.tcp.TCPEventStore;
import io.intino.alexandria.zim.ZimReader;
import io.intino.alexandria.zim.ZimStream;
import io.intino.ness.core.Blob;
import io.intino.ness.core.Datalake;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;

public class TCPDatalake
implements Datalake {
    private final Connection connection;
    private TCPEventStore tcpEventStore;

    public TCPDatalake(String uri, String username, String password, String clientId) {
        this.connection = new Connection(uri, username, password, clientId, this.onOpen(), this.onClose());
        this.tcpEventStore = new TCPEventStore(this.connection);
    }

    public Datalake.Connection connection() {
        return this.connection;
    }

    public Datalake.EventStore eventStore() {
        return this.tcpEventStore;
    }

    public Datalake.SetStore setStore() {
        return null;
    }

    public void seal() {
        this.tcpEventStore.seal();
    }

    public void push(Stream<Blob> blobs) {
        blobs.filter(b -> b.type().equals((Object)Blob.Type.event)).forEach(b -> this.tcpEventStore.put(this.read((Blob)b), b.name()));
    }

    private CallBack onOpen() {
        return () -> this.tcpEventStore.open();
    }

    private CallBack onClose() {
        return () -> this.tcpEventStore.producers().forEach(p -> {
            if (!p.isClosed()) {
                p.close();
            }
        });
    }

    private ZimStream read(Blob b) {
        try {
            return new ZimReader((InputStream)new GZIPInputStream(b.inputStream()));
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
            return new ZimReader((InputStream)new ByteArrayInputStream(new byte[0]));
        }
    }

    public static class Connection
    implements Datalake.Connection {
        private final String uri;
        private final String username;
        private final String password;
        private final String clientId;
        private final CallBack onOpen;
        private final CallBack onClose;
        private Session session;

        Connection(String uri, String username, String password, String clientId, CallBack onOpen, CallBack onClose) {
            this.uri = uri;
            this.username = username;
            this.password = password;
            this.clientId = clientId;
            this.onOpen = onOpen;
            this.onClose = onClose;
        }

        public void connect(String ... args) {
            this.session = this.createSession(args.length > 0 ? args[0] : "");
            this.onOpen.execute();
        }

        private Session createSession(String arg) {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.uri);
                javax.jms.Connection connection = connectionFactory.createConnection(this.username, this.password);
                if (this.clientId != null) {
                    connection.setClientID(this.clientId);
                }
                connection.start();
                return connection.createSession("Transacted".equals(arg), 1);
            }
            catch (JMSException e) {
                Logger.error((Throwable)e);
                return null;
            }
        }

        public Session session() {
            return this.session;
        }

        public void disconnect() {
            if (this.session != null && !((ActiveMQSession)this.session).isClosed()) {
                try {
                    this.onClose.execute();
                    this.session.close();
                }
                catch (JMSException e) {
                    Logger.error((Throwable)e);
                }
            }
        }
    }

    private static interface CallBack {
        public void execute();
    }
}

