/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.datamart.impl;

import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.message.Message;
import io.intino.datahub.box.DataHubBox;
import io.intino.datahub.datamart.MasterDatamart;
import io.intino.datahub.datamart.TimeShiftCache;
import io.intino.datahub.datamart.mounters.EntityMounter;
import io.intino.datahub.datamart.mounters.MasterDatamartMounter;
import io.intino.datahub.datamart.mounters.ReelMounter;
import io.intino.datahub.datamart.mounters.TimelineMounter;
import io.intino.datahub.datamart.mounters.TimelineUtils;
import io.intino.datahub.model.Datalake;
import io.intino.datahub.model.Datamart;
import io.intino.datahub.model.Entity;
import io.intino.sumus.chronos.ReelFile;
import io.intino.sumus.chronos.TimelineStore;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LocalMasterDatamart
implements MasterDatamart {
    private final DataHubBox box;
    private final Datamart definition;
    private final File directory;
    private final MasterDatamart.Store<Message> entities;
    private final MasterDatamart.ChronosDirectory<TimelineStore> timelines;
    private final MasterDatamart.ChronosDirectory<ReelFile> reels;
    private final Map<String, TimeShiftCache> caches;

    public LocalMasterDatamart(DataHubBox box, Datamart definition) {
        this.box = box;
        this.definition = definition;
        this.directory = box.datamartDirectory(definition.name$());
        this.entities = new EntityStore(definition);
        this.timelines = new TimelineDirectory(definition, new File(this.directory, "timelines"));
        this.reels = new ReelDirectory(definition, new File(this.directory, "reels"));
        this.caches = new HashMap<String, TimeShiftCache>();
    }

    @Override
    public Datamart definition() {
        return this.definition;
    }

    public File directory() {
        return this.directory;
    }

    @Override
    public DataHubBox box() {
        return this.box;
    }

    @Override
    public String name() {
        return this.definition.name$();
    }

    @Override
    public MasterDatamart.Store<Message> entityStore() {
        return this.entities;
    }

    @Override
    public MasterDatamart.ChronosDirectory<TimelineStore> timelineStore() {
        return this.timelines;
    }

    @Override
    public MasterDatamart.ChronosDirectory<ReelFile> reelStore() {
        return this.reels;
    }

    @Override
    public synchronized TimeShiftCache cacheOf(String timeline) {
        if (!this.caches.containsKey(timeline)) {
            File dir = new File(this.box.datamartsDirectory(), ".cache");
            dir.mkdirs();
            this.caches.put(timeline, new TimeShiftCache(new File(dir, MasterDatamart.ChronosDirectory.normalizePath(timeline) + ".db")).open());
        }
        return this.caches.get(timeline);
    }

    @Override
    public void close() {
        this.caches.values().forEach(c -> {
            try {
                c.close();
            }
            catch (Exception e) {
                Logger.error((Throwable)e);
            }
        });
    }

    @Override
    public Stream<MasterDatamartMounter> createMountersFor(Datalake.Tank tank) {
        if (tank.isMeasurement()) {
            return Stream.of(new TimelineMounter(this));
        }
        if (!tank.isMessage()) {
            return Stream.empty();
        }
        ArrayList<MasterDatamartMounter> mounters = new ArrayList<MasterDatamartMounter>(2);
        if (this.entityStore().isSubscribedTo(tank)) {
            mounters.add(new EntityMounter(this));
        }
        if (this.timelineStore().isSubscribedTo(tank)) {
            mounters.add(new TimelineMounter(this));
        }
        if (this.reelStore().isSubscribedTo(tank)) {
            mounters.add(new ReelMounter(this));
        }
        return mounters.stream();
    }

    public LocalMasterDatamart reflow(Stream<Message> messages) {
        try (Stream<Message> stream = messages;){
            messages.forEach(m -> {
                String id = m.get("id").asString();
                if (id != null && !id.isBlank()) {
                    this.entities.put(id, (Message)m);
                }
            });
        }
        return this;
    }

    private static class EntityStore
    implements MasterDatamart.Store<Message> {
        private final Map<String, Message> entities;
        private final Set<String> subscribedEvents;

        public EntityStore(Datamart definition) {
            this.entities = new ConcurrentHashMap<String, Message>(1024);
            this.subscribedEvents = definition.entityList().stream().map(Entity::from).filter(Objects::nonNull).map(m -> m.message().name$()).collect(Collectors.toSet());
        }

        public EntityStore(Datamart definition, Stream<Message> messages) {
            this.entities = Collections.synchronizedMap(messages.filter(m -> m.contains("id")).collect(Collectors.toMap(m -> m.get("id").asString(), Function.identity())));
            this.subscribedEvents = definition.entityList().stream().map(Entity::from).filter(Objects::nonNull).map(m -> m.message().name$()).collect(Collectors.toSet());
        }

        @Override
        public int size() {
            return this.entities.size();
        }

        @Override
        public boolean contains(String id) {
            return this.entities.containsKey(id);
        }

        @Override
        public Message get(String id) {
            return this.entities.get(id);
        }

        @Override
        public void put(String id, Message value) {
            this.entities.put(id, value);
        }

        @Override
        public void remove(String id) {
            this.entities.remove(id);
        }

        @Override
        public void clear() {
            this.entities.clear();
        }

        @Override
        public Stream<Message> stream() {
            return this.entities.values().stream();
        }

        @Override
        public Map<String, Message> toMap() {
            return this.entities;
        }

        @Override
        public Collection<String> subscribedEvents() {
            return this.subscribedEvents;
        }

        @Override
        public boolean isSubscribedTo(Datalake.Tank tank) {
            if (!tank.isMessage() || tank.asMessage() == null || tank.asMessage().message() == null) {
                return false;
            }
            return this.subscribedEvents().contains(tank.asMessage().message().name$());
        }
    }

    private static class TimelineDirectory
    extends MasterDatamart.ChronosDirectory<TimelineStore> {
        private final Set<String> subscribedEvents;

        public TimelineDirectory(Datamart definition, File root) {
            super(root);
            this.subscribedEvents = definition.timelineList().stream().flatMap(TimelineUtils::types).collect(Collectors.toSet());
        }

        @Override
        protected String extension() {
            return ".timeline";
        }

        @Override
        public TimelineStore get(String type, String id) {
            try {
                return this.contains(type, id) ? TimelineStore.of((File)this.fileOf(type, id)) : null;
            }
            catch (IOException e) {
                Logger.error((Throwable)e);
                return null;
            }
        }

        @Override
        public Stream<TimelineStore> stream() {
            return this.listFiles().stream().map(f -> {
                try {
                    return TimelineStore.of((File)f);
                }
                catch (IOException e) {
                    return null;
                }
            }).filter(Objects::nonNull);
        }

        @Override
        public Collection<String> subscribedEvents() {
            return this.subscribedEvents;
        }

        @Override
        public boolean isSubscribedTo(Datalake.Tank tank) {
            Collection<String> events = this.subscribedEvents();
            if (tank.isMeasurement() && events.contains(tank.asMeasurement().sensor().name$())) {
                return true;
            }
            return tank.isMessage() && events.contains(tank.asMessage().message().name$());
        }
    }

    private static class ReelDirectory
    extends MasterDatamart.ChronosDirectory<ReelFile> {
        private final Set<String> subscribedEvents;

        public ReelDirectory(Datamart definition, File root) {
            super(root);
            this.subscribedEvents = definition.reelList().stream().map(r -> r.tank().message().name$()).collect(Collectors.toSet());
        }

        @Override
        protected String extension() {
            return ".reel";
        }

        @Override
        public ReelFile get(String type, String id) {
            try {
                return this.contains(type, id) ? ReelFile.open((File)this.fileOf(type, id)) : null;
            }
            catch (IOException e) {
                Logger.error((Throwable)e);
                return null;
            }
        }

        @Override
        public Stream<ReelFile> stream() {
            return this.listFiles().stream().map(f -> {
                try {
                    return ReelFile.open((File)f);
                }
                catch (IOException e) {
                    return null;
                }
            }).filter(Objects::nonNull);
        }

        @Override
        public Collection<String> subscribedEvents() {
            return this.subscribedEvents;
        }

        @Override
        public boolean isSubscribedTo(Datalake.Tank tank) {
            if (!tank.isMessage() || tank.asMessage() == null || tank.asMessage().message() == null) {
                return false;
            }
            return this.subscribedEvents().contains(tank.asMessage().message().name$());
        }
    }
}

