package io.intino.datahub.datalake.pump;

import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventStream;
import io.intino.datahub.datalake.pump.EventPump;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Stream;

/* loaded from: input_file:io/intino/datahub/datalake/pump/FileEventPump.class */
/**
 * {@link EventPump} implementation that replays the events of a {@link Datalake.Store}
 * to caller-supplied handlers, one bounded block at a time.
 */
public class FileEventPump implements EventPump {
    private final Datalake.Store<? extends Event> store;

    /**
     * One block of a reflow: pulls events from a shared iterator, hands each of them to
     * every handler and finally notifies the {@link EventPump.ReflowHandler}s among them.
     */
    private static class ReflowBlock {
        private final Iterator<? extends Event> is;
        private final EventPump.EventHandler[] eventHandlers;

        /**
         * @param it              iterator the events are pulled from; it is advanced in place
         * @param eventHandlerArr handlers that receive every pulled event
         */
        ReflowBlock(Iterator<? extends Event> it, EventPump.EventHandler[] eventHandlerArr) {
            this.is = it;
            this.eventHandlers = eventHandlerArr;
        }

        /**
         * Delivers at most {@code i} events and then reports the outcome to the reflow handlers.
         *
         * @param i maximum number of events to deliver in this block
         */
        void reflow(int i) {
            terminate(process(i));
        }

        /**
         * Pulls events from the iterator and passes each one to all handlers, in array order.
         *
         * <p>The loop stops after {@code i} events or when the iterator is exhausted,
         * whichever comes first. A non-positive {@code i} delivers nothing.
         *
         * @param i maximum number of events to deliver
         * @return the number of events actually delivered
         */
        private int process(int i) {
            int delivered = 0;
            // Counting what was really delivered keeps the reported figure and the
            // amount of consumed events in sync, whatever ends the loop.
            while (delivered < i && this.is.hasNext()) {
                Event event = this.is.next();
                for (EventPump.EventHandler handler : this.eventHandlers) {
                    handler.handle(event);
                }
                delivered++;
            }
            return delivered;
        }

        /**
         * Notifies every handler that is also a {@link EventPump.ReflowHandler}.
         *
         * @param i number of events delivered in this block
         */
        private void terminate(int i) {
            for (EventPump.EventHandler handler : this.eventHandlers) {
                if (handler instanceof EventPump.ReflowHandler) {
                    terminate((EventPump.ReflowHandler) handler, i);
                }
            }
        }

        /**
         * Calls {@code onBlock} while the iterator still has events, {@code onFinish} otherwise.
         *
         * @param reflowHandler handler to notify
         * @param i             number of events delivered in this block
         */
        private void terminate(EventPump.ReflowHandler reflowHandler, int i) {
            if (this.is.hasNext()) {
                reflowHandler.onBlock(i);
            } else {
                reflowHandler.onFinish(i);
            }
        }
    }

    /**
     * @param store store whose tanks provide the events to replay
     */
    public FileEventPump(Datalake.Store<? extends Event> store) {
        this.store = store;
    }

    /**
     * Creates a reflow over the tanks of the store that the filter allows.
     *
     * <p>The tank streams are merged with {@link EventStream#merge} as soon as the reflow
     * is created; each call to {@code next} then consumes events from that single merged
     * iterator.
     *
     * @param filter selects the tanks and, within each tank, the (source, timetag) pairs to read
     * @return a reflow positioned at the first event of the merged stream
     * @throws NullPointerException if {@code filter} is null
     */
    @Override // io.intino.datahub.datalake.pump.EventPump
    public EventPump.Reflow reflow(final EventPump.Reflow.Filter filter) {
        return new EventPump.Reflow() {
            // Declaration order matters: the iterator is built from streamStream.
            private final Stream<Stream<Event>> streamStream = tankInputStreams();
            private final Iterator<? extends Event> iterator = EventStream.merge(this.streamStream).iterator();

            /** Builds one event stream per tank accepted by the filter. */
            private Stream<Stream<Event>> tankInputStreams() {
                // NOTE(review): raw type kept from the original; the element type returned
                // by store.tanks() is not visible in this file.
                Stream tanks = FileEventPump.this.store.tanks();
                Objects.requireNonNull(filter);
                return tanks.filter(filter::allow).map(this::tankInputStream);
            }

            /** Opens the content of a tank, restricted to what the filter allows. */
            Stream<Event> tankInputStream(Datalake.Store.Tank<? extends Event> tank) {
                return tank.content((source, timetag) -> filter.allow(tank, source, timetag));
            }

            @Override // io.intino.datahub.datalake.pump.EventPump.Reflow
            public boolean hasNext() {
                return this.iterator.hasNext();
            }

            /**
             * Delivers the next block of at most {@code i} events to the given handlers.
             */
            @Override // io.intino.datahub.datalake.pump.EventPump.Reflow
            public void next(int i, EventPump.EventHandler... eventHandlerArr) {
                new ReflowBlock(this.iterator, eventHandlerArr).reflow(i);
            }
        };
    }
}
