package io.intino.konos.datalake.fs;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.ReflowConfiguration;
import io.intino.konos.datalake.ReflowDispatcher;
import io.intino.ness.datalake.ReflowMessageInputStream;
import io.intino.ness.datalake.Scale;
import io.intino.ness.datalake.graph.DatalakeGraph;
import io.intino.ness.datalake.graph.Tank;
import io.intino.ness.inl.Message;
import io.intino.tara.magritte.Graph;
import io.intino.tara.magritte.stores.ResourcesStore;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/* loaded from: input_file:io/intino/konos/datalake/fs/FSDatalake.class */
public class FSDatalake implements Datalake {
    private final DatalakeGraph datalake = new Graph(new ResourcesStore()).loadStashes(new String[]{"Datalake"}).as(DatalakeGraph.class);

    public FSDatalake(String str) {
        File datalakeDirectory = datalakeDirectory(str);
        datalakeDirectory.mkdirs();
        this.datalake.directory(datalakeDirectory);
        this.datalake.scale(scaleOf(str));
    }

    public void put(String str, Message[] messageArr) {
        for (Message message : messageArr) {
            this.datalake.tank(str).put(message);
        }
    }

    @Override // io.intino.konos.datalake.Datalake
    public Datalake.ReflowSession reflow(ReflowConfiguration reflowConfiguration, ReflowDispatcher reflowDispatcher) {
        return reflow(reflowConfiguration, reflowDispatcher, () -> {
        });
    }

    public Tank tank(String str) {
        return this.datalake.tank(str);
    }

    public Datalake.ReflowSession reflow(final ReflowConfiguration reflowConfiguration, final ReflowDispatcher reflowDispatcher, final Runnable runnable) {
        return new Datalake.ReflowSession() { // from class: io.intino.konos.datalake.fs.FSDatalake.1
            final ReflowMessageInputStream stream;
            int messages = 0;

            {
                this.stream = new ReflowMessageInputStream((Map) reflowConfiguration.tankList().stream().collect(Collectors.toMap(tank -> {
                    return FSDatalake.this.datalake.tank(tank.name());
                }, (v0) -> {
                    return v0.from();
                })));
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void next() {
                while (this.stream.hasNext()) {
                    reflowDispatcher.dispatch(this.stream.next());
                    int i = this.messages + 1;
                    this.messages = i;
                    if (i % reflowConfiguration.blockSize().intValue() == 0) {
                        break;
                    }
                }
                reflowDispatcher.dispatch(this.stream.hasNext() ? createEndBlockMessage(this.messages) : createEndReflowMessage(this.messages));
            }

            private Message createEndBlockMessage(int i) {
                return new Message("endBlock").set("count", Integer.valueOf(i));
            }

            private Message createEndReflowMessage(int i) {
                return new Message("endReflow").set("count", Integer.valueOf(i));
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void finish() {
                this.stream.close();
                runnable.run();
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void play() {
            }

            @Override // io.intino.konos.datalake.Datalake.ReflowSession
            public void pause() {
            }
        };
    }

    @Override // io.intino.konos.datalake.Datalake
    public void commit() {
    }

    @Override // io.intino.konos.datalake.Datalake
    public void add(String str) {
        this.datalake.add(clean(str));
    }

    @Override // io.intino.konos.datalake.Datalake
    public void disconnect() {
        this.datalake.tankList().forEach((v0) -> {
            v0.terminate();
        });
    }

    @Override // io.intino.konos.datalake.Datalake
    public void connect(String... strArr) {
    }

    @Override // io.intino.konos.datalake.Datalake
    public List<Datalake.User> users() {
        return Collections.emptyList();
    }

    private String clean(String str) {
        int indexOf = str.indexOf("?");
        if (indexOf != -1) {
            str = str.substring(0, indexOf);
        }
        return str.replace("file://", "");
    }

    private Scale scaleOf(String str) {
        return str.contains("?") ? Scale.valueOf(str.split("=")[1]) : Scale.Day;
    }

    private File datalakeDirectory(String str) {
        try {
            return new File(clean(str), "datalake").getCanonicalFile();
        } catch (IOException e) {
            return new File(clean(str), "datalake");
        }
    }
}
