package io.intino.cesar.box.datalake;

import io.intino.alexandria.inl.Message;
import io.intino.alexandria.jmx.JMXServer;
import io.intino.alexandria.logger.Logger;
import io.intino.cesar.box.CesarBox;
import io.intino.ness.core.Datalake;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/* loaded from: input_file:io/intino/cesar/box/datalake/NessOperations.class */
public class NessOperations implements NessOperationsMBean {
    private final CesarBox box;
    private final ReflowAssistant assistant;
    private int processed = 0;
    private Datalake.EventStore.Reflow session;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/intino/cesar/box/datalake/NessOperations$ReflowHandler.class */
    public class ReflowHandler implements Datalake.EventStore.MessageHandler, Datalake.EventStore.ReflowHandler {
        private ReflowHandler() {
        }

        public void handle(Message message) {
            Datalake.handlers().get(message.type()).handle(message);
        }

        public void onBlock(int i) {
            NessOperations.this.assistant.onBlock();
            Logger.info("Block processed - " + i + " messages processed");
            NessOperations.this.session.next(NessOperations.this.assistant.defaultBlockSize(), new Datalake.EventStore.MessageHandler[]{this});
        }

        public void onFinish(int i) {
            NessOperations.this.assistant.onFinish();
            Datalake.registerTanks(NessOperations.this.box);
            Logger.info("Reflow finished - " + i + " messages processed");
        }
    }

    public NessOperations(CesarBox cesarBox) {
        this.box = cesarBox;
        this.assistant = new ReflowAssistant(cesarBox);
    }

    @Override // io.intino.cesar.box.datalake.NessOperationsMBean
    public List<String> help() {
        ArrayList arrayList = new ArrayList();
        arrayList.add("boolean reflow():Starts reflow mode to reproduce events coming from datalake");
        arrayList.add("boolean reflow(String from):Starts reflow mode to reproduce events coming from datalake since the instant parameter");
        arrayList.add("boolean customReflow(String reflowConfiguration):Starts reflow mode to reproduce events coming from datalake since the instant parameter");
        return arrayList;
    }

    @Override // io.intino.cesar.box.datalake.NessOperationsMBean
    public boolean reflow() {
        return reflow(this.assistant.filter());
    }

    private boolean reflow(Datalake.EventStore.Reflow.Filter filter) {
        Logger.info("Starting Reflow...");
        this.assistant.onStart();
        Datalake.unsubscribeAll(this.box.nessAccessor());
        this.session = this.box.nessAccessor().eventStore().reflow(filter);
        this.session.next(this.assistant.defaultBlockSize(), new Datalake.EventStore.MessageHandler[]{messageHandler()});
        return true;
    }

    private Datalake.EventStore.MessageHandler messageHandler() {
        return new ReflowHandler();
    }

    public static JMXServer init(CesarBox cesarBox) {
        JMXServer jMXServer = new JMXServer(Collections.singletonMap("io.intino.cesar.box.datalake.NessOperations", new Object[]{cesarBox}));
        jMXServer.init();
        return jMXServer;
    }
}
