/*
 * Decompiled with CFR 0.152.
 */
package io.intino.ness.datalake.hadoop;

import io.intino.alexandria.logger.Logger;
import io.intino.ness.datalake.hadoop.sessions.SessionWriter;
import io.intino.ness.ingestion.Session;
import io.intino.ness.ingestion.Stage;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopStage
implements Stage {
    private final FileSystem fs;
    private final Path path;
    private final Path sessions;

    HadoopStage(FileSystem fs, Path stagePath, Path sessionsPath) {
        this.fs = fs;
        this.path = stagePath;
        this.sessions = sessionsPath;
    }

    public Path path() {
        return this.path;
    }

    public Stream<Session> sessions() {
        return Stream.of(new HadoopSession(this.path));
    }

    public void push(Stream<Session> sessions) {
        SessionWriter writer = new SessionWriter(this.fs, this.path);
        sessions.forEach(writer::write);
    }

    public void clear() {
        try {
            Path destination = new Path(this.sessions, this.sealDateFolderName());
            Arrays.stream(this.fs.listStatus(destination, this::isFile)).forEach(f -> this.move((FileStatus)f, destination));
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
        }
    }

    private void move(FileStatus f, Path destination) {
        try {
            this.fs.rename(f.getPath(), destination);
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
        }
    }

    private boolean isFile(Path l) {
        try {
            return !this.fs.isDirectory(l);
        }
        catch (IOException e) {
            Logger.error((Throwable)e);
            return false;
        }
    }

    private String sealDateFolderName() {
        return Instant.now().toString().substring(0, 19).replaceAll("[:T\\-]", "");
    }

    private String extensionOf(Session.Type type) {
        return "." + type.name() + ".event.seq";
    }

    private class HadoopSession
    implements Session {
        private final Path path;
        private final Session.Type type;

        HadoopSession(Path path) {
            this.path = path;
            this.type = this.typeOf(path.getName());
        }

        public String name() {
            String name = this.path.getName();
            return name.substring(0, name.lastIndexOf("."));
        }

        private Session.Type typeOf(String filename) {
            return Arrays.stream(Session.Type.values()).filter(type -> filename.endsWith(HadoopStage.this.extensionOf(type))).findFirst().orElse(null);
        }

        public Session.Type type() {
            return this.type;
        }

        public InputStream inputStream() {
            return new BufferedInputStream(this.inputStreamOfFile());
        }

        private InputStream inputStreamOfFile() {
            try {
                return HadoopStage.this.fs.open(this.path);
            }
            catch (IOException e) {
                Logger.error((Throwable)e);
                return new ByteArrayInputStream(new byte[0]);
            }
        }
    }
}

