package io.intino.konos.datalake.fs;

import io.intino.konos.datalake.EventDatalake;
import io.intino.konos.datalake.Helper;
import io.intino.konos.datalake.ReflowConfiguration;
import io.intino.konos.datalake.ReflowDispatcher;
import io.intino.ness.datalake.ReflowMessageInputStream;
import io.intino.ness.datalake.Scale;
import io.intino.ness.datalake.graph.DatalakeGraph;
import io.intino.ness.datalake.graph.Tank;
import io.intino.ness.inl.Message;
import io.intino.tara.magritte.Graph;
import io.intino.tara.magritte.stores.ResourcesStore;
import java.io.File;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/* loaded from: input_file:io/intino/konos/datalake/fs/FSEventDatalake.class */
/**
 * {@link EventDatalake} implementation that delegates to a {@link DatalakeGraph} configured with a
 * directory and a {@link Scale}.
 *
 * <p>The graph is loaded from the {@code "Datalake"} stash when an instance is created. Connection
 * related operations ({@link #connect(String...)}, {@link #isConnected()}, {@link #commit()},
 * {@link #users()}) are trivial in this implementation: there is nothing to connect to and no
 * users are reported.
 */
public class FSEventDatalake implements EventDatalake {
    private final DatalakeGraph datalake = new Graph(new ResourcesStore()).loadStashes(new String[]{"Datalake"}).as(DatalakeGraph.class);

    /**
     * Creates a datalake over the given directory.
     *
     * @param file  directory handed to the underlying graph; it is created (with parents) if missing.
     *              The result of {@link File#mkdirs()} is not checked.
     * @param scale scale handed to the underlying graph
     */
    public FSEventDatalake(File file, Scale scale) {
        file.mkdirs();
        this.datalake.directory(file);
        this.datalake.scale(scale);
    }

    /**
     * Creates a datalake whose directory and scale are both derived from {@code str} through
     * {@link Helper#eventDatalakeDirectory(String)} and {@link Helper#scaleOf(String)}.
     *
     * @param str descriptor understood by {@link Helper} (presumably a datalake URL or path —
     *            confirm against {@code Helper})
     */
    public FSEventDatalake(String str) {
        File directory = Helper.eventDatalakeDirectory(str);
        directory.mkdirs();
        this.datalake.directory(directory);
        this.datalake.scale(Helper.scaleOf(str));
    }

    /**
     * Stores {@code message} in the tank named {@code str}.
     *
     * @param str     name of the target tank
     * @param message message to store
     * @return always {@code true}
     */
    public boolean put(String str, Message message) {
        this.datalake.tank(str).put(message);
        return true;
    }

    /**
     * Opens a reflow session without a completion callback.
     *
     * @see #reflow(ReflowConfiguration, ReflowDispatcher, Runnable)
     */
    @Override // io.intino.konos.datalake.EventDatalake
    public EventDatalake.ReflowSession reflow(ReflowConfiguration reflowConfiguration, ReflowDispatcher reflowDispatcher) {
        return reflow(reflowConfiguration, reflowDispatcher, () -> {
        });
    }

    /**
     * Creates an event session over the current tank list and scale of the underlying graph.
     */
    @Override // io.intino.konos.datalake.EventDatalake
    public EventDatalake.EventSession createEventSession() {
        return new FSEventSession(this.datalake.tankList(), this.datalake.scale());
    }

    /**
     * Looks up a tank of the underlying graph by name.
     *
     * @param str tank name
     * @return whatever {@code DatalakeGraph.tank(String)} returns for that name
     */
    public Tank tank(String str) {
        return this.datalake.tank(str);
    }

    /**
     * Opens a reflow session that replays the messages selected by {@code reflowConfiguration}
     * through {@code reflowDispatcher}.
     *
     * <p>Each call to {@code next()} on the returned session dispatches messages until the running
     * message count reaches a multiple of {@code reflowConfiguration.blockSize()} or the stream is
     * exhausted, and then dispatches one extra marker message: {@code "endBlock"} if more messages
     * remain, {@code "endReflow"} otherwise. Both markers carry the running count in {@code "count"}.
     *
     * @param reflowConfiguration tanks, time ranges and block size of the reflow
     * @param reflowDispatcher    receiver of the replayed messages and of the marker messages
     * @param runnable            callback run when the session is finished
     * @return the new session; {@code play()} and {@code pause()} do nothing
     */
    public EventDatalake.ReflowSession reflow(final ReflowConfiguration reflowConfiguration, final ReflowDispatcher reflowDispatcher, final Runnable runnable) {
        return new EventDatalake.ReflowSession() {
            private final ReflowMessageInputStream stream = new ReflowMessageInputStream(FSEventDatalake.this.map(reflowConfiguration));
            // Running total across all calls to next(); never reset between blocks.
            private int messages = 0;

            @Override // io.intino.konos.datalake.EventDatalake.ReflowSession
            public void next() {
                while (stream.hasNext()) {
                    reflowDispatcher.dispatch(stream.next());
                    if (++messages % reflowConfiguration.blockSize().intValue() == 0) {
                        break;
                    }
                }
                reflowDispatcher.dispatch(marker(stream.hasNext() ? "endBlock" : "endReflow"));
            }

            /** Builds a marker message of the given type carrying the running message count. */
            private Message marker(String type) {
                return new Message(type).set("count", Integer.valueOf(messages));
            }

            @Override // io.intino.konos.datalake.EventDatalake.ReflowSession
            public void finish() {
                try {
                    stream.close();
                } finally {
                    // The completion callback must run even if closing the stream fails.
                    runnable.run();
                }
            }

            @Override // io.intino.konos.datalake.EventDatalake.ReflowSession
            public void play() {
            }

            @Override // io.intino.konos.datalake.EventDatalake.ReflowSession
            public void pause() {
            }
        };
    }

    /**
     * Translates a reflow configuration into the tank-to-time-range map consumed by
     * {@link ReflowMessageInputStream}.
     *
     * <p>A missing lower bound becomes {@link Instant#MIN} and a missing upper bound becomes
     * {@link Instant#MAX}. As with any {@link Collectors#toMap}, an {@link IllegalStateException}
     * is thrown if two configured tanks resolve to equal keys.
     *
     * @param reflowConfiguration configuration whose tank list is translated
     * @return one entry per configured tank, keyed by the graph tank with the same name
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Map<Tank, Map.Entry<Instant, Instant>> map(ReflowConfiguration reflowConfiguration) {
        return reflowConfiguration.tankList().stream().collect(Collectors.toMap(
                requested -> this.datalake.tank(requested.name()),
                requested -> new AbstractMap.SimpleEntry<>(
                        requested.from() == null ? Instant.MIN : requested.from(),
                        requested.to() == null ? Instant.MAX : requested.to())));
    }

    /** Does nothing in this implementation. */
    public void commit() {
    }

    /**
     * Adds a tank named {@code str} to the underlying graph and returns a wrapper for it.
     *
     * @param str tank name
     * @return a new {@code FSTank} bound to this datalake
     */
    @Override // io.intino.konos.datalake.EventDatalake
    public EventDatalake.Tank add(String str) {
        this.datalake.add(str);
        return new FSTank(str, this);
    }

    /** Calls {@code terminate()} on every tank of the underlying graph. */
    @Override // io.intino.konos.datalake.EventDatalake
    public void disconnect() {
        this.datalake.tankList().forEach(Tank::terminate);
    }

    /**
     * @return always {@code true}
     */
    @Override // io.intino.konos.datalake.EventDatalake
    public boolean isConnected() {
        return true;
    }

    /** Does nothing in this implementation; the arguments are ignored. */
    @Override // io.intino.konos.datalake.EventDatalake
    public void connect(String... strArr) {
    }

    /**
     * @return always an empty, immutable list
     */
    @Override // io.intino.konos.datalake.EventDatalake
    public List<EventDatalake.User> users() {
        return Collections.emptyList();
    }
}
