/*
 * Decompiled with CFR 0.152.
 */
package io.intino.cosmos.datahub;

import com.google.gson.reflect.TypeToken;
import io.intino.alexandria.Json;
import io.intino.alexandria.Scale;
import io.intino.alexandria.Timetag;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.datalake.file.FileDatalake;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.ingestion.EventSession;
import io.intino.alexandria.ingestion.SessionHandler;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.terminal.Connector;
import io.intino.alexandria.terminal.JmsConnector;
import io.intino.alexandria.terminal.remotedatalake.RemoteDatalake;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.activemq.command.ActiveMQTextMessage;

public class BigBangTerminal {
    public static String[] subscriptionChannels = new String[0];
    private final Connector connector;
    private final Set<BiConsumer> datamartConsumers = new HashSet<BiConsumer>();
    private volatile Datalake datalake;
    private String sourceSelector;
    private final Map<BiConsumer<?, String>, List<Consumer<Event>>> consumers = new HashMap();

    public BigBangTerminal(Connector connector) {
        this.connector = connector;
    }

    public void initDatamarts() {
        this.initDatamarts(null);
    }

    public void initDatamarts(String sourceSelector) {
    }

    public void publish(Event event) {
        switch (event.type()) {
            default: 
        }
        Logger.warn(this.getClass().getSimpleName() + " is not configured to publish " + event.type() + " events.");
    }

    public void publish(Event first, Event ... others) {
        this.publish(Stream.concat(Stream.of(first), Arrays.stream(others)));
    }

    public void publish(Event[] events) {
        this.publish(Arrays.stream(events));
    }

    public void publish(Collection<Event> events) {
        this.publish(events.stream());
    }

    public void publish(Stream<Event> events) {
        events.filter(e -> this.channelOf(e.type()) != null).collect(Collectors.groupingBy(Event::type)).forEach((type, eventList) -> this.connector.sendEvents(this.channelOf((String)type), (List<Event>)eventList));
    }

    public synchronized Datalake datalake() {
        return this.datalake != null ? this.datalake : (this.datalake = this.instantiateDatalake());
    }

    private Datalake instantiateDatalake() {
        try {
            Message message = this.connector.requestResponse("service.ness.datalake", this.request("Datalake"), 5L, TimeUnit.SECONDS);
            if (message == null) {
                return null;
            }
            String path = ((TextMessage)message).getText();
            if (path == null) {
                return null;
            }
            return new File(path).exists() ? new FileDatalake(new File(path)) : new RemoteDatalake((JmsConnector)this.connector);
        }
        catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    public BatchSession batch(File temporalStageDirectory) {
        return new BatchSession(temporalStageDirectory);
    }

    public BatchSession batch(File temporalStageDirectory, Config config) {
        return new BatchSession(temporalStageDirectory, config);
    }

    public synchronized List<MetaMessage> metamodel() {
        Message response = this.connector.requestResponse("service.ness.metamodel", this.request("Metamodel"), 10L, TimeUnit.SECONDS);
        if (response == null) {
            return null;
        }
        try {
            return (List)Json.fromJson(((TextMessage)response).getText(), new TypeToken<ArrayList<MetaMessage>>(this){}.getType());
        }
        catch (Exception e) {
            Logger.error(e);
            return null;
        }
    }

    public synchronized void requestSeal() {
        this.connector.requestResponse("service.ness.seal", this.request("Seal"), 30L, TimeUnit.MINUTES);
    }

    public synchronized Instant requestLastSeal() {
        Message message = this.connector.requestResponse("service.ness.seal.last", this.request("LastSeal"), 10L, TimeUnit.MINUTES);
        if (message == null) {
            return Instant.now();
        }
        try {
            return Instant.parse(((TextMessage)message).getText());
        }
        catch (Exception e) {
            Logger.error(e);
            return Instant.now();
        }
    }

    private Message request(String type) {
        return this.request(type, Collections.emptyMap());
    }

    private Message request(String type, Map<String, String> attributes) {
        try {
            ActiveMQTextMessage m = new ActiveMQTextMessage();
            io.intino.alexandria.message.Message message = new io.intino.alexandria.message.Message(type);
            attributes.forEach(message::set);
            m.setText(message.toString());
            return m;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String channelOf(String type) {
        switch (type) {
            default: 
        }
        return null;
    }

    public class BatchSession {
        private final File temporalStage;
        private final SessionHandler sessionHandler;
        private final EventSession session;
        private final Scale scale;

        public BatchSession(File temporalStage) {
            this(temporalStage, new Config());
        }

        public BatchSession(File temporalStage, Config config) {
            this.temporalStage = temporalStage;
            this.scale = config.scale;
            this.sessionHandler = new SessionHandler(temporalStage);
            this.session = this.sessionHandler.createEventSession(config.eventsBufferSize);
        }

        public void feed(Event event) throws IOException {
            this.session.put(this.tankOf(event), event.ss(), Timetag.of(event.ts(), this.scale), event.format(), event);
        }

        public void feed(Event event, Scale scale) throws IOException {
            this.session.put(this.tankOf(event), event.ss(), Timetag.of(event.ts(), scale), event.format(), event);
        }

        public void flush() {
            this.session.flush();
        }

        public void push(File dataHubStage) {
            this.session.close();
            this.sessionHandler.pushTo(dataHubStage);
        }

        public void push(String host, String user, String dataHubStageAbsolutePath) {
            this.session.close();
            try {
                List<File> files = BatchSession.allFilesIn(this.temporalStage.toPath(), path -> path.getName().endsWith(".session")).collect(Collectors.toList());
                this.upload(files, host, user, dataHubStageAbsolutePath);
                this.temporalStage.renameTo(new File(this.temporalStage.getParentFile(), this.temporalStage.getName() + ".treated"));
            }
            catch (Exception e) {
                Logger.error(e);
            }
        }

        private static Stream<File> allFilesIn(Path path, Predicate<File> filter) throws Exception {
            Stream.Builder streamBuilder = Stream.builder();
            try (Stream<Path> paths = Files.walk(path, new FileVisitOption[0]);){
                paths.filter(p -> Files.isRegularFile(p, new LinkOption[0]) && filter.test(p.toFile())).forEach(p -> streamBuilder.add(p.toFile()));
            }
            return streamBuilder.build();
        }

        public synchronized void seal() {
            BigBangTerminal.this.connector.requestResponse("service.ness.seal", BigBangTerminal.this.request("Seal", Map.of("stage", this.temporalStage.getName())));
        }

        private void upload(List<File> sessions, String host, String user, String dataHubStageAbsolutePath) {
            try {
                String connectionChain = user + "@" + host + ":" + dataHubStageAbsolutePath;
                Logger.info("Uploading sessions to " + connectionChain + "...");
                for (File s : sessions) {
                    Process process = new ProcessBuilder("scp", s.getAbsolutePath(), connectionChain).inheritIO().start();
                    process.waitFor(1L, TimeUnit.HOURS);
                }
                Logger.info("sessions uploaded");
            }
            catch (IOException | InterruptedException exception) {
                // empty catch block
            }
        }

        private String tankOf(Event event) {
            return event.type();
        }
    }

    public static class Config {
        private int eventsBufferSize = 1000000;
        private int setsBufferSize = 1000000;
        private Scale scale = Scale.Day;

        public Config scale(Scale scale) {
            this.scale = scale;
            return this;
        }

        public Config eventsBufferSize(int eventsBufferSize) {
            this.eventsBufferSize = eventsBufferSize;
            return this;
        }

        public Config setsBufferSize(int setsBufferSize) {
            this.setsBufferSize = setsBufferSize;
            return this;
        }
    }

    public record MetaAttribute(String name, String type) {
    }

    public record MetaMessage(String name, boolean assertion, boolean multiple, List<MetaAttribute> attributes, List<MetaMessage> components, List<String> hierarchy) {
    }
}

