package io.intino.konos.datalake.fs;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.ReflowDispatcher;
import io.intino.ness.datalake.ReflowMessageInputStream;
import io.intino.ness.datalake.Scale;
import io.intino.ness.datalake.graph.DatalakeGraph;
import io.intino.ness.inl.Message;
import io.intino.tara.magritte.Graph;
import io.intino.tara.magritte.stores.ResourcesStore;
import java.io.File;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

/* loaded from: input_file:io/intino/konos/datalake/fs/FSDatalake.class */
/**
 * {@link Datalake} implementation backed by a {@link DatalakeGraph} that stores its data in a
 * {@code datalake} directory located under the path given in the connection string.
 *
 * <p>The connection string is a path, optionally prefixed with {@code file://} and optionally
 * followed by a query part ({@code ?scale=<Scale>}) that selects the {@link Scale} of the
 * datalake. When no scale is given, {@link Scale#Day} is used.
 */
public class FSDatalake implements Datalake {
    private final DatalakeGraph datalake = new Graph(new ResourcesStore()).loadStashes(new String[]{"Datalake"}).as(DatalakeGraph.class);

    /**
     * Creates the {@code datalake} directory (and any missing parents) under the path described
     * by {@code str} and configures the underlying graph with that directory and the requested
     * scale.
     *
     * @param str connection string: a path, optionally prefixed with {@code file://} and
     *            optionally followed by {@code ?scale=<Scale>}
     * @throws IllegalArgumentException if the requested scale is not a {@link Scale} constant
     */
    public FSDatalake(String str) {
        File directory = new File(clean(str), "datalake");
        directory.mkdirs();
        this.datalake.directory(directory);
        this.datalake.scale(scaleOf(str));
    }

    /**
     * Drops {@code message} into the tank named {@code str}.
     *
     * @param str     name of the target tank
     * @param message message to drop
     */
    public void drop(String str, Message message) {
        this.datalake.tank(str).drop(message);
    }

    /**
     * Opens a reflow session with no completion callback.
     *
     * @see #reflow(int, ReflowDispatcher, Instant, Runnable)
     */
    @Override // io.intino.konos.datalake.Datalake
    public Datalake.ReflowSession reflow(int i, ReflowDispatcher reflowDispatcher, Instant instant) {
        return reflow(i, reflowDispatcher, instant, () -> { });
    }

    /**
     * Opens a reflow session that replays the messages of the dispatcher's tanks through
     * {@code reflowDispatcher}, in blocks of {@code i} messages.
     *
     * @param i                block size; each call to {@code next()} stops once the running total
     *                         of dispatched messages is a multiple of this value (a value of
     *                         {@code 0} makes the modulo check throw {@link ArithmeticException})
     * @param reflowDispatcher receiver of the replayed messages and of the end-of-block /
     *                         end-of-reflow markers; also provides the tanks to replay
     * @param instant          instant handed to the underlying {@link ReflowMessageInputStream}
     * @param runnable         callback executed when the session is finished
     * @return the session controlling the replay
     */
    public Datalake.ReflowSession reflow(final int i, final ReflowDispatcher reflowDispatcher, final Instant instant, final Runnable runnable) {
        return new Datalake.ReflowSession() {
            // Raw cast kept on purpose: the element type of the collected list is not visible from this file.
            final ReflowMessageInputStream stream = new ReflowMessageInputStream(
                    (List) reflowDispatcher.tanks().stream()
                            .map(tank -> FSDatalake.this.datalake.tank(tank.name()))
                            .collect(Collectors.toList()),
                    instant);

            // Running total of dispatched messages; it is never reset between calls to next().
            int messages = 0;

            /**
             * Dispatches messages until the stream is exhausted or the running total reaches a
             * multiple of the block size, then dispatches an {@code endBlock} marker when more
             * messages remain or an {@code endReflow} marker otherwise.
             */
            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void next() {
                while (this.stream.hasNext()) {
                    reflowDispatcher.dispatch(this.stream.next());
                    this.messages++;
                    if (this.messages % i == 0) {
                        break;
                    }
                }
                Message marker = this.stream.hasNext()
                        ? createEndBlockMessage(this.messages)
                        : createEndReflowMessage(this.messages);
                reflowDispatcher.dispatch(marker);
            }

            /** Builds the {@code endBlock} marker carrying the running message total. */
            private Message createEndBlockMessage(int count) {
                return new Message("endBlock").set("count", Integer.valueOf(count));
            }

            /** Builds the {@code endReflow} marker carrying the running message total. */
            private Message createEndReflowMessage(int count) {
                return new Message("endReflow").set("count", Integer.valueOf(count));
            }

            /** Closes the message stream and then runs the completion callback. */
            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void finish() {
                try {
                    this.stream.close();
                } finally {
                    // The callback must run even if closing the stream fails.
                    runnable.run();
                }
            }

            /** No-op in this implementation. */
            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void play() {
            }

            /** No-op in this implementation. */
            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void pause() {
            }
        };
    }

    /** No-op in this implementation. */
    @Override // io.intino.konos.datalake.Datalake
    public void commit() {
    }

    /**
     * Adds an entry to the underlying graph, after stripping the query part and the
     * {@code file://} marker from {@code str}.
     *
     * @param str value to add, optionally decorated like a connection string
     */
    @Override // io.intino.konos.datalake.Datalake
    public void add(String str) {
        this.datalake.add(clean(str));
    }

    /** Terminates every tank of the underlying graph. */
    @Override // io.intino.konos.datalake.Datalake
    public void disconnect() {
        this.datalake.tankList().forEach(tank -> tank.terminate());
    }

    /** No-op in this implementation; the arguments are ignored. */
    @Override // io.intino.konos.datalake.Datalake
    public void connect(String... strArr) {
    }

    /**
     * Removes the query part (everything from the first {@code ?}) and every occurrence of
     * {@code file://} from {@code str}.
     */
    private String clean(String str) {
        int queryStart = str.indexOf("?");
        String withoutQuery = queryStart == -1 ? str : str.substring(0, queryStart);
        return withoutQuery.replace("file://", "");
    }

    /**
     * Resolves the scale requested in the query part of {@code str}.
     *
     * <p>Only the text after the first {@code ?} is inspected, so {@code =} characters in the path
     * are ignored and additional {@code &}-separated parameters do not leak into the value. The
     * value of the {@code scale} parameter is preferred; otherwise the first parameter that has a
     * value is used, because earlier versions never checked the parameter name.
     *
     * @return the requested scale, or {@link Scale#Day} when there is no query part or no
     *         parameter carries a value
     * @throws IllegalArgumentException if the selected value is not a {@link Scale} constant
     */
    private Scale scaleOf(String str) {
        int queryStart = str.indexOf('?');
        if (queryStart == -1) {
            return Scale.Day;
        }
        String firstValue = null;
        for (String parameter : str.substring(queryStart + 1).split("&")) {
            int separator = parameter.indexOf('=');
            if (separator == -1 || separator == parameter.length() - 1) {
                continue; // no value to read from this parameter
            }
            String value = parameter.substring(separator + 1);
            if ("scale".equals(parameter.substring(0, separator))) {
                return Scale.valueOf(value);
            }
            if (firstValue == null) {
                firstValue = value;
            }
        }
        return firstValue != null ? Scale.valueOf(firstValue) : Scale.Day;
    }
}
