/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.datalake.pump;

import io.intino.alexandria.Timetag;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventStream;
import io.intino.datahub.datalake.pump.EventPump;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.Stream;

public class FileEventPump
implements EventPump {
    private final Datalake.Store<? extends Event> store;

    public FileEventPump(Datalake.Store<? extends Event> store) {
        this.store = store;
    }

    @Override
    public EventPump.Reflow reflow(final EventPump.Reflow.Filter filter) {
        return new EventPump.Reflow(){
            private final Stream<Stream<Event>> streamStream = this.tankInputStreams();
            private final Iterator<? extends Event> iterator = EventStream.merge(this.streamStream).iterator();

            private Stream<Stream<Event>> tankInputStreams() {
                return FileEventPump.this.store.tanks().filter(filter::allow).map(this::tankInputStream);
            }

            Stream<Event> tankInputStream(Datalake.Store.Tank<? extends Event> tank) {
                return tank.content((ss, tt) -> filter.allow(tank, (Datalake.Store.Source<? extends Event>)ss, (Timetag)tt));
            }

            @Override
            public boolean hasNext() {
                return this.iterator.hasNext();
            }

            @Override
            public void next(int blockSize, EventPump.EventHandler ... eventHandlers) {
                new ReflowBlock(this.iterator, eventHandlers).reflow(blockSize);
            }
        };
    }

    private static class ReflowBlock {
        private final Iterator<? extends Event> is;
        private final EventPump.EventHandler[] eventHandlers;

        ReflowBlock(Iterator<? extends Event> is, EventPump.EventHandler[] eventHandlers) {
            this.is = is;
            this.eventHandlers = eventHandlers;
        }

        void reflow(int blockSize) {
            this.terminate(this.process(blockSize));
        }

        private int process(int messages) {
            int pendingMessages = messages;
            while (this.is.hasNext() && pendingMessages-- >= 0) {
                Event event = this.is.next();
                Arrays.stream(this.eventHandlers).forEach(mh -> mh.handle(event));
            }
            return messages - pendingMessages;
        }

        private void terminate(int reflowedMessages) {
            Arrays.stream(this.eventHandlers).filter(m -> m instanceof EventPump.ReflowHandler).map(m -> (EventPump.ReflowHandler)((Object)m)).forEach(m -> this.terminate((EventPump.ReflowHandler)m, reflowedMessages));
        }

        private void terminate(EventPump.ReflowHandler reflowHandler, int reflowedMessages) {
            if (this.is.hasNext()) {
                reflowHandler.onBlock(reflowedMessages);
            } else {
                reflowHandler.onFinish(reflowedMessages);
            }
        }
    }
}

