package io.intino.datahub.box.service.jms;

import com.google.gson.JsonObject;
import io.intino.alexandria.Json;
import io.intino.alexandria.Scale;
import io.intino.alexandria.Timetag;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.datalake.file.message.MessageEventTub;
import io.intino.alexandria.event.message.MessageEvent;
import io.intino.alexandria.jms.MessageReader;
import io.intino.alexandria.logger.Logger;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.broker.jms.MessageTranslator;
import io.intino.datahub.datamart.MasterDatamart;
import io.intino.datahub.datamart.serialization.MasterDatamartSerializer;
import io.intino.datahub.datamart.serialization.MasterDatamartSnapshots;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;

/* loaded from: input_file:io/intino/datahub/box/service/jms/MessageStoreRequest.class */
/**
 * Answers store requests that arrive as JMS messages.
 *
 * <p>The request text selects the operation:
 * <ul>
 *   <li>{@code datamart:<name>:snapshots} - lists the snapshot timetags available for a datamart;</li>
 *   <li>{@code datamart:<name>:<timetag>} - downloads the most recent snapshot loaded for that timetag;</li>
 *   <li>{@code datamart:<name>:} (empty third field) - downloads the datamart currently held by the box;</li>
 *   <li>{@code datalake} - replies with the absolute path of the datalake root;</li>
 *   <li>{@code eventStore/tanks} - replies with a JSON description of the message store tanks;</li>
 *   <li>a JSON object whose {@code operation} is {@code reflow} - replies with the content of the
 *       requested tubs of a tank, one message per file.</li>
 * </ul>
 * Any other request, and any failure, yields an empty reply stream.
 */
public class MessageStoreRequest {
    private final DataHubBox box;

    /**
     * Description of a tank as sent in the {@code eventStore/tanks} reply.
     *
     * <p>Instances are handed to {@code Json.toString}; the field names are presumably used as
     * JSON keys, so they must not be renamed without checking the consumers.
     * NOTE(review): the decompiler reports this class was originally {@code private}.
     */
    public static class Tank {
        String name;
        String scale;
        List<String> sources;

        /**
         * @param str   tank name
         * @param str2  name of the tank scale
         * @param list  names of the tank sources
         */
        public Tank(String str, String str2, List<String> list) {
            this.name = str;
            this.scale = str2;
            this.sources = list;
        }
    }

    /**
     * @param dataHubBox box giving access to the datalake, the datamarts and the graph
     */
    public MessageStoreRequest(DataHubBox dataHubBox) {
        this.box = dataHubBox;
    }

    /**
     * Processes a request message and produces the reply messages.
     *
     * @param message request whose text selects the operation (see the class description)
     * @return the replies; empty when the request is not understood or when anything fails
     *         (the failure is logged, never propagated)
     */
    public Stream<Message> accept(Message message) {
        try {
            String request = MessageReader.textFrom(message);
            if (request.startsWith("datamart")) {
                return handleDatamartDownload(request);
            }
            return handleDatalakeDownload(request);
        } catch (Throwable th) {
            // Boundary of the request handling: log and answer nothing instead of propagating.
            Logger.error(th);
            return Stream.empty();
        }
    }

    /**
     * Handles {@code datamart:<name>:<snapshots | timetag>} requests.
     *
     * @param str full request text
     * @return the snapshot listing or the serialized datamart; empty if the request is malformed
     */
    private Stream<Message> handleDatamartDownload(String str) {
        // Limit of 3 keeps any further ':' inside the last field.
        String[] fields = str.split(":", 3);
        if (fields.length < 3) {
            return fail("Datamart requests must be like this: datamart:<name>:[snapshots | timetag], but it was " + str);
        }
        String datamartName = fields[1].trim();
        String selector = fields[2].trim();
        if (selector.equals("snapshots")) {
            return listAvailableSnapshotsOf(datamartName);
        }
        return downloadDatamart(datamartName, selector);
    }

    /**
     * Downloads a datamart by name.
     *
     * @param str  datamart name
     * @param str2 timetag of the wanted snapshot, or empty for the datamart currently held by the box
     * @return a single message with the serialized datamart, or empty if none was found
     */
    private Stream<Message> downloadDatamart(String str, String str2) {
        if (str2.isEmpty()) {
            MasterDatamart<?> current = this.box.datamarts().get(str);
            return current == null ? Stream.empty() : downloadDatamart(current);
        }
        return MasterDatamartSnapshots.loadMostRecentSnapshotTo(this.box.datamarts().root(), str, asTimetag(str2), this.box.graph())
                .map(snapshot -> snapshot.datamart())
                .map(this::downloadDatamart)
                .orElseGet(Stream::empty);
    }

    /**
     * Parses a timetag; an empty string maps to today's timetag at day scale.
     * The only caller in this class passes non-empty values.
     */
    private Timetag asTimetag(String str) {
        if (str.isEmpty()) {
            return Timetag.of(LocalDate.now(), Scale.Day);
        }
        return Timetag.of(str);
    }

    /**
     * Lists the snapshots available for a datamart.
     *
     * @param str datamart name
     * @return a single text message holding the comma-separated timetag values, with the number
     *         of snapshots in the {@code size} property; empty if there are no snapshots or the
     *         message cannot be built
     */
    private Stream<Message> listAvailableSnapshotsOf(String str) {
        List<Timetag> snapshots = MasterDatamartSnapshots.listAvailableSnapshotsOf(this.box.datamarts().root(), str);
        if (snapshots.isEmpty()) {
            return Stream.empty();
        }
        try {
            String timetags = snapshots.stream()
                    .map(timetag -> timetag.value())
                    .collect(Collectors.joining(","));
            ActiveMQTextMessage reply = new ActiveMQTextMessage();
            reply.setIntProperty("size", snapshots.size());
            reply.setText(timetags);
            return Stream.of(reply);
        } catch (Exception e) {
            Logger.error(e);
            return Stream.empty();
        }
    }

    /**
     * Serializes a datamart into a bytes message.
     *
     * @param masterDatamart datamart to send
     * @return a single message carrying the serialized bytes, with the datamart size in the
     *         {@code size} property and the payload length in {@code content-size}; empty if
     *         serialization or message creation fails
     */
    private Stream<Message> downloadDatamart(MasterDatamart<?> masterDatamart) {
        try {
            ActiveMQBytesMessage reply = new ActiveMQBytesMessage();
            reply.setIntProperty("size", masterDatamart.size());
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(8192);
            MasterDatamartSerializer.serialize(masterDatamart, buffer);
            byte[] content = buffer.toByteArray();
            reply.writeBytes(content);
            reply.setIntProperty("content-size", content.length);
            return Stream.of(reply);
        } catch (Exception e) {
            Logger.error(e);
            return Stream.empty();
        }
    }

    /**
     * Handles every request that is not a datamart request.
     *
     * @param str full request text
     * @return the reply for {@code datalake}, {@code eventStore/tanks} or a JSON reflow request;
     *         empty for anything else
     */
    private Stream<Message> handleDatalakeDownload(String str) {
        if (str.equals("datalake")) {
            return Stream.of(MessageTranslator.toJmsMessage(this.box.datalake().root().getAbsolutePath()));
        }
        if (str.equals("eventStore/tanks")) {
            List<Tank> tanks = this.box.datalake().messageStore().tanks()
                    .map(MessageStoreRequest::tankOf)
                    .collect(Collectors.toList());
            return Stream.of(MessageTranslator.toJmsMessage(Json.toString(tanks)));
        }
        if (str.startsWith("{")) {
            JsonObject request = (JsonObject) Json.fromString(str, JsonObject.class);
            if (!hasValue(request, "operation")) {
                return fail("JSON requests must declare an operation, but it was " + str);
            }
            if ("reflow".equals(request.get("operation").getAsString())) {
                return reflow(request);
            }
        }
        return Stream.empty();
    }

    /**
     * Sends the files of the requested tubs of a tank, one bytes message per file.
     *
     * <p>Every message carries a {@code hasNext} property that is {@code false} only for the
     * message built from the last file. A file that cannot be read is sent as an empty payload.
     *
     * @param jsonObject request with a {@code tank} name and a {@code tubs} array of timetag values
     * @return the lazily built messages; empty if the request lacks {@code tank} or {@code tubs}
     */
    private Stream<Message> reflow(JsonObject jsonObject) {
        if (!hasValue(jsonObject, "tank")) {
            return fail("Reflow requests must declare a tank, but it was " + jsonObject);
        }
        if (!jsonObject.has("tubs") || !jsonObject.get("tubs").isJsonArray()) {
            return fail("Reflow requests must declare an array of tubs, but it was " + jsonObject);
        }
        String tank = jsonObject.get("tank").getAsString();
        List<String> tubs = new ArrayList<>();
        for (var tub : jsonObject.get("tubs").getAsJsonArray()) {
            tubs.add(tub.getAsString());
        }
        List<File> files = filesOf(tank, tubs).toList();
        int last = files.size() - 1;
        return IntStream.range(0, files.size())
                .mapToObj(i -> toMessage(read(files.get(i)), i < last))
                // toMessage yields null when the message cannot be built; skip those.
                .filter(Objects::nonNull);
    }

    /**
     * Wraps a payload into a bytes message.
     * NOTE(review): the decompiler reports this method was originally {@code private}.
     *
     * @param bArr payload
     * @param z    value of the {@code hasNext} property
     * @return the message, or {@code null} if it could not be built (the error is logged)
     */
    public static Message toMessage(byte[] bArr, boolean z) {
        try {
            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
            message.setBooleanProperty("hasNext", z);
            message.writeBytes(bArr);
            return message;
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    /**
     * Reads a whole file; on failure the error is logged and an empty array is returned.
     */
    private static byte[] read(File file) {
        try {
            return Files.readAllBytes(file.toPath());
        } catch (IOException e) {
            Logger.error(e);
            return new byte[0];
        }
    }

    /**
     * Finds the files of the tubs of a tank whose timetag value is in the given list.
     *
     * @param str  tank name
     * @param list timetag values of the wanted tubs
     * @return the files of the matching tubs, across all sources of the tank
     */
    private Stream<File> filesOf(String str, List<String> list) {
        return this.box.datalake().messageStore().tank(str).sources()
                .flatMap(source -> source.tubs())
                .filter(tub -> list.contains(tub.timetag().value()))
                .map(tub -> ((MessageEventTub) tub).file());
    }

    /**
     * Builds the transferable description of a datalake tank.
     */
    private static Tank tankOf(Datalake.Store.Tank<MessageEvent> tank) {
        List<String> sources = tank.sources()
                .map(source -> source.name())
                .collect(Collectors.toList());
        return new Tank(tank.name(), tank.scale().name(), sources);
    }

    /**
     * Tells whether a JSON object has the given member with a value other than JSON null.
     */
    private static boolean hasValue(JsonObject json, String member) {
        return json.has(member) && !json.get(member).isJsonNull();
    }

    /**
     * Logs an error message and returns an empty stream.
     */
    private static <T> Stream<T> fail(String str) {
        Logger.error(str);
        return Stream.empty();
    }
}
