/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.datamart;

import io.intino.alexandria.Scale;
import io.intino.alexandria.Timetag;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventStream;
import io.intino.alexandria.event.measurement.MeasurementEvent;
import io.intino.alexandria.event.message.MessageEvent;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.message.Message;
import io.intino.alexandria.zim.ZimStream;
import io.intino.alexandria.zim.ZimWriter;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.datamart.MasterDatamart;
import io.intino.datahub.datamart.impl.LocalMasterDatamart;
import io.intino.datahub.datamart.mounters.EntityMounter;
import io.intino.datahub.datamart.mounters.ReelMounter;
import io.intino.datahub.datamart.mounters.TimelineMounter;
import io.intino.datahub.datamart.mounters.TimelineUtils;
import io.intino.datahub.model.Datalake;
import io.intino.datahub.model.Datamart;
import io.intino.datahub.model.Entity;
import io.intino.datahub.model.Sensor;
import io.intino.datahub.model.Timeline;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.FileUtils;

public class DatamartFactory {
    private final DataHubBox box;
    private final Datalake datalake;

    public DatamartFactory(DataHubBox box, Datalake datalake) {
        this.box = box;
        this.datalake = datalake;
    }

    public MasterDatamart create(Datamart definition) throws Exception {
        Reference<MasterDatamart> datamart = new Reference<MasterDatamart>();
        Reference<Instant> fromTs = new Reference<Instant>();
        if (this.failedToLoadLastSnapshotOf(definition, datamart, fromTs)) {
            datamart.value = new LocalMasterDatamart(this.box, definition);
            fromTs.value = null;
        }
        return this.reflow((MasterDatamart)datamart.value, (Instant)fromTs.value, definition);
    }

    private boolean failedToLoadLastSnapshotOf(Datamart definition, Reference<MasterDatamart> datamart, Reference<Instant> fromTs) {
        Optional<MasterDatamart.Snapshot> snapshot = this.box.datamartSerializer().loadMostRecentSnapshot(definition.name$());
        if (snapshot.isPresent()) {
            datamart.value = snapshot.get().datamart();
            fromTs.value = snapshot.get().datamart().ts();
            return false;
        }
        return true;
    }

    public MasterDatamart reflow(MasterDatamart datamart, Instant fromTs, Datamart definition) throws Exception {
        FileUtils.deleteDirectory((File)this.box.datamartTimelinesDirectory(datamart.name()));
        FileUtils.deleteDirectory((File)this.box.datamartReelsDirectory(datamart.name()));
        this.reflowEntitiesAndCookedTimelines(datamart, fromTs, DatamartFactory.entityTanks(definition), this.cookedTimelinesTanks(definition));
        this.reflowRawTimelines(datamart, definition);
        this.reflowReels(datamart, DatamartFactory.reelTanks(definition));
        Logger.debug((String)"Reflow complete");
        this.box.datamartSerializer().saveSnapshot(Timetag.today(), datamart);
        return datamart;
    }

    private void reflowRawTimelines(MasterDatamart datamart, Datamart definition) {
        Logger.debug((String)"Reflowing raw timelines...");
        this.reflowTimelines(datamart, definition);
    }

    private void reflowEntitiesAndCookedTimelines(MasterDatamart datamart, Instant fromTs, Set<String> entityTanks, Set<String> cookedTimelineTanks) {
        Logger.debug((String)"Reflowing entities and cooked timelines...");
        this.reflow(new EntityMounter(datamart), new TimelineMounter(datamart), entityTanks, cookedTimelineTanks, this.reflowTanks(fromTs, entityTanks, cookedTimelineTanks));
    }

    private void reflowReels(MasterDatamart datamart, Set<String> tanks) {
        Logger.debug((String)"Reflowing reels...");
        for (String tankName : tanks) {
            Datalake.Store.Tank tank = this.datalake.messageStore().tank(tankName);
            try (ReelMounter.Reflow mounter = new ReelMounter.Reflow(datamart);){
                tank.content().forEach(mounter::mount);
            }
        }
    }

    private void reflow(EntityMounter entityMounter, TimelineMounter timelineMounter, Set<String> entityTanks, Set<String> cookedTimelineTanks, Iterator<Event> events) {
        entityTanks = entityTanks.stream().map(this::getTankEventName).collect(Collectors.toSet());
        cookedTimelineTanks = cookedTimelineTanks.stream().map(this::getTankEventName).collect(Collectors.toSet());
        while (events.hasNext()) {
            Event event = events.next();
            if (entityTanks.contains(event.type())) {
                entityMounter.mount(event);
            }
            if (!cookedTimelineTanks.contains(event.type())) continue;
            timelineMounter.mount(event);
        }
    }

    private Set<String> cookedTimelinesTanks(Datamart definition) {
        return definition.timelineList().stream().filter(Timeline::isCooked).map(TimelineUtils::getCookedTanks).flatMap(Collection::stream).collect(Collectors.toSet());
    }

    private void reflowTimelines(MasterDatamart datamart, Datamart definition) {
        for (Timeline timeline : definition.timelineList(Timeline::isRaw)) {
            Supplier<MessageEventStream> messageEvents = this.bake(this.messageTanksOf(definition, timeline));
            Datalake.Store.Tank measurementTank = this.datalake.measurementStore().tank(DatamartFactory.tankName(timeline.asRaw().tank().sensor()));
            measurementTank.sources().forEach(ss -> this.reflowTimelinesOf(datamart, timeline, this.getTankEventName(measurementTank.name()), (Datalake.Store.Source<MeasurementEvent>)ss, (MessageEventStream)messageEvents.get()));
        }
    }

    private Supplier<MessageEventStream> bake(List<Datalake.Store.Tank<MessageEvent>> tanks) {
        File file = this.bakeEventsInCacheFile(tanks);
        if ((double)file.length() < (double)Runtime.getRuntime().freeMemory() * 0.8) {
            return () -> {
                MessageEventStream.InMemory events = new MessageEventStream.InMemory(file);
                file.delete();
                return events;
            };
        }
        return () -> new MessageEventStream.Reading(file);
    }

    private File bakeEventsInCacheFile(List<Datalake.Store.Tank<MessageEvent>> tanks) {
        File file = new File(this.box.datamartsDirectory(), ".tmp" + File.separator + System.nanoTime() + ".tmp");
        file.getParentFile().mkdirs();
        file.deleteOnExit();
        try (ZimWriter writer = new ZimWriter(file);){
            EventStream.merge(tanks.stream().map(Datalake.Store.Tank::content)).forEach(e -> {
                try {
                    writer.write(e.toMessage());
                }
                catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
            });
        }
        catch (IOException e2) {
            throw new RuntimeException(e2);
        }
        return file;
    }

    private List<Datalake.Store.Tank<MessageEvent>> messageTanksOf(Datamart definition, Timeline timeline) {
        Entity entity = timeline.entity();
        ArrayList<Datalake.Store.Tank<MessageEvent>> tanks = new ArrayList<Datalake.Store.Tank<MessageEvent>>();
        if (DatamartFactory.tankName(entity) != null) {
            tanks.add(this.datalake.messageStore().tank(DatamartFactory.tankName(entity)));
        }
        definition.entityList(e -> DatamartFactory.isDescendantOf(e, entity)).stream().map(DatamartFactory::tankName).map(tank -> this.datalake.messageStore().tank(tank)).forEach(tanks::add);
        return tanks;
    }

    private static boolean isDescendantOf(Entity node, Entity expectedParent) {
        if (!node.isExtensionOf()) {
            return false;
        }
        Entity parent = node.asExtensionOf().entity();
        return ((Object)((Object)parent)).equals((Object)expectedParent) || DatamartFactory.isDescendantOf(parent, expectedParent);
    }

    private String getTankEventName(String name) {
        return name.substring(name.lastIndexOf(46) + 1);
    }

    private void reflowTimelinesOf(MasterDatamart datamart, Timeline timeline, String measurementTank, Datalake.Store.Source<MeasurementEvent> ss, MessageEventStream messageEvents) {
        try (TimelineMounter.OfSingleTimeline mounter = new TimelineMounter.OfSingleTimeline(datamart, timeline, measurementTank, ss.name());
             MessageEventStream messageEventStream = messageEvents;){
            Stream<MessageEvent> messages = messageEvents.stream();
            List measurements = ss.tubs().flatMap(Datalake.Store.Tub::events).toList();
            Iterator events = EventStream.merge(Stream.of(messages, measurements.stream())).iterator();
            while (events.hasNext()) {
                mounter.mount((Event)events.next());
            }
        }
        catch (Exception e) {
            Logger.error((Throwable)e);
        }
    }

    private Iterator<Event> reflowTanks(Instant fromTs, Set<String> ... tanks) {
        HashSet<String> tankNames = new HashSet<String>();
        Arrays.stream(tanks).forEach(tankNames::addAll);
        if (fromTs != null) {
            Timetag fromTimetag = Timetag.of((Instant)fromTs, (Scale)Scale.Minute);
            return EventStream.merge(this.tanks(tankNames).map(tank -> tank.content((ss, tt) -> tt.isAfter(fromTimetag)))).filter(e -> e.ts().isAfter(fromTs)).iterator();
        }
        return EventStream.merge(this.tanks(tankNames).map(tank -> tank.content())).iterator();
    }

    private Iterator<Event> reflowTanks(Set<String> entityTanks, Set<String> timelineTanks, Set<String> reelTanks, Instant fromTs) {
        HashSet<String> tankNames = new HashSet<String>(entityTanks);
        tankNames.addAll(timelineTanks);
        tankNames.addAll(reelTanks);
        if (fromTs != null) {
            Timetag fromTimetag = Timetag.of((Instant)fromTs, (Scale)Scale.Minute);
            return EventStream.merge(this.tanks(tankNames).map(tank -> tank.content((ss, tt) -> tt.isAfter(fromTimetag)))).filter(e -> e.ts().isAfter(fromTs)).iterator();
        }
        return EventStream.merge(this.tanks(tankNames).map(tank -> tank.content())).iterator();
    }

    private Stream<Datalake.Store.Tank<? extends Event>> tanks(Set<String> tankNames) {
        return Stream.of(this.datalake.messageStore().tanks().filter(t -> tankNames.contains(t.name())), this.datalake.measurementStore().tanks().filter(t -> tankNames.contains(t.name())), this.datalake.resourceStore().tanks().filter(t -> tankNames.contains(t.name()))).flatMap(Function.identity());
    }

    private static Set<String> reelTanks(Datamart definition) {
        return definition.reelList().stream().map(r -> DatamartFactory.tankName(r.tank())).collect(Collectors.toSet());
    }

    private static Set<String> timelineTanks(Datamart definition) {
        return definition.timelineList().stream().flatMap(TimelineUtils::tanksOf).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    public static Stream<String> tanksOf(Timeline.Cooked.TimeSeries.Count ts) {
        return ts.operationList().stream().map(d -> DatamartFactory.tankName(d.tank()));
    }

    private static Set<String> entityTanks(Datamart definition) {
        return definition.entityList().stream().filter(e -> e.from() != null).map(DatamartFactory::tankName).collect(Collectors.toSet());
    }

    private static String tankName(Datalake.Tank.Message tank) {
        return tank.message().core$().fullName().replace("$", ".");
    }

    private static String tankName(Entity e) {
        return e.from() == null ? null : e.from().message().core$().fullName().replace("$", ".");
    }

    private static String tankName(Sensor sensor) {
        return sensor.core$().fullName().replace("$", ".");
    }

    private void deleteDirectorySafe(File backup) {
        try {
            FileUtils.deleteDirectory((File)backup);
        }
        catch (Exception e) {
            Logger.error((Throwable)e);
        }
    }

    private static class Reference<T> {
        private T value;

        private Reference() {
        }
    }

    private static interface MessageEventStream
    extends Iterator<MessageEvent>,
    AutoCloseable {
        default public Stream<MessageEvent> stream() {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this, 4), false);
        }

        public static class InMemory
        implements MessageEventStream {
            private final MessageEvent[] events;
            private int index;

            public InMemory(File file) {
                try {
                    this.events = (MessageEvent[])ZimStream.of((File)file).map(MessageEvent::new).toArray(MessageEvent[]::new);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void close() throws Exception {
                this.index = 0;
            }

            @Override
            public boolean hasNext() {
                return this.index < this.events.length;
            }

            @Override
            public MessageEvent next() {
                return this.events[this.index++];
            }
        }

        public static class Reading
        implements MessageEventStream {
            private final ZimStream events;

            public Reading(File file) {
                try {
                    this.events = ZimStream.of((File)file);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void close() throws Exception {
                this.events.close();
            }

            @Override
            public boolean hasNext() {
                return this.events.hasNext();
            }

            @Override
            public MessageEvent next() {
                return new MessageEvent((Message)this.events.next());
            }
        }
    }
}

