/*
 * Decompiled with CFR 0.152.
 */
package io.intino.alexandria.datalake.file.eventsourcing;

import io.intino.alexandria.Timetag;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.datalake.file.eventsourcing.EventHandler;
import io.intino.alexandria.datalake.file.eventsourcing.EventPump;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventStream;
import java.util.Arrays;

public class FileEventPump
implements EventPump {
    private final Datalake.EventStore store;

    public FileEventPump(Datalake.EventStore store) {
        this.store = store;
    }

    @Override
    public EventPump.Reflow reflow(final EventPump.Reflow.Filter filter) {
        return new EventPump.Reflow(){
            private EventStream is = new EventStream.Merge(this.tankInputStreams());

            EventStream tankInputStream(Datalake.EventStore.Tank tank) {
                return tank.content(ts -> filter.allow(tank, (Timetag)ts));
            }

            private EventStream[] tankInputStreams() {
                return (EventStream[])FileEventPump.this.store.tanks().filter(filter::allow).map(this::tankInputStream).toArray(EventStream[]::new);
            }

            @Override
            public boolean hasNext() {
                return this.is.hasNext();
            }

            @Override
            public void next(int blockSize, EventHandler ... eventHandlers) {
                new ReflowBlock(this.is, eventHandlers).reflow(blockSize);
            }
        };
    }

    private static class ReflowBlock {
        private final EventStream is;
        private final EventHandler[] eventHandlers;

        ReflowBlock(EventStream is, EventHandler[] eventHandlers) {
            this.is = is;
            this.eventHandlers = eventHandlers;
        }

        void reflow(int blockSize) {
            this.terminate(this.process(blockSize));
        }

        private int process(int messages) {
            int pendingMessages = messages;
            while (this.is.hasNext() && pendingMessages-- >= 0) {
                Event event = this.is.next();
                Arrays.stream(this.eventHandlers).forEach(mh -> mh.handle(event));
            }
            return messages - pendingMessages;
        }

        private void terminate(int reflowedMessages) {
            Arrays.stream(this.eventHandlers).filter(m -> m instanceof EventPump.ReflowHandler).map(m -> (EventPump.ReflowHandler)((Object)m)).forEach(m -> this.terminate((EventPump.ReflowHandler)m, reflowedMessages));
        }

        private void terminate(EventPump.ReflowHandler reflowHandler, int reflowedMessages) {
            if (this.is.hasNext()) {
                reflowHandler.onBlock(reflowedMessages);
            } else {
                reflowHandler.onFinish(reflowedMessages);
            }
        }
    }
}

