/*
 * Decompiled with CFR 0.152.
 */
package io.intino.cesar.datahub;

import io.intino.alexandria.Scale;
import io.intino.alexandria.Timetag;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.SessionEvent;
import io.intino.alexandria.ingestion.EventSession;
import io.intino.alexandria.ingestion.FS;
import io.intino.alexandria.ingestion.SessionHandler;
import io.intino.alexandria.ingestion.SetSession;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.message.Message;
import io.intino.alexandria.terminal.Connector;
import io.intino.cesar.datahub.events.consul.process.ProcessLog;
import io.intino.cesar.datahub.events.consul.process.ProcessStatus;
import io.intino.cesar.datahub.events.consul.server.ServerBoot;
import io.intino.cesar.datahub.events.consul.server.ServerInfo;
import io.intino.cesar.datahub.events.consul.server.ServerLog;
import io.intino.cesar.datahub.events.consul.server.ServerStatus;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ConsulTerminal {
    /** Connector used to send events, attach listeners and issue request/response calls. */
    private final Connector connector;
    /** Listeners attached to the connector, keyed by the subscriber callback that registered them. */
    private final Map<BiConsumer<?, String>, List<Consumer<Event>>> consumers = new HashMap<>();
    /** Publicly writable channel list; empty by default and not read anywhere in this class. */
    public static String[] subscriptionChannels = new String[0];
    /** Class-wide lock object shared by every terminal instance. */
    private static final Object monitor = new Object();

    /**
     * Creates a terminal that performs all of its communication through the given connector.
     *
     * @param connector connector used for sending events, attaching listeners and
     *                  request/response calls; stored as-is, no null check is performed
     */
    public ConsulTerminal(Connector connector) {
        this.connector = connector;
    }

    /**
     * Dispatches an untyped event to the typed {@code publish} overload that matches its
     * runtime class. Objects that are none of the six supported event classes are ignored.
     *
     * @param event event to publish
     * @param split accepted for interface compatibility; not used by this method
     */
    public void publish(Object event, String split) {
        // Each check is independent (no else), mirroring the original dispatch exactly.
        if (event instanceof ProcessLog) {
            ProcessLog processLog = (ProcessLog) event;
            publish(processLog);
        }
        if (event instanceof ProcessStatus) {
            ProcessStatus processStatus = (ProcessStatus) event;
            publish(processStatus);
        }
        if (event instanceof ServerBoot) {
            ServerBoot serverBoot = (ServerBoot) event;
            publish(serverBoot);
        }
        if (event instanceof ServerInfo) {
            ServerInfo serverInfo = (ServerInfo) event;
            publish(serverInfo);
        }
        if (event instanceof ServerLog) {
            ServerLog serverLog = (ServerLog) event;
            publish(serverLog);
        }
        if (event instanceof ServerStatus) {
            ServerStatus serverStatus = (ServerStatus) event;
            publish(serverStatus);
        }
    }

    /**
     * Opens a batch session over the given staging directory using a default {@link Config}.
     *
     * @param temporalStageDirectory local directory where session files are staged
     * @return a new batch session bound to this terminal
     */
    public BatchSession batch(File temporalStageDirectory) {
        return this.new BatchSession(temporalStageDirectory);
    }

    /**
     * Opens a batch session over the given staging directory with explicit settings.
     *
     * @param temporalStageDirectory local directory where session files are staged
     * @param config                 buffer sizes and default scale for the session
     * @return a new batch session bound to this terminal
     */
    public BatchSession batch(File temporalStageDirectory, Config config) {
        return this.new BatchSession(temporalStageDirectory, config);
    }

    /**
     * Sends a session event on the {@code "Session"} channel.
     *
     * @param session event to send
     */
    public void publish(SessionEvent session) {
        Event payload = session;
        this.connector.sendEvent("Session", payload);
    }

    /**
     * Subscribes a callback to the {@code "Session"} channel. Each incoming event is
     * re-wrapped as a {@link SessionEvent} built from its message and handed to the
     * callback together with the literal {@code "Session"}.
     *
     * @param onEventReceived callback to invoke; also used as the key under which the
     *                        attached listener is remembered
     */
    public void subscribe(SessionEventConsumer onEventReceived) {
        Consumer<Event> listener = event -> onEventReceived.accept(new SessionEvent(event.toMessage()), "Session");
        this.consumers.put(onEventReceived, List.of(listener));
        this.connector.attachListener("Session", listener);
    }

    /**
     * Sends a process log event on the {@code "consul.process.ProcessLog"} channel.
     *
     * @param processLog event to send
     */
    public void publish(ProcessLog processLog) {
        Event payload = processLog;
        this.connector.sendEvent("consul.process.ProcessLog", payload);
    }

    /**
     * Sends a process status event on the {@code "consul.process.ProcessStatus"} channel.
     *
     * @param processStatus event to send
     */
    public void publish(ProcessStatus processStatus) {
        Event payload = processStatus;
        this.connector.sendEvent("consul.process.ProcessStatus", payload);
    }

    /**
     * Sends a server boot event on the {@code "consul.server.ServerBoot"} channel.
     *
     * @param serverBoot event to send
     */
    public void publish(ServerBoot serverBoot) {
        Event payload = serverBoot;
        this.connector.sendEvent("consul.server.ServerBoot", payload);
    }

    /**
     * Sends a server info event on the {@code "consul.server.ServerInfo"} channel.
     *
     * @param serverInfo event to send
     */
    public void publish(ServerInfo serverInfo) {
        Event payload = serverInfo;
        this.connector.sendEvent("consul.server.ServerInfo", payload);
    }

    /**
     * Sends a server log event on the {@code "consul.server.ServerLog"} channel.
     *
     * @param serverLog event to send
     */
    public void publish(ServerLog serverLog) {
        Event payload = serverLog;
        this.connector.sendEvent("consul.server.ServerLog", payload);
    }

    /**
     * Sends a server status event on the {@code "consul.server.ServerStatus"} channel.
     *
     * @param serverStatus event to send
     */
    public void publish(ServerStatus serverStatus) {
        Event payload = serverStatus;
        this.connector.sendEvent("consul.server.ServerStatus", payload);
    }

    /**
     * Sends a {@code Seal} request on {@code "service.ness.seal"} and blocks until the
     * response callback runs or 30 minutes have elapsed, whichever comes first. A timeout
     * is not reported to the caller.
     *
     * <p>A per-call latch is used instead of wait/notify on a shared monitor, so a response
     * that arrives before this thread starts waiting is not lost, and responses belonging
     * to other requests cannot wake this call. If the waiting thread is interrupted the
     * error is logged and the interrupt flag is restored.
     */
    public synchronized void requestSeal() {
        CountDownLatch sealed = new CountDownLatch(1);
        String request = new Event(new Message("Seal")).ts(Instant.now()).toString();
        this.connector.requestResponse("service.ness.seal", request, response -> {
            sealed.countDown();
        });
        try {
            // The boolean result (false on timeout) is deliberately ignored: callers get no signal either way.
            sealed.await(30L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            Logger.error((Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a {@code LastSeal} request on {@code "service.ness.seal.last"} and waits up to
     * 10 seconds for the response.
     *
     * <p>The latch is released in a {@code finally} block, so a response that fails to parse
     * no longer leaves the caller blocked for the full timeout. If the waiting thread is
     * interrupted the error is logged and the interrupt flag is restored.
     *
     * @return the instant parsed from the response; or the instant captured at the start of
     *         this call when the response is {@code null}, cannot be parsed, or does not
     *         arrive in time
     */
    public synchronized Instant requestLastSeal() {
        AtomicReference<Instant> timestamp = new AtomicReference<>(Instant.now());
        CountDownLatch answered = new CountDownLatch(1);
        String request = new Event(new Message("LastSeal")).ts(Instant.now()).toString();
        this.connector.requestResponse("service.ness.seal.last", request, response -> {
            try {
                if (response != null) {
                    timestamp.set(Instant.parse(response));
                }
            }
            finally {
                answered.countDown();
            }
        });
        try {
            // On timeout the default timestamp set above is returned.
            answered.await(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Logger.error((Throwable)e);
            Thread.currentThread().interrupt();
        }
        return timestamp.get();
    }

    public static interface ConsulserverServerInfoConsumer
    extends BiConsumer<ServerInfo, String> {
    }

    public static interface ConsulprocessProcessLogConsumer
    extends BiConsumer<ProcessLog, String> {
    }

    public static interface ConsulserverServerBootConsumer
    extends BiConsumer<ServerBoot, String> {
    }

    public static interface ConsulprocessProcessStatusConsumer
    extends BiConsumer<ProcessStatus, String> {
    }

    public static interface ConsulserverServerLogConsumer
    extends BiConsumer<ServerLog, String> {
    }

    public static interface ConsulserverServerStatusConsumer
    extends BiConsumer<ServerStatus, String> {
    }

    public static interface SessionEventConsumer
    extends BiConsumer<SessionEvent, String> {
    }

    /**
     * Fluent settings holder for a {@link BatchSession}. Defaults: both buffer sizes are
     * 1,000,000 and the scale is {@code Scale.Day}.
     */
    public static class Config {
        /** Default used for both the events buffer and the sets buffer. */
        private static final int DEFAULT_BUFFER_SIZE = 1_000_000;

        private Scale scale = Scale.Day;
        private int eventsBufferSize = DEFAULT_BUFFER_SIZE;
        private int setsBufferSize = DEFAULT_BUFFER_SIZE;

        /**
         * Sets the size passed when the event session is created.
         *
         * @param eventsBufferSize new events buffer size
         * @return this config, for chaining
         */
        public Config eventsBufferSize(int eventsBufferSize) {
            this.eventsBufferSize = eventsBufferSize;
            return this;
        }

        /**
         * Sets the size passed when the set session is created.
         *
         * @param setsBufferSize new sets buffer size
         * @return this config, for chaining
         */
        public Config setsBufferSize(int setsBufferSize) {
            this.setsBufferSize = setsBufferSize;
            return this;
        }

        /**
         * Sets the scale used by default when events are fed to the session.
         *
         * @param scale new default scale
         * @return this config, for chaining
         */
        public Config scale(Scale scale) {
            this.scale = scale;
            return this;
        }
    }

    public class BatchSession {
        // Local staging directory: given to the SessionHandler and scanned for ".session" files on remote push.
        private final File temporalStage;
        // Handler created over the staging directory; source of both sessions below.
        private final SessionHandler sessionHandler;
        // Session that receives every event passed to feed(...).
        private final EventSession eventSession;
        // Set session; only flushed and closed by this class, never written to here.
        private final SetSession setSession;
        // Scale used to build the timetag when feed(Event, String) is called without an explicit scale.
        private final Scale scale;

        /**
         * Creates a batch session over the given staging directory with a default {@link Config}.
         *
         * @param temporalStage local staging directory
         */
        public BatchSession(File temporalStage) {
            this(temporalStage, new Config());
        }

        /**
         * Creates a batch session over the given staging directory, building a session
         * handler on it and opening one event session and one set session with the buffer
         * sizes taken from {@code config}.
         *
         * @param temporalStage local staging directory
         * @param config        buffer sizes and default scale
         */
        public BatchSession(File temporalStage, Config config) {
            this.temporalStage = temporalStage;
            scale = config.scale;
            // The handler must exist before either session can be created from it.
            sessionHandler = new SessionHandler(temporalStage);
            eventSession = sessionHandler.createEventSession(config.eventsBufferSize);
            setSession = sessionHandler.createSetSession(config.setsBufferSize);
        }

        /**
         * Adds an event to the event session, under the tank resolved for it and a timetag
         * derived from the event timestamp at this session's default scale.
         *
         * @param event event to store
         * @param split forwarded to the tank resolution
         */
        public void feed(Event event, String split) {
            String tank = tankOf(event, split);
            Instant ts = event.ts();
            eventSession.put(tank, Timetag.of(ts, scale), new Event[]{event});
        }

        /**
         * Adds an event to the event session, under the tank resolved for it and a timetag
         * derived from the event timestamp at the given scale.
         *
         * @param event event to store
         * @param split forwarded to the tank resolution
         * @param scale scale used for the timetag instead of the session default
         */
        public void feed(Event event, String split, Scale scale) {
            String tank = tankOf(event, split);
            Instant ts = event.ts();
            eventSession.put(tank, Timetag.of(ts, scale), new Event[]{event});
        }

        /**
         * Adds a session event to the event session under the {@code "Session"} tank,
         * always using {@code Scale.Day} for the timetag.
         *
         * @param event session event to store
         */
        public void feed(SessionEvent event) {
            Instant ts = event.ts();
            eventSession.put("Session", Timetag.of(ts, Scale.Day), new Event[]{event});
        }

        /** Flushes the event session and then the set session. */
        public void flush() {
            eventSession.flush();
            setSession.flush();
        }

        /**
         * Closes both sessions and asks the session handler to push its content to the
         * given directory.
         *
         * @param dataHubStage destination directory handed to the session handler
         */
        public void push(File dataHubStage) {
            eventSession.close();
            setSession.close();
            sessionHandler.pushTo(dataHubStage);
        }

        /**
         * Closes both sessions, uploads every {@code .session} file found in the staging
         * directory to the remote destination, and then renames the staging directory by
         * appending {@code ".treated"} to its name.
         *
         * <p>A failed rename is logged instead of being silently ignored.
         *
         * @param host                     remote host
         * @param user                     remote user
         * @param dataHubStageAbsolutePath destination path on the remote host
         */
        public void push(String host, String user, String dataHubStageAbsolutePath) {
            this.eventSession.close();
            this.setSession.close();
            List<File> files = FS.allFilesIn(this.temporalStage, path -> path.getName().endsWith(".session")).collect(Collectors.toList());
            this.upload(files, host, user, dataHubStageAbsolutePath);
            File treated = new File(this.temporalStage.getParentFile(), this.temporalStage.getName() + ".treated");
            // File.renameTo reports failure only through its return value.
            if (!this.temporalStage.renameTo(treated)) {
                Logger.error("Could not rename stage " + this.temporalStage.getAbsolutePath() + " to " + treated.getAbsolutePath());
            }
        }

        /**
         * Sends a {@code Seal} request for this session's stage (the staging directory
         * name is set as the {@code "stage"} attribute) on {@code "service.ness.seal"} and
         * blocks, with no timeout, until the response callback runs.
         *
         * <p>A per-call latch is used instead of wait/notify on a shared monitor, so a
         * response that arrives before this thread starts waiting is not lost, and responses
         * belonging to other requests cannot wake this call. If the waiting thread is
         * interrupted the error is logged and the interrupt flag is restored.
         */
        public synchronized void seal() {
            CountDownLatch sealed = new CountDownLatch(1);
            String request = new Event(new Message("Seal").set("stage", this.temporalStage.getName())).ts(Instant.now()).toString();
            ConsulTerminal.this.connector.requestResponse("service.ness.seal", request, response -> {
                sealed.countDown();
            });
            try {
                // Unbounded on purpose: the original contract is to return only once sealing is acknowledged.
                sealed.await();
            }
            catch (InterruptedException e) {
                Logger.error((Throwable)e);
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Copies each session file to {@code user@host:path} by running {@code scp}, one
         * process per file, waiting up to one hour for each.
         *
         * <p>This remains best-effort (nothing is thrown), but failures are no longer silent:
         * a non-zero exit code or a timeout is logged per file, I/O errors and interruption
         * are logged, the interrupt flag is restored, and the success message is only logged
         * when every file was copied. A process that exceeds the timeout is left running, as
         * before.
         *
         * @param sessions                 files to upload
         * @param host                     remote host
         * @param user                     remote user
         * @param dataHubStageAbsolutePath destination path on the remote host
         */
        private void upload(List<File> sessions, String host, String user, String dataHubStageAbsolutePath) {
            String connectionChain = user + "@" + host + ":" + dataHubStageAbsolutePath;
            Logger.info((String)("Uploading sessions to " + connectionChain + "..."));
            int failures = 0;
            try {
                for (File session : sessions) {
                    Process process = new ProcessBuilder("scp", session.getAbsolutePath(), connectionChain).inheritIO().start();
                    if (!process.waitFor(1L, TimeUnit.HOURS)) {
                        failures++;
                        Logger.error("Timed out uploading " + session.getAbsolutePath());
                    } else if (process.exitValue() != 0) {
                        failures++;
                        Logger.error("scp exited with code " + process.exitValue() + " uploading " + session.getAbsolutePath());
                    }
                }
            }
            catch (IOException e) {
                // Starting scp failed; remaining files are skipped, as in the original flow.
                Logger.error((Throwable)e);
                return;
            }
            catch (InterruptedException e) {
                Logger.error((Throwable)e);
                Thread.currentThread().interrupt();
                return;
            }
            if (failures == 0) {
                Logger.info((String)"sessions uploaded");
            } else {
                Logger.error(failures + " of " + sessions.size() + " sessions failed to upload");
            }
        }

        /**
         * Resolves the tank name under which an event is stored: a fixed name for each of
         * the six known event classes, otherwise the type of the event's message.
         *
         * @param event event to classify
         * @param split not used by this method
         * @return tank name for the event
         */
        private String tankOf(Event event, String split) {
            String tank;
            // First matching class wins; the message type is only read when none match.
            if (event instanceof ProcessLog) {
                tank = "consul.process.ProcessLog";
            } else if (event instanceof ProcessStatus) {
                tank = "consul.process.ProcessStatus";
            } else if (event instanceof ServerBoot) {
                tank = "consul.server.ServerBoot";
            } else if (event instanceof ServerInfo) {
                tank = "consul.server.ServerInfo";
            } else if (event instanceof ServerLog) {
                tank = "consul.server.ServerLog";
            } else if (event instanceof ServerStatus) {
                tank = "consul.server.ServerStatus";
            } else {
                tank = event.toMessage().type();
            }
            return tank;
        }
    }
}

