package io.intino.ness.core.fs;

import io.intino.alexandria.inl.Message;
import io.intino.alexandria.zim.ZimStream;
import io.intino.ness.core.Datalake;
import java.io.File;
import java.util.Arrays;
import java.util.stream.Stream;

/* loaded from: input_file:io/intino/ness/core/fs/FSEventStore.class */
public class FSEventStore implements Datalake.EventStore {
    public static final String EventExtension = ".zim";
    public static final String SessionExtension = ".inl";
    private File root;

    /* loaded from: input_file:io/intino/ness/core/fs/FSEventStore$ReflowBlock.class */
    private static class ReflowBlock {
        private final ZimStream is;
        private final Datalake.EventStore.MessageHandler[] messageHandlers;

        ReflowBlock(ZimStream zimStream, Datalake.EventStore.MessageHandler[] messageHandlerArr) {
            this.is = zimStream;
            this.messageHandlers = messageHandlerArr;
        }

        void reflow(int i) {
            terminate(process(i));
        }

        private int process(int i) {
            int i2 = i;
            while (this.is.hasNext()) {
                int i3 = i2;
                i2--;
                if (i3 < 0) {
                    break;
                }
                Message next = this.is.next();
                Arrays.stream(this.messageHandlers).forEach(messageHandler -> {
                    messageHandler.handle(next);
                });
            }
            return i - i2;
        }

        private void terminate(int i) {
            Arrays.stream(this.messageHandlers).filter(messageHandler -> {
                return messageHandler instanceof Datalake.EventStore.ReflowHandler;
            }).map(messageHandler2 -> {
                return (Datalake.EventStore.ReflowHandler) messageHandler2;
            }).forEach(reflowHandler -> {
                terminate(reflowHandler, i);
            });
        }

        private void terminate(Datalake.EventStore.ReflowHandler reflowHandler, int i) {
            if (this.is.hasNext()) {
                reflowHandler.onBlock(i);
            } else {
                reflowHandler.onFinish(i);
            }
        }
    }

    public FSEventStore(File file) {
        this.root = file;
    }

    @Override // io.intino.ness.core.Datalake.EventStore
    public Stream<Datalake.EventStore.Tank> tanks() {
        return FS.foldersIn(this.root).map(FSEventTank::new);
    }

    @Override // io.intino.ness.core.Datalake.EventStore
    public Datalake.EventStore.Tank tank(String str) {
        return new FSEventTank(new File(this.root, str));
    }

    @Override // io.intino.ness.core.Datalake.EventStore
    public Datalake.EventStore.Reflow reflow(final Datalake.EventStore.Reflow.Filter filter) {
        return new Datalake.EventStore.Reflow() { // from class: io.intino.ness.core.fs.FSEventStore.1
            private ZimStream is = new ZimStream.Merge(tankInputStreams());

            ZimStream tankInputStream(Datalake.EventStore.Tank tank) {
                Datalake.EventStore.Reflow.Filter filter2 = filter;
                return tank.content(timetag -> {
                    return filter2.allow(tank, timetag);
                });
            }

            private ZimStream[] tankInputStreams() {
                Stream<Datalake.EventStore.Tank> tanks = FSEventStore.this.tanks();
                Datalake.EventStore.Reflow.Filter filter2 = filter;
                filter2.getClass();
                return (ZimStream[]) tanks.filter(filter2::allow).map(this::tankInputStream).toArray(i -> {
                    return new ZimStream[i];
                });
            }

            @Override // io.intino.ness.core.Datalake.EventStore.Reflow
            public void next(int i, Datalake.EventStore.MessageHandler... messageHandlerArr) {
                new ReflowBlock(this.is, messageHandlerArr).reflow(i);
            }
        };
    }

    @Override // io.intino.ness.core.Datalake.EventStore
    public Datalake.EventStore.Subscription subscribe(Datalake.EventStore.Tank tank) {
        return (str, messageHandlerArr) -> {
        };
    }

    @Override // io.intino.ness.core.Datalake.EventStore
    public void unsubscribe(Datalake.EventStore.Tank tank) {
    }
}
