package io.intino.consul.terminal;

import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.datalake.file.FileDatalake;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.jms.ConnectionConfig;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.terminal.JmsConnector;
import io.intino.alexandria.terminal.remotedatalake.RemoteDatalake;
import io.intino.cosmos.datahub.ObserverTerminal;
import io.intino.cosmos.datahub.datamarts.master.MasterDatamart;
import io.intino.cosmos.datahub.messages.universe.ApplicationAssertion;
import io.intino.cosmos.datahub.messages.universe.ApplicationJavaAssertion;
import io.intino.cosmos.datahub.messages.universe.ComputerAssertion;
import io.intino.cosmos.datahub.messages.universe.ObserverAssertion;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.TextMessage;
import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQTextMessage;

/* loaded from: input_file:io/intino/consul/terminal/Terminal.class */
public class Terminal implements Service {
    private final String cosulID;
    private final JmsConnector connector;
    private final Map<BiConsumer<?, String>, List<Consumer<Event>>> consumers = new HashMap();
    private final ObserverTerminal terminal;
    private ConsulService service;

    public Terminal(String str, ConnectionConfig connectionConfig) {
        this.cosulID = str;
        this.connector = new JmsConnector(connectionConfig, null);
        this.terminal = new ObserverTerminal(this.connector);
    }

    @Override // org.apache.activemq.Service
    public void start() {
        startConnector();
        this.terminal.initDatamarts("ss LIKE '" + this.cosulID + "%' AND type IN (" + String.join(", ", quote(ComputerAssertion.class.getSimpleName()), quote(ObserverAssertion.class.getSimpleName()), quote(ApplicationAssertion.class.getSimpleName()), quote(ApplicationJavaAssertion.class.getSimpleName())) + ")");
        this.service = new ConsulService(this.connector, this.connector.clientId());
    }

    private String quote(String str) {
        return "'" + str + "'";
    }

    public MasterDatamart master() {
        return this.terminal.datamart();
    }

    @Override // org.apache.activemq.Service
    public void stop() {
        this.connector.stop();
    }

    private void startConnector() {
        if (this.connector.connection() != null) {
            return;
        }
        if (this.connector.connection() == null) {
            this.connector.start();
        }
        while (this.connector.connection() == null) {
            try {
                Thread.sleep(30000L);
                this.connector.start();
            } catch (InterruptedException e) {
                Logger.error(e);
            }
        }
    }

    public ConsulService service() {
        return this.service;
    }

    public JmsConnector connector() {
        return this.connector;
    }

    public void publish(Event event) {
        this.terminal.publish(event);
    }

    public void publish(Event event, Event... eventArr) {
        this.terminal.publish(event, eventArr);
    }

    public void publish(Event[] eventArr) {
        this.terminal.publish(eventArr);
    }

    public void publish(Collection<Event> collection) {
        this.terminal.publish(collection);
    }

    public void publish(Stream<Event> stream) {
        this.terminal.publish(stream);
    }

    public Datalake datalake() {
        String text;
        try {
            Message requestResponse = this.connector.requestResponse("service.ness.datalake.eventstore", request("Datalake"), 5L, TimeUnit.SECONDS);
            if (requestResponse == null || (text = ((TextMessage) requestResponse).getText()) == null) {
                return null;
            }
            return new File(text).exists() ? new FileDatalake(new File(text)) : new RemoteDatalake(this.connector);
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    private Message request(String str) {
        return request(str, Collections.emptyMap());
    }

    private Message request(String str, Map<String, String> map) {
        try {
            ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
            io.intino.alexandria.message.Message message = new io.intino.alexandria.message.Message(str);
            Objects.requireNonNull(message);
            map.forEach(message::set);
            activeMQTextMessage.setText(message.toString());
            return activeMQTextMessage;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
