/*
 * Decompiled with CFR 0.152.
 */
package io.intino.konos.datalake.fs;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.ReflowConfiguration;
import io.intino.konos.datalake.ReflowDispatcher;
import io.intino.ness.datalake.ReflowMessageInputStream;
import io.intino.ness.datalake.Scale;
import io.intino.ness.datalake.graph.DatalakeGraph;
import io.intino.ness.datalake.graph.Tank;
import io.intino.ness.inl.Message;
import io.intino.tara.magritte.Graph;
import io.intino.tara.magritte.Store;
import io.intino.tara.magritte.stores.ResourcesStore;
import java.io.File;
import java.io.IOException;
import java.util.stream.Collectors;

/**
 * File-system backed {@link Datalake}.
 *
 * <p>The instance wraps a {@link DatalakeGraph} loaded from the {@code "Datalake"} stash and
 * points it at a {@code datalake} directory located under the path given in the constructor URL.
 * The URL may carry a query string (everything after {@code ?}) whose first {@code key=value}
 * parameter selects the {@link Scale}; without one, {@link Scale#Day} is used.
 */
public class FSDatalake
implements Datalake {
    private final DatalakeGraph datalake = (DatalakeGraph)new Graph((Store)new ResourcesStore()).loadStashes(new String[]{"Datalake"}).as(DatalakeGraph.class);

    /**
     * Creates the datalake rooted at the location described by {@code url}.
     *
     * @param url location of the store, optionally prefixed with {@code file://} and optionally
     *            followed by a query string such as {@code ?scale=Day}
     * @throws IllegalArgumentException presumably, if the scale value in the query string is not
     *                                  accepted by {@code Scale.valueOf} (depends on {@code Scale})
     */
    public FSDatalake(String url) {
        File store = datalakeDirectory(url);
        // Best effort: the result is ignored, as mkdirs() also returns false when the
        // directory already exists.
        store.mkdirs();
        datalake.directory(store);
        datalake.scale(scaleOf(url));
    }

    /**
     * Stores every message of {@code messages}, in order, in the tank called {@code name}.
     *
     * @param name     tank name, looked up in the underlying graph for each message
     * @param messages messages to store
     */
    public void put(String name, Message[] messages) {
        for (Message message : messages) {
            datalake.tank(name).put(message);
        }
    }

    /**
     * Starts a reflow session with no completion callback.
     *
     * @param reflow     reflow configuration (tanks and block size)
     * @param dispatcher receiver of the reflowed messages
     * @return the new session
     */
    @Override
    public Datalake.ReflowSession reflow(ReflowConfiguration reflow, ReflowDispatcher dispatcher) {
        return reflow(reflow, dispatcher, () -> {});
    }

    /**
     * Returns the tank registered under {@code name} in the underlying graph.
     *
     * @param name tank name
     * @return whatever the graph returns for that name
     */
    public Tank tank(String name) {
        return datalake.tank(name);
    }

    /**
     * Starts a reflow session over the tanks listed in {@code reflow}.
     *
     * <p>Each call to {@code next()} on the returned session dispatches messages until the running
     * message count reaches a multiple of {@code reflow.blockSize()} or the stream is exhausted,
     * and then dispatches an {@code endBlock} (more messages pending) or {@code endReflow}
     * (stream exhausted) message carrying the running count.
     *
     * @param reflow     reflow configuration (tanks and block size)
     * @param dispatcher receiver of the reflowed messages and of the end-of-block/reflow markers
     * @param onFinish   callback run by {@code finish()} after the stream has been closed
     * @return the new session
     */
    public Datalake.ReflowSession reflow(final ReflowConfiguration reflow, final ReflowDispatcher dispatcher, final Runnable onFinish) {
        return new Datalake.ReflowSession(){
            // Stream over the configured tanks, each mapped to the value of its "from" accessor.
            final ReflowMessageInputStream stream = new ReflowMessageInputStream(reflow.tankList().stream().collect(Collectors.toMap(t -> FSDatalake.this.datalake.tank(t.name()), ReflowConfiguration.Tank::from)));
            // Number of messages dispatched so far across all blocks of this session.
            int messages = 0;

            @Override
            public void next() {
                while (stream.hasNext()) {
                    dispatcher.dispatch(stream.next());
                    // Stop once a full block has been delivered.
                    if (++messages % reflow.blockSize() == 0) {
                        break;
                    }
                }
                dispatcher.dispatch(stream.hasNext() ? createEndBlockMessage(messages) : createEndReflowMessage(messages));
            }

            /** Builds the marker sent after a block when more messages remain. */
            private Message createEndBlockMessage(int count) {
                return new Message("endBlock").set("count", count);
            }

            /** Builds the marker sent once the stream has no more messages. */
            private Message createEndReflowMessage(int count) {
                return new Message("endReflow").set("count", count);
            }

            @Override
            public void finish() {
                stream.close();
                onFinish.run();
            }

            @Override
            public void play() {
                // Nothing to do for this implementation.
            }

            @Override
            public void pause() {
                // Nothing to do for this implementation.
            }
        };
    }

    /** Does nothing in this implementation. */
    @Override
    public void commit() {
    }

    /**
     * Adds a tank to the underlying graph.
     *
     * @param tank tank identifier; any query string and {@code file://} occurrences are removed
     *             before it is handed to the graph
     */
    @Override
    public void add(String tank) {
        datalake.add(clean(tank));
    }

    /** Calls {@code terminate} on every tank of the underlying graph. */
    @Override
    public void disconnect() {
        datalake.tankList().forEach(Tank::terminate);
    }

    /** Does nothing in this implementation; {@code args} are ignored. */
    @Override
    public void connect(String ... args) {
    }

    /**
     * Strips the query string (from the first {@code ?} on) and every {@code file://}
     * occurrence from {@code url}.
     */
    private String clean(String url) {
        int queryStart = url.indexOf("?");
        String withoutQuery = queryStart == -1 ? url : url.substring(0, queryStart);
        return withoutQuery.replace("file://", "");
    }

    /**
     * Extracts the scale from the query string of {@code url}.
     *
     * <p>Only the part after the first {@code ?} is inspected, so an {@code =} inside the path
     * cannot be mistaken for the parameter separator. The value of the first {@code key=value}
     * parameter with a non-empty value is used; the key name is not checked. When there is no
     * query string, or no parameter carries a value, {@link Scale#Day} is returned instead of
     * failing with an index error.
     */
    private Scale scaleOf(String url) {
        int queryStart = url.indexOf('?');
        if (queryStart == -1) {
            return Scale.Day;
        }
        String query = url.substring(queryStart + 1);
        for (String parameter : query.split("&")) {
            int separator = parameter.indexOf('=');
            if (separator == -1) {
                continue;
            }
            String value = parameter.substring(separator + 1);
            if (!value.isEmpty()) {
                return Scale.valueOf(value);
            }
        }
        return Scale.Day;
    }

    /**
     * Resolves the {@code datalake} directory under the cleaned {@code url}, in canonical form
     * when that can be computed and as the plain (non-canonical) file otherwise.
     */
    private File datalakeDirectory(String url) {
        File directory = new File(clean(url), "datalake");
        try {
            return directory.getCanonicalFile();
        }
        catch (IOException ignored) {
            // Canonicalisation is optional; fall back to the path as given.
            return directory;
        }
    }
}

