/*
 * Decompiled with CFR 0.152.
 */
package io.intino.alexandria.sealing;

import io.intino.alexandria.Fingerprint;
import io.intino.alexandria.datalake.Datalake;
import io.intino.alexandria.datalake.file.FileStore;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.EventReader;
import io.intino.alexandria.event.EventStream;
import io.intino.alexandria.event.EventWriter;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.sealing.EventSorter;
import io.intino.alexandria.sealing.SessionSealer;
import io.intino.alexandria.sealing.sorters.MessageEventSorter;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EventSealer {
    private final Map<Event.Format, Datalake.Store<? extends Event>> stores;
    private final SessionSealer.TankNameFilter tankNameFilter;
    private final File tempFolder;
    private boolean multithreading;

    public EventSealer(Datalake datalake, SessionSealer.TankNameFilter tankNameFilter, File tempFolder) {
        this(datalake, tankNameFilter, tempFolder, true);
    }

    public EventSealer(Datalake datalake, SessionSealer.TankNameFilter tankNameFilter, File tempFolder, boolean multithreading) {
        this.stores = Map.of(Event.Format.Message, datalake.messageStore(), Event.Format.Measurement, datalake.measurementStore(), Event.Format.Resource, datalake.resourceStore());
        this.tankNameFilter = Objects.requireNonNull(tankNameFilter, "tankNameFilter cannot be null");
        this.tempFolder = Objects.requireNonNull(tempFolder, "tempFolder cannot be null");
        this.multithreading = multithreading;
    }

    public EventSealer multithreading(boolean multithreading) {
        this.multithreading = multithreading;
        return this;
    }

    public void seal(Fingerprint fingerprint, List<File> sessions) throws IOException {
        File datalakeFile = this.datalakeFile(fingerprint);
        File temp = new File(this.tempFolder, System.nanoTime() + datalakeFile.getName());
        if (fingerprint.format().equals((Object)Event.Format.Resource)) {
            this.sealResources(datalakeFile, sessions, temp);
        } else {
            this.sealMessages(datalakeFile, fingerprint.format(), this.sort(fingerprint, sessions), temp);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sealResources(File datalakeFile, List<File> sessions, File temp) {
        try (EventWriter writer = EventWriter.of((File)temp);){
            for (File s : sessions) {
                EventReader of = EventReader.of((Event.Format)Event.Format.Resource, (File)s);
                of.forEachRemaining(e -> {
                    try {
                        writer.write(e);
                    }
                    catch (IOException ex) {
                        Logger.error((Throwable)ex);
                    }
                });
                of.close();
            }
            Files.move(temp.toPath(), datalakeFile.toPath(), StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        }
        catch (Exception e2) {
            Logger.error((Throwable)e2);
        }
        finally {
            temp.delete();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sealMessages(File datalakeFile, Event.Format format, List<File> sortedSessions, File temp) throws IOException {
        try {
            try (EventWriter writer = EventWriter.of((File)temp);){
                writer.write(this.streamOf(format, datalakeFile, sortedSessions));
            }
            Files.move(temp.toPath(), datalakeFile.toPath(), StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        }
        finally {
            temp.delete();
        }
    }

    private Stream<Event> streamOf(Event.Format format, File datalakeFile, List<File> files) {
        return EventStream.merge(Stream.concat(Stream.of(datalakeFile), files.stream()).map(file -> this.readEvents(format, (File)file)));
    }

    private Stream<Event> readEvents(Event.Format format, File file) {
        if (!file.exists()) {
            return Stream.empty();
        }
        try {
            return this.readEvents(format, new BufferedInputStream(new FileInputStream(file)));
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
            return Stream.empty();
        }
    }

    private Stream<Event> readEvents(Event.Format format, InputStream inputStream) {
        try {
            return new EventStream(this.readerOf(format, inputStream));
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
            return Stream.empty();
        }
    }

    private EventReader<Event> readerOf(Event.Format type, InputStream inputStream) throws IOException {
        return EventReader.of((Event.Format)type, (InputStream)inputStream);
    }

    private List<File> sort(Fingerprint fingerprint, List<File> files) {
        try {
            EventSorter.Factory sorter = this.sorterFactoryOf(fingerprint.format());
            if (!this.tankNameFilter.accepts(fingerprint.tank()) || sorter == null) {
                return files;
            }
            return this.shouldSortInParallel(files) ? this.parallelSort(sorter, files) : this.sequentialSort(sorter, files);
        }
        catch (Throwable e) {
            Logger.error((Throwable)e);
            return Collections.emptyList();
        }
    }

    private boolean shouldSortInParallel(List<File> files) {
        return this.multithreading && files.size() > 1 && Runtime.getRuntime().availableProcessors() > 1;
    }

    private List<File> parallelSort(EventSorter.Factory sorter, List<File> files) throws Throwable {
        ExecutorService threadPool = Executors.newFixedThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()));
        Throwable[] error = new Throwable[1];
        threadPool.invokeAll(files.stream().map(file -> this.sort(sorter, (File)file, error)).collect(Collectors.toList()));
        threadPool.shutdown();
        if (error[0] != null) {
            throw error[0];
        }
        return files;
    }

    private Callable<Void> sort(EventSorter.Factory sorter, File file, Throwable[] error) {
        return () -> {
            try {
                sorter.of(file, this.tempFolder).sort();
            }
            catch (Throwable e) {
                error[0] = new RuntimeException("Error while sorting " + file + ": " + e.getMessage(), e);
            }
            return null;
        };
    }

    private List<File> sequentialSort(EventSorter.Factory sorter, List<File> files) throws Throwable {
        for (File file : files) {
            sorter.of(file, this.tempFolder).sort();
        }
        return files;
    }

    public EventSorter.Factory sorterFactoryOf(Event.Format format) {
        switch (format) {
            case Message: {
                return MessageEventSorter::new;
            }
        }
        return null;
    }

    private File datalakeFile(Fingerprint fingerprint) {
        FileStore store = (FileStore)this.stores.get(fingerprint.format());
        File datalakeFile = new File(store.directory(), this.filenameOf(fingerprint) + store.fileExtension());
        datalakeFile.getParentFile().mkdirs();
        return datalakeFile;
    }

    private String filenameOf(Fingerprint fp) {
        return fp.tank() + File.separator + fp.source() + File.separator + fp.timetag();
    }
}

