package io.intino.ness.core.fs;

import io.intino.alexandria.logger.Logger;
import io.intino.ness.core.Blob;
import io.intino.ness.core.Datalake;
import io.intino.ness.core.sessions.EventSessionManager;
import io.intino.ness.core.sessions.SetSessionManager;
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.time.Instant;
import java.util.stream.Stream;

/* loaded from: input_file:io/intino/ness/core/fs/FSDatalake.class */
/**
 * File-system backed {@link Datalake}.
 *
 * <p>All data lives under a single root folder, in four sub-folders:
 * <ul>
 *   <li>{@code events} – folder handed to the {@link FSEventStore};</li>
 *   <li>{@code sets} – folder handed to the {@link FSSetStore};</li>
 *   <li>{@code stage} – folder where pushed blobs are placed until {@link #seal()} is called;</li>
 *   <li>{@code treated} – folder that receives the staged files after a seal, grouped in one
 *       sub-folder per seal.</li>
 * </ul>
 */
public class FSDatalake implements Datalake {
    private static final String EVENT_STORE_FOLDER = "events";
    private static final String SET_STORE_FOLDER = "sets";
    private static final String STAGE_FOLDER = "stage";
    private static final String TREATED_FOLDER = "treated";

    private final File root;

    /**
     * Creates a datalake rooted at the given folder and makes sure the four sub-folders exist.
     *
     * @param file root folder of the datalake; must not be {@code null}
     * @throws NullPointerException if {@code file} is {@code null}
     */
    public FSDatalake(File file) {
        // new File((File) null, child) silently resolves against the working directory,
        // so a null root would scatter the datalake folders in the wrong place: fail fast instead.
        if (file == null) {
            throw new NullPointerException("Datalake root folder must not be null");
        }
        this.root = file;
        mkdirs();
    }

    /**
     * Returns a connection whose {@code connect} and {@code disconnect} do nothing.
     *
     * @return a new no-op connection
     */
    @Override
    public Datalake.Connection connection() {
        return new Datalake.Connection() {
            @Override
            public void connect(String... strArr) {
                // Intentionally empty.
            }

            @Override
            public void disconnect() {
                // Intentionally empty.
            }
        };
    }

    /**
     * Returns a new event store over the {@code events} folder.
     *
     * @return a new {@link FSEventStore}
     */
    @Override
    public Datalake.EventStore eventStore() {
        return new FSEventStore(eventStoreFolder());
    }

    /**
     * Returns a new set store over the {@code sets} folder.
     *
     * @return a new {@link FSSetStore}
     */
    @Override
    public Datalake.SetStore setStore() {
        return new FSSetStore(setStoreFolder());
    }

    /**
     * Pushes every blob of the stream to the stage folder through the session manager that
     * matches its type. The stream is consumed but not closed by this method.
     *
     * @param stream blobs to push
     */
    @Override
    public void push(Stream<Blob> stream) {
        stream.forEach(this::process);
    }

    /**
     * Routes a blob to {@link EventSessionManager} when its type is {@code event}, and to
     * {@link SetSessionManager} otherwise.
     */
    private void process(Blob blob) {
        if (blob.type() == Blob.Type.event) {
            // Note: the two managers take their arguments in opposite order.
            EventSessionManager.push(stageFolder(), blob);
        } else {
            SetSessionManager.push(blob, stageFolder());
        }
    }

    /**
     * Seals the staged event sessions into the event store folder, then the staged set sessions
     * into the set store folder, and finally moves the files left in the stage folder to a new
     * sub-folder of {@code treated}.
     */
    @Override
    public void seal() {
        EventSessionManager.seal(stageFolder(), eventStoreFolder());
        SetSessionManager.seal(stageFolder(), setStoreFolder());
        moveToTreated();
    }

    /**
     * Moves every regular file of the stage folder into {@code treated/<seal timestamp>}.
     * The timestamped folder is created even when there is nothing to move.
     */
    private void moveToTreated() {
        File sealFolder = new File(treatedFolder(), sealDateFolderName());
        sealFolder.mkdirs();
        FS.filesIn(stageFolder(), File::isFile).forEach(staged -> move(staged, sealFolder));
    }

    /**
     * Moves {@code source} into {@code targetFolder} keeping its file name. A failure is logged
     * and does not interrupt the caller; no replace option is passed, so an already existing
     * target makes the move fail and the source stays where it was.
     */
    private void move(File source, File targetFolder) {
        try {
            Files.move(source.toPath(), new File(targetFolder, source.getName()).toPath());
        } catch (IOException e) {
            Logger.error(e);
        }
    }

    /**
     * Builds the name of the folder for the current seal from the current instant.
     *
     * @return the first 19 characters of {@code Instant.now().toString()} with the
     *         {@code ':'}, {@code 'T'} and {@code '-'} characters removed,
     *         e.g. {@code 20240131101530} for {@code 2024-01-31T10:15:30Z}
     */
    private String sealDateFolderName() {
        String upToSeconds = Instant.now().toString().substring(0, 19);
        return upToSeconds.replaceAll("[:T\\-]", "");
    }

    /** Creates the four sub-folders of the datalake if they do not exist yet. */
    private void mkdirs() {
        eventStoreFolder().mkdirs();
        setStoreFolder().mkdirs();
        stageFolder().mkdirs();
        treatedFolder().mkdirs();
    }

    /** Returns the {@code events} folder under the root. */
    private File eventStoreFolder() {
        return new File(this.root, EVENT_STORE_FOLDER);
    }

    /** Returns the {@code sets} folder under the root. */
    private File setStoreFolder() {
        return new File(this.root, SET_STORE_FOLDER);
    }

    /** Returns the {@code stage} folder under the root. */
    private File stageFolder() {
        return new File(this.root, STAGE_FOLDER);
    }

    /** Returns the {@code treated} folder under the root. */
    private File treatedFolder() {
        return new File(this.root, TREATED_FOLDER);
    }
}
