/*
 * Decompiled with CFR 0.152.
 */
package io.intino.cesar.box.ness;

import io.intino.cesar.box.CesarBox;
import io.intino.cesar.box.ness.NessOperationsMBean;
import io.intino.cesar.box.ness.NessTanks;
import io.intino.cesar.box.ness.ReflowAssistant;
import io.intino.konos.datalake.MessageDispatcher;
import io.intino.konos.datalake.Ness;
import io.intino.konos.jmx.JMXServer;
import io.intino.tara.magritte.Graph;
import io.intino.tara.magritte.RemounterGraph;
import io.intino.tara.magritte.stores.FileSystemStore;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JMX operations that run a "reflow": the datalake replays its messages into a
 * temporary {@link RemounterGraph}, which is periodically saved through the
 * {@link ReflowAssistant} and finally replaced in the box by a regular {@link Graph}.
 *
 * <p>Progress is polled every {@link #POLL_DELAY_MILLIS} milliseconds. The reflow is
 * considered finished once the datalake's last message is more than
 * {@link #IDLE_SECONDS_TO_FINISH} seconds old.
 *
 * <p>NOTE(review): the mutable state of this class ({@code session}, {@code processed},
 * {@code timer}) is not guarded; it looks like only one reflow is expected to run at a
 * time — confirm against callers.
 */
public class NessOperations
implements NessOperationsMBean {
    private static final Logger logger = LoggerFactory.getLogger("ROOT");

    /** Delay, in milliseconds, between two consecutive progress polls. */
    private static final long POLL_DELAY_MILLIS = 5000L;

    /** Age, in seconds, of the datalake's last message after which the reflow is finished. */
    private static final long IDLE_SECONDS_TO_FINISH = 120L;

    private final CesarBox box;
    private final ReflowAssistant assistant;
    private int processed = 0;
    private Ness.ReflowSession session;
    /** Timer shared by all polls of the running reflow; {@code null} while none is scheduled. */
    private Timer timer;

    /**
     * @param box box whose datalake is reflowed and whose graph is replaced
     */
    public NessOperations(CesarBox box) {
        this.box = box;
        this.assistant = new ReflowAssistant(box);
    }

    /**
     * Lists the operations offered by this bean.
     *
     * @return one {@code signature:description} line per operation
     */
    @Override
    public List<String> help() {
        List<String> operations = new ArrayList<>();
        operations.add("boolean reflow():Starts remount mode to reproduce events coming from datalake");
        operations.add("boolean reflow(String from):Starts remount mode to reproduce events coming from datalake since the given ISO-8601 instant");
        return operations;
    }

    /**
     * Starts a reflow from the instant described by {@code from}.
     *
     * @param from text accepted by {@link Instant#parse(CharSequence)}
     * @return {@code false} when {@code from} is {@code null} or cannot be parsed,
     *         otherwise the result of {@link #reflow(Instant)}
     */
    @Override
    public boolean reflow(String from) {
        if (from == null) {
            logger.error("Cannot start reflow: no start instant was given");
            return false;
        }
        Instant start;
        try {
            start = Instant.parse(from);
        }
        catch (DateTimeParseException e) {
            // Report bad input the same way reflow(Instant) reports failures: log and return false.
            logger.error("Cannot start reflow: '" + from + "' is not a valid ISO-8601 instant", e);
            return false;
        }
        return this.reflow(start);
    }

    /**
     * Starts a reflow from {@link Instant#MIN}.
     *
     * @return the result of {@link #reflow(Instant)}
     */
    @Override
    public boolean reflow() {
        return this.reflow(Instant.MIN);
    }

    /**
     * Starts a reflow session on the datalake and begins polling its progress.
     *
     * <p>When {@code from} is {@link Instant#MIN} and the assistant's graph is backed by
     * a {@link FileSystemStore}, the store directory is deleted first.
     *
     * @param from instant passed to the datalake as the start of the reflow
     * @return {@code true} once the first poll has run, {@code false} if an
     *         {@link IOException} was raised while preparing the graph
     */
    public boolean reflow(Instant from) {
        logger.info("Starting Reflow...");
        // Each reflow reports its own count instead of accumulating over previous runs.
        this.processed = 0;
        this.assistant.before();
        NessTanks.unregister();
        this.session = this.box.datalake().reflow(this.assistant.blockSize(), (MessageDispatcher) new NessTanks(), from, this.assistant.tanks().toArray(new Ness.Tank[0]));
        this.box.datalake().lastMessage(null);
        this.box.datalake().reset();
        Graph graph = this.assistant.graph();
        try {
            if (graph.store() instanceof FileSystemStore && from.equals(Instant.MIN)) {
                FileUtils.deleteDirectory(((FileSystemStore) graph.store()).directory());
            }
            RemounterGraph original = (RemounterGraph) new RemounterGraph(graph.store()).loadStashes(this.assistant.coreStashes());
            this.allowWriting(original, false);
            RemounterGraph clone = original.realClone();
            this.box.put(clone);
            this.session.next();
            // The first poll runs on the calling thread; later ones are scheduled on the timer.
            this.timerTask(clone).run();
        }
        catch (IOException e) {
            // NOTE(review): at this point the tanks are unregistered and the session is still
            // open; whether they should be restored here cannot be told from this file.
            logger.error(e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * Builds one progress poll: it saves a completed block if there is one, then either
     * finishes the reflow or schedules the next poll.
     *
     * @param clone graph that was installed in the box when this poll was created
     */
    private TimerTask timerTask(final Graph clone) {
        return new TimerTask() {

            @Override
            public void run() {
                Graph current = NessOperations.this.write(clone);
                if (NessOperations.this.finished()) {
                    // write() may have replaced the graph in the box, so the final save must
                    // use the graph it returned rather than the one this poll started with.
                    NessOperations.this.doWrite(current);
                    NessOperations.this.onFinish(current);
                } else {
                    NessOperations.this.schedule(NessOperations.this.timerTask(current));
                }
            }
        };
    }

    /** Schedules {@code task} after the poll delay on the shared timer, creating it on first use. */
    private void schedule(TimerTask task) {
        if (this.timer == null) {
            this.timer = new Timer("Reflow task", true);
        }
        this.timer.schedule(task, POLL_DELAY_MILLIS);
    }

    /**
     * Saves the graph when the datalake has received at least one block of messages,
     * then resets the datalake and asks the session for the next block.
     *
     * @return the graph now installed in the box: {@code clone} itself when nothing was
     *         saved, otherwise the replacement created by {@link #doWrite(Graph)}
     */
    private Graph write(Graph clone) {
        if (this.box.datalake().receivedMessages() < this.assistant.blockSize()) {
            return clone;
        }
        Graph result = this.doWrite(clone);
        this.box.datalake().reset();
        this.session.next();
        return result;
    }

    /**
     * Saves {@code clone} through the assistant while the session is paused and installs
     * a new {@link RemounterGraph}, loaded from the same store, in the box.
     *
     * @return the newly installed graph
     */
    private Graph doWrite(Graph clone) {
        this.processed += this.box.datalake().receivedMessages();
        logger.info("processed {}", this.processed);
        this.session.pause();
        this.allowWriting(clone, true);
        this.assistant.saveGraph(clone);
        Graph newClone = new RemounterGraph(clone.store()).loadStashes(this.assistant.coreStashes());
        this.allowWriting(newClone, false);
        this.box.put(newClone);
        this.session.play();
        return newClone;
    }

    /** Sets the writing flag on the graph's store when it is a {@link FileSystemStore}; otherwise does nothing. */
    private void allowWriting(Graph original, boolean flag) {
        if (original.store() instanceof FileSystemStore) {
            ((FileSystemStore) original.store()).allowWriting(flag);
        }
    }

    /**
     * Finishes the session, stops polling, installs a regular writable {@link Graph}
     * built on the store of {@code clone}, and registers the tanks again.
     */
    private void onFinish(Graph clone) {
        this.session.finish();
        if (this.timer != null) {
            // Cancelling from inside the running task is allowed and makes it the last one.
            this.timer.cancel();
            this.timer = null;
        }
        Graph graph = new Graph(clone.store());
        this.allowWriting(graph, true);
        this.box.put(graph.loadStashes(this.assistant.coreStashes()));
        NessTanks.registerTanks(this.box);
        this.assistant.after();
        logger.info("Reflow finished successfully!");
    }

    /**
     * @return {@code true} when the datalake has a last message and it is more than
     *         {@link #IDLE_SECONDS_TO_FINISH} seconds older than now
     */
    private boolean finished() {
        return this.box.datalake().lastMessage() != null
                && this.box.datalake().lastMessage().until(Instant.now(), ChronoUnit.SECONDS) > IDLE_SECONDS_TO_FINISH;
    }

    /**
     * Creates and initialises a {@link JMXServer} for this class, with {@code box} as
     * its constructor argument.
     *
     * @return the initialised server
     */
    public static JMXServer init(CesarBox box) {
        JMXServer server = new JMXServer(Collections.singletonMap("io.intino.cesar.box.ness.NessOperations", new Object[]{box}));
        server.init();
        return server;
    }
}

