package io.intino.alexandria.datalake.file.eventsourcing;

import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.datalake.file.eventsourcing.EventPump;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventStream;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;

/* loaded from: input_file:io/intino/alexandria/datalake/file/eventsourcing/FileEventPump.class */
/**
 * {@link EventPump} backed by a {@link Datalake.EventStore}.
 *
 * <p>{@link #reflow(EventPump.Reflow.Filter)} merges the content of every tank the filter allows
 * into a single {@link EventStream} and hands the events to the caller's handlers in blocks.
 */
public class FileEventPump implements EventPump {
    private final Datalake.EventStore store;

    /**
     * A single block of a reflow: feeds a bounded number of events from the stream to the handlers
     * and then notifies the handlers that are {@link EventPump.ReflowHandler}s.
     */
    private static class ReflowBlock {
        private final EventStream is;
        private final EventHandler[] eventHandlers;

        /**
         * @param eventStream     stream the events are read from
         * @param eventHandlerArr handlers every event is delivered to
         */
        ReflowBlock(EventStream eventStream, EventHandler[] eventHandlerArr) {
            this.is = eventStream;
            this.eventHandlers = eventHandlerArr;
        }

        /**
         * Processes one block and then signals block end / stream end to the reflow handlers.
         *
         * @param blockSize maximum number of events to deliver in this block
         */
        void reflow(int blockSize) {
            terminate(process(blockSize));
        }

        /**
         * Delivers up to {@code blockSize} events to every handler.
         *
         * <p>The counter tracks events actually handled, so the block never exceeds the requested
         * size and the returned value always equals the number of events delivered (zero for a
         * non-positive {@code blockSize}).
         *
         * @param blockSize maximum number of events to deliver
         * @return number of events that were read from the stream and handled
         */
        private int process(int blockSize) {
            int handled = 0;
            // Check the limit first so the stream is not probed once the block is already full.
            while (handled < blockSize && this.is.hasNext()) {
                Event event = this.is.next();
                for (EventHandler handler : this.eventHandlers) {
                    handler.handle(event);
                }
                handled++;
            }
            return handled;
        }

        /**
         * Notifies every handler that implements {@link EventPump.ReflowHandler}; other handlers
         * are skipped.
         *
         * @param handled number of events delivered in the block that just ended
         */
        private void terminate(int handled) {
            for (EventHandler handler : this.eventHandlers) {
                if (handler instanceof EventPump.ReflowHandler) {
                    terminate((EventPump.ReflowHandler) handler, handled);
                }
            }
        }

        /**
         * Calls {@code onBlock} while the stream still has events and {@code onFinish} once it is
         * exhausted. The stream is re-checked for each handler.
         */
        private void terminate(EventPump.ReflowHandler reflowHandler, int handled) {
            if (this.is.hasNext()) {
                reflowHandler.onBlock(handled);
            } else {
                reflowHandler.onFinish(handled);
            }
        }
    }

    /**
     * @param eventStore store whose tanks are read by {@link #reflow(EventPump.Reflow.Filter)}
     */
    public FileEventPump(Datalake.EventStore eventStore) {
        this.store = eventStore;
    }

    /**
     * Creates a reflow over the tanks of the store that {@code filter} allows.
     *
     * <p>The merged stream is built eagerly, while the returned object is being constructed.
     *
     * @param filter decides which tanks, and which timetags of each tank, take part in the reflow
     * @return a reflow that delivers the merged events block by block
     * @throws NullPointerException if {@code filter} is {@code null}
     */
    @Override
    public EventPump.Reflow reflow(final EventPump.Reflow.Filter filter) {
        return new EventPump.Reflow() {
            private final EventStream is = new EventStream.Merge(tankInputStreams());

            /** Content of a single tank, restricted to the timetags the filter allows for it. */
            EventStream tankInputStream(Datalake.EventStore.Tank tank) {
                return tank.content(timetag -> filter.allow(tank, timetag));
            }

            /** One stream per tank accepted by the filter. */
            private EventStream[] tankInputStreams() {
                Stream<Datalake.EventStore.Tank> tanks = FileEventPump.this.store.tanks();
                // Fail fast on a null filter, after the tanks have been obtained.
                Objects.requireNonNull(filter);
                return tanks.filter(filter::allow)
                        .map(this::tankInputStream)
                        .toArray(EventStream[]::new);
            }

            @Override
            public boolean hasNext() {
                return this.is.hasNext();
            }

            /**
             * Delivers the next block of at most {@code i} events to the given handlers.
             */
            @Override
            public void next(int i, EventHandler... eventHandlerArr) {
                new ReflowBlock(this.is, eventHandlerArr).reflow(i);
            }
        };
    }
}
