package io.intino.ness.datalake.hadoop;

import io.intino.alexandria.logger.Logger;
import io.intino.ness.datalake.hadoop.sessions.SessionWriter;
import io.intino.ness.ingestion.Session;
import io.intino.ness.ingestion.Stage;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:io/intino/ness/datalake/hadoop/HadoopStage.class */
public class HadoopStage implements Stage {
    private final FileSystem fs;
    private final Path path;
    private final Path sessions;

    /* loaded from: input_file:io/intino/ness/datalake/hadoop/HadoopStage$HadoopSession.class */
    private class HadoopSession implements Session {
        private final Path path;
        private final Session.Type type;

        HadoopSession(Path path) {
            this.path = path;
            this.type = typeOf(path.getName());
        }

        public String name() {
            String name = this.path.getName();
            return name.substring(0, name.lastIndexOf("."));
        }

        private Session.Type typeOf(String str) {
            return (Session.Type) Arrays.stream(Session.Type.values()).filter(type -> {
                return str.endsWith(HadoopStage.this.extensionOf(type));
            }).findFirst().orElse(null);
        }

        public Session.Type type() {
            return this.type;
        }

        public InputStream inputStream() {
            return new BufferedInputStream(inputStreamOfFile());
        }

        private InputStream inputStreamOfFile() {
            try {
                return HadoopStage.this.fs.open(this.path);
            } catch (IOException e) {
                Logger.error(e);
                return new ByteArrayInputStream(new byte[0]);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HadoopStage(FileSystem fileSystem, Path path, Path path2) {
        this.fs = fileSystem;
        this.path = path;
        this.sessions = path2;
    }

    public Path path() {
        return this.path;
    }

    public Stream<Session> sessions() {
        return Stream.of(new HadoopSession(this.path));
    }

    public void push(Stream<Session> stream) {
        SessionWriter sessionWriter = new SessionWriter(this.fs, this.path);
        sessionWriter.getClass();
        stream.forEach(sessionWriter::write);
    }

    public void clear() {
        try {
            Path path = new Path(this.sessions, sealDateFolderName());
            Arrays.stream(this.fs.listStatus(path, this::isFile)).forEach(fileStatus -> {
                move(fileStatus, path);
            });
        } catch (IOException e) {
            Logger.error(e);
        }
    }

    private void move(FileStatus fileStatus, Path path) {
        try {
            this.fs.rename(fileStatus.getPath(), path);
        } catch (IOException e) {
            Logger.error(e);
        }
    }

    private boolean isFile(Path path) {
        try {
            return !this.fs.isDirectory(path);
        } catch (IOException e) {
            Logger.error(e);
            return false;
        }
    }

    private String sealDateFolderName() {
        return Instant.now().toString().substring(0, 19).replaceAll("[:T\\-]", "");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String extensionOf(Session.Type type) {
        return "." + type.name() + HadoopEventStore.SessionExtension;
    }
}
