package io.intino.alexandria.terminal.remotedatalake;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.intino.alexandria.Scale;
import io.intino.alexandria.Timetag;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventReader;
import io.intino.alexandria.event.EventStream;
import io.intino.alexandria.logger.Logger;
import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;

/* loaded from: input_file:io/intino/alexandria/terminal/remotedatalake/RemoteTank.class */
public class RemoteTank implements Datalake.EventStore.Tank {
    private final DatalakeAccessor accessor;
    private final JsonObject tank;
    private final JsonArray tubs;

    public RemoteTank(DatalakeAccessor datalakeAccessor, JsonObject jsonObject) {
        this.accessor = datalakeAccessor;
        this.tank = jsonObject;
        this.tubs = jsonObject.get("tubs").getAsJsonArray();
    }

    public String name() {
        return this.tank.get("name").getAsString();
    }

    public Scale scale() {
        return Scale.valueOf(this.tank.get("scale").getAsString());
    }

    public Stream<Datalake.EventStore.Tub> tubs() {
        return StreamSupport.stream(this.tubs.spliterator(), false).map(jsonElement -> {
            return new RemoteTub(this.accessor, name(), jsonElement.getAsString());
        });
    }

    public Datalake.EventStore.Tub first() {
        if (this.tubs.isEmpty()) {
            return null;
        }
        return new RemoteTub(this.accessor, name(), this.tubs.get(0).getAsString());
    }

    public Datalake.EventStore.Tub last() {
        if (this.tubs.isEmpty()) {
            return null;
        }
        return new RemoteTub(this.accessor, name(), this.tubs.get(this.tubs.size() - 1).getAsString());
    }

    public Datalake.EventStore.Tub on(Timetag timetag) {
        return tubs().filter(tub -> {
            return tub.timetag().equals(timetag);
        }).findFirst().orElse(null);
    }

    public EventStream content() {
        return eventStream(DatalakeAccessor.reflowSchema(name(), this.tubs));
    }

    public EventStream content(Predicate<Timetag> predicate) {
        return eventStream(DatalakeAccessor.reflowSchema(name(), (List) tubs().filter(tub -> {
            return predicate.test(tub.timetag());
        }).map(tub2 -> {
            return tub2.timetag().value();
        }).collect(Collectors.toList())));
    }

    public EventStream eventStream(final JsonObject jsonObject) {
        return new EventStream() { // from class: io.intino.alexandria.terminal.remotedatalake.RemoteTank.1
            final MessageConsumer consumer;
            boolean hasNextMessage;
            EventReader current;

            {
                this.consumer = RemoteTank.this.accessor.queryWithConsumer(jsonObject.toString());
                BytesMessage receive = RemoteTank.this.receive(this.consumer);
                if (receive != null) {
                    this.hasNextMessage = RemoteTank.calculateHasNext(receive);
                    this.current = RemoteTank.read(receive);
                }
            }

            public Event current() {
                if (this.current != null) {
                    return this.current.current();
                }
                return null;
            }

            public Event next() {
                BytesMessage receive;
                if (this.current.hasNext()) {
                    try {
                        return this.current.next();
                    } catch (NullPointerException e) {
                    }
                }
                if (!this.hasNextMessage || (receive = RemoteTank.this.receive(this.consumer)) == null) {
                    return null;
                }
                this.hasNextMessage = RemoteTank.calculateHasNext(receive);
                EventReader read = RemoteTank.read(receive);
                this.current = read;
                return read.next();
            }

            public boolean hasNext() {
                boolean z = this.current.hasNext() || this.hasNextMessage;
                if (!z) {
                    RemoteTank.this.close(this.consumer);
                }
                return z;
            }
        };
    }

    private Message receive(MessageConsumer messageConsumer) {
        try {
            return messageConsumer.receive(1000L);
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }

    private static boolean calculateHasNext(Message message) {
        try {
            return message.getBooleanProperty("hasNext");
        } catch (JMSException e) {
            Logger.error(e);
            return false;
        }
    }

    private void close(MessageConsumer messageConsumer) {
        try {
            messageConsumer.close();
        } catch (JMSException e) {
            Logger.error(e);
        }
    }

    private static EventReader read(BytesMessage bytesMessage) {
        try {
            byte[] bArr = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bArr);
            return new EventReader(new ByteArrayInputStream(bArr));
        } catch (JMSException e) {
            Logger.error(e);
            return null;
        }
    }
}
