/*
 * Decompiled with CFR 0.152.
 */
package io.intino.alexandria.nessaccessor.tcp;

import io.intino.alexandria.inl.Message;
import io.intino.alexandria.jms.TopicConsumer;
import io.intino.alexandria.jms.TopicProducer;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.nessaccessor.MessageTranslator;
import io.intino.alexandria.nessaccessor.tcp.AdminService;
import io.intino.alexandria.nessaccessor.tcp.TCPDatalake;
import io.intino.alexandria.nessaccessor.tcp.TCPEventTank;
import io.intino.alexandria.zim.ZimStream;
import io.intino.ness.core.Datalake;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQSession;

/**
 * {@link Datalake.EventStore} implementation that exchanges messages through the JMS
 * {@link Session} supplied by a {@link TCPDatalake.Connection}.
 *
 * <p>Messages are published on topics named {@code feed.<tank>} and {@code put.<blob>};
 * subscriptions listen on {@code flow.<tank>}. Producers and consumers are cached per
 * topic / tank name in plain {@link HashMap}s, so this class performs no synchronization
 * of its own.
 *
 * <p>{@link #open()} must be called before any operation that needs the session or the
 * admin service.
 */
public class TCPEventStore implements Datalake.EventStore {
    /** Tanks handed out by {@link #tank(String)}, keyed by tank name. */
    private final Map<String, TCPEventTank> tanks;
    /** Active consumers created by {@link #subscribe(Datalake.EventStore.Tank)}, keyed by tank name. */
    private final Map<String, TopicConsumer> consumers;
    /** Producers created lazily by {@link #producer(String)}, keyed by topic name. */
    private final Map<String, TopicProducer> producers;
    private final TCPDatalake.Connection connection;
    private AdminService adminService;
    private Session session;

    /**
     * Creates a store bound to the given connection. No session is obtained until
     * {@link #open()} is called.
     *
     * @param connection connection that will later provide the JMS session
     */
    public TCPEventStore(TCPDatalake.Connection connection) {
        this.connection = connection;
        this.tanks = new HashMap<>();
        this.consumers = new HashMap<>();
        this.producers = new HashMap<>();
    }

    /** Obtains the session from the connection and creates the admin service on top of it. */
    public void open() {
        this.session = this.connection.session();
        this.adminService = new AdminService(this.session);
    }

    /**
     * Returns the tanks that have been requested so far through {@link #tank(String)}.
     *
     * @return stream over the registered tanks
     */
    public Stream<Datalake.EventStore.Tank> tanks() {
        // The identity map only widens the element type from TCPEventTank to Tank.
        return this.tanks.values().stream().map(t -> t);
    }

    /**
     * Creates a tank with the given name, registers it and returns it.
     *
     * <p>NOTE(review): a new {@link TCPEventTank} is created on every call and replaces any
     * tank previously registered under the same name; the tank captures the admin service
     * as it is at call time (which is {@code null} before {@link #open()}).
     *
     * @param name tank name
     * @return the newly created tank
     */
    public Datalake.EventStore.Tank tank(String name) {
        TCPEventTank tank = new TCPEventTank(name, this.adminService);
        this.tanks.put(name, tank);
        return tank;
    }

    /**
     * Publishes each message on the {@code feed.<tank>} topic.
     *
     * @param tank     name of the target tank
     * @param messages messages to publish, in order
     */
    public void feed(String tank, Message... messages) {
        String topic = this.feedProbe(tank);
        for (Message message : messages) {
            this.put(message, topic);
        }
    }

    /**
     * Creates a reflow over the tanks currently registered in this store that the filter
     * allows. The tank streams are merged once, when the reflow is created; each call to
     * {@code next} then drains one block from that merged stream.
     *
     * @param filter decides which tanks take part and which of their contents are read
     * @return a reflow positioned at the beginning of the merged stream
     */
    public Datalake.EventStore.Reflow reflow(final Datalake.EventStore.Reflow.Filter filter) {
        return new Datalake.EventStore.Reflow() {
            private ZimStream is = new ZimStream.Merge(this.tankInputStreams());

            /** Content of a single tank, restricted by the filter's two-argument check. */
            ZimStream tankInputStream(Datalake.EventStore.Tank tank) {
                return tank.content(ts -> filter.allow(tank, ts));
            }

            /** Streams of every registered tank accepted by the filter. */
            private ZimStream[] tankInputStreams() {
                return TCPEventStore.this.tanks()
                        .filter(tank -> filter.allow(tank))
                        .map(this::tankInputStream)
                        .toArray(ZimStream[]::new);
            }

            /**
             * Delivers up to {@code blockSize} messages to the handlers, followed by a
             * control message.
             */
            public void next(int blockSize, Datalake.EventStore.MessageHandler... messageHandlers) {
                new ReflowBlock(this.is, messageHandlers).reflow(blockSize);
            }
        };
    }

    /**
     * Returns a subscription that, when invoked, starts a consumer on the tank's
     * {@code flow.<tank>} topic and forwards every received message to the handlers.
     * When a client id is supplied it is passed to the consumer's {@code listen} call.
     *
     * <p>NOTE(review): the consumer is stored under the tank name, so subscribing twice to
     * the same tank overwrites the earlier consumer without stopping it.
     *
     * @param tank tank to subscribe to
     * @return the subscription
     */
    public Datalake.EventStore.Subscription subscribe(Datalake.EventStore.Tank tank) {
        return (clientId, messageHandlers) -> {
            TopicConsumer topicConsumer = new TopicConsumer(this.session, this.flowProbe(tank.name()));
            if (clientId != null) {
                topicConsumer.listen(message -> this.handle(message, messageHandlers), clientId);
            } else {
                topicConsumer.listen(message -> this.handle(message, messageHandlers));
            }
            this.consumers.put(tank.name(), topicConsumer);
        };
    }

    /**
     * Stops and forgets the consumer registered for the tank, if there is one.
     *
     * @param tank tank to unsubscribe from
     */
    public void unsubscribe(Datalake.EventStore.Tank tank) {
        TopicConsumer topicConsumer = this.consumers.remove(tank.name());
        if (topicConsumer != null) {
            topicConsumer.stop();
        }
    }

    /** Sends the {@code "seal"} command through the admin service. */
    void seal() {
        this.adminService.send("seal");
    }

    /**
     * @return live view of the cached producers
     */
    Collection<TopicProducer> producers() {
        return this.producers.values();
    }

    /**
     * Drains the stream, publishing every message on the {@code put.<blob>} topic.
     *
     * @param stream source of messages; consumed until exhausted
     * @param blob   name appended to the {@code put.} prefix
     */
    void put(ZimStream stream, String blob) {
        String topic = this.putProbe(blob);
        while (stream.hasNext()) {
            this.put(stream.next(), topic);
        }
    }

    /**
     * Publishes one message on the given topic. The message is dropped, with an error
     * logged, when there is no open session or no producer could be obtained.
     */
    private void put(Message message, String topic) {
        if (this.sessionUnavailable()) {
            Logger.error("Session closed");
            return;
        }
        TopicProducer producer = this.producer(topic);
        if (producer == null) {
            // producer(...) has already logged the underlying JMSException.
            Logger.error("No producer available for topic " + topic);
            return;
        }
        producer.produce(MessageTranslator.fromInlMessage(message));
    }

    /**
     * True when there is no session yet or the session reports itself closed. The closed
     * state can only be queried on ActiveMQ sessions; other session types are assumed open.
     */
    private boolean sessionUnavailable() {
        if (this.session == null) {
            return true;
        }
        return this.session instanceof ActiveMQSession && ((ActiveMQSession) this.session).isClosed();
    }

    /** Translates the JMS message and passes it to every handler; translation is done per handler. */
    private void handle(javax.jms.Message message, Datalake.EventStore.MessageHandler[] messageHandlers) {
        for (Datalake.EventStore.MessageHandler handler : messageHandlers) {
            handler.handle(MessageTranslator.toInlMessage(message));
        }
    }

    private String feedProbe(String name) {
        return "feed." + name;
    }

    private String flowProbe(String name) {
        return "flow." + name;
    }

    private String putProbe(String name) {
        return "put." + name;
    }

    /**
     * Returns the cached producer for the topic, creating (and caching) a new one when
     * none exists or the cached one is closed.
     *
     * @return the producer, or {@code null} when a {@link JMSException} occurred (it is logged)
     */
    private TopicProducer producer(String topic) {
        try {
            TopicProducer cached = this.producers.get(topic);
            if (cached != null && !cached.isClosed()) {
                return cached;
            }
            TopicProducer created = new TopicProducer(this.session, topic);
            this.producers.put(topic, created);
            return created;
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    /**
     * One block of a reflow: delivers a bounded number of messages from a stream to a set
     * of handlers and then notifies them with a control message.
     */
    private static class ReflowBlock {
        private final ZimStream is;
        private final Datalake.EventStore.MessageHandler[] messageHandlers;

        ReflowBlock(ZimStream is, Datalake.EventStore.MessageHandler[] messageHandlers) {
            this.is = is;
            this.messageHandlers = messageHandlers;
        }

        /**
         * Delivers at most {@code blockSize} messages and then the control message.
         *
         * @param blockSize maximum number of messages to deliver; values {@code <= 0} deliver none
         */
        void reflow(int blockSize) {
            this.terminate(this.process(blockSize));
        }

        /**
         * Hands up to {@code messages} messages from the stream to every handler.
         *
         * @return the number of messages actually delivered
         */
        private int process(int messages) {
            int processed = 0;
            // Check the budget first so the stream is not advanced past the block boundary.
            while (processed < messages && this.is.hasNext()) {
                Message message = this.is.next();
                Arrays.stream(this.messageHandlers).forEach(handler -> handler.handle(message));
                processed++;
            }
            return processed;
        }

        /** Sends each handler its own control message carrying the delivered count. */
        private void terminate(int reflowedMessages) {
            Arrays.stream(this.messageHandlers).forEach(handler -> handler.handle(this.controlMessage(reflowedMessages)));
        }

        /** Builds the control message: its type comes from {@link #type()}, its {@code count} from the argument. */
        private Message controlMessage(int processedMessagesCount) {
            return new Message(this.type()).set("count", Integer.valueOf(processedMessagesCount));
        }

        /** {@code "endBlock"} while the stream still has messages, {@code "endReflow"} once it is exhausted. */
        private String type() {
            return this.is.hasNext() ? "endBlock" : "endReflow";
        }
    }
}

