/*
 * Decompiled with CFR 0.152.
 */
package io.intino.datahub.datalake;

import io.intino.alexandria.Fingerprint;
import io.intino.alexandria.event.Event;
import io.intino.alexandria.event.message.MessageEvent;
import io.intino.alexandria.event.resource.ResourceEventReader;
import io.intino.alexandria.ingestion.EventSession;
import io.intino.alexandria.ingestion.SessionHandler;
import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.message.Message;
import io.intino.alexandria.message.MessageReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Objects;

public class BrokerSessions {
    private final File brokerStageDirectory;
    private final File stageDirectory;

    public BrokerSessions(File brokerStageDirectory, File stageDirectory) {
        this.brokerStageDirectory = brokerStageDirectory;
        this.stageDirectory = stageDirectory;
    }

    public void push() {
        Logger.info((String)"Pushing broker events");
        this.pushTemporalSessions();
        Logger.info((String)"Pushed broker events");
    }

    private void pushTemporalSessions() {
        try {
            SessionHandler handler = new SessionHandler(this.stageDirectory);
            File tmp = new File(this.stageDirectory, "tmp");
            tmp.mkdirs();
            this.moveTo(tmp);
            for (File file : Objects.requireNonNull(tmp.listFiles(f -> f.getName().endsWith(Event.Format.Message + ".session") || f.getName().endsWith(Event.Format.Measurement + ".session")))) {
                BrokerSessions.processMessageAndMeasurements(handler, file);
            }
            for (File file : Objects.requireNonNull(tmp.listFiles(f -> f.getName().endsWith(Event.Format.Resource + ".session")))) {
                this.processResources(handler, file);
            }
        }
        catch (Exception e) {
            Logger.error((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processResources(SessionHandler handler, File file) {
        try (EventSession eventSession = handler.createEventSession();
             ResourceEventReader resources = new ResourceEventReader(file);){
            Fingerprint fingerprint = Fingerprint.of((File)file);
            resources.forEachRemaining(e -> {
                try {
                    eventSession.put(fingerprint.tank(), fingerprint.source(), fingerprint.timetag(), Event.Format.Resource, new Event[]{e});
                }
                catch (IOException ex) {
                    Logger.error((Throwable)ex);
                }
            });
        }
        file.delete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void processMessageAndMeasurements(SessionHandler handler, File file) throws Exception {
        try (EventSession eventSession = handler.createEventSession();
             MessageReader messages = new MessageReader((InputStream)new FileInputStream(file));){
            Fingerprint fingerprint = Fingerprint.of((File)file);
            for (Message message : messages) {
                eventSession.put(fingerprint.tank(), fingerprint.source(), fingerprint.timetag(), Event.Format.Message, new Event[]{new MessageEvent(message)});
            }
        }
        file.delete();
    }

    private void moveTo(File tmp) {
        for (File file : Objects.requireNonNull(this.brokerStageDirectory.listFiles(f -> f.getName().endsWith(".session")))) {
            this.moveTo(file, tmp);
        }
    }

    private void moveTo(File file, File tmp) {
        try {
            Files.move(file.toPath(), new File(tmp, file.getName()).toPath(), new CopyOption[0]);
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
        }
    }
}

