/*
 * Decompiled with CFR 0.152.
 */
package io.intino.ness.core.fs;

import io.intino.alexandria.inl.Message;
import io.intino.alexandria.zim.ZimStream;
import io.intino.ness.core.Datalake;
import io.intino.ness.core.Timetag;
import io.intino.ness.core.fs.FS;
import io.intino.ness.core.fs.FSEventTank;
import java.io.File;
import java.util.Arrays;
import java.util.stream.Stream;

public class FSEventStore
implements Datalake.EventStore {
    public static final String EventExtension = ".zim";
    public static final String SessionExtension = ".inl";
    private File root;

    public FSEventStore(File root) {
        this.root = root;
    }

    @Override
    public Stream<Datalake.EventStore.Tank> tanks() {
        return FS.foldersIn(this.root).map(FSEventTank::new);
    }

    @Override
    public Datalake.EventStore.Tank tank(String name) {
        return new FSEventTank(new File(this.root, name));
    }

    @Override
    public Datalake.EventStore.Reflow reflow(final Datalake.EventStore.Reflow.Filter filter) {
        return new Datalake.EventStore.Reflow(){
            private ZimStream is = new ZimStream.Merge(this.tankInputStreams());

            ZimStream tankInputStream(Datalake.EventStore.Tank tank) {
                return tank.content(ts -> filter.allow(tank, (Timetag)ts));
            }

            private ZimStream[] tankInputStreams() {
                return (ZimStream[])FSEventStore.this.tanks().filter(filter::allow).map(this::tankInputStream).toArray(ZimStream[]::new);
            }

            @Override
            public void next(int blockSize, Datalake.EventStore.MessageHandler ... messageHandlers) {
                new ReflowBlock(this.is, messageHandlers).reflow(blockSize);
            }
        };
    }

    @Override
    public Datalake.EventStore.Subscription subscribe(Datalake.EventStore.Tank tank) {
        return (clientId, messageHandlers) -> {};
    }

    @Override
    public void unsubscribe(Datalake.EventStore.Tank tank) {
    }

    private static class ReflowBlock {
        private final ZimStream is;
        private final Datalake.EventStore.MessageHandler[] messageHandlers;

        ReflowBlock(ZimStream is, Datalake.EventStore.MessageHandler[] messageHandlers) {
            this.is = is;
            this.messageHandlers = messageHandlers;
        }

        void reflow(int blockSize) {
            this.terminate(this.process(blockSize));
        }

        private int process(int messages) {
            int pendingMessages = messages;
            while (this.is.hasNext() && pendingMessages-- >= 0) {
                Message message = this.is.next();
                Arrays.stream(this.messageHandlers).forEach(mh -> mh.handle(message));
            }
            return messages - pendingMessages;
        }

        private void terminate(int reflowedMessages) {
            Arrays.stream(this.messageHandlers).filter(m -> m instanceof Datalake.EventStore.ReflowHandler).map(m -> (Datalake.EventStore.ReflowHandler)((Object)m)).forEach(m -> this.terminate((Datalake.EventStore.ReflowHandler)m, reflowedMessages));
        }

        private void terminate(Datalake.EventStore.ReflowHandler reflowHandler, int reflowedMessages) {
            if (this.is.hasNext()) {
                reflowHandler.onBlock(reflowedMessages);
            } else {
                reflowHandler.onFinish(reflowedMessages);
            }
        }
    }
}

