package io.intino.alexandria.fsm;

import io.intino.alexandria.fsm.LockFile;
import io.intino.alexandria.fsm.SessionMessagePipeline;
import io.intino.alexandria.fsm.StatefulScheduledService;
import io.intino.alexandria.logger.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/intino/alexandria/fsm/FileSessionManager.class */
public class FileSessionManager extends StatefulScheduledService.Task {
    // NOTE(review): the two size constants below are misnamed: 1048576 is 1024^2 (one mebibyte)
    // and 1073741824 is 1024^3 (one gibibyte). Neither is referenced elsewhere in this class,
    // so the values are deliberately left untouched.
    private static final long KB = 1048576;
    private static final long MB = 1073741824;
    public static final String MESSAGE_EXTENSION = ".session";
    public static final String ERROR_EXTENSION = ".errors";
    /** Marker removed from a session file's name when the session is closed. */
    public static final String TEMP_EXTENSION = ".temp";
    /**
     * Amount used by {@code terminate()} (in {@link #DEFAULT_TIMEOUT_UNIT}) and, in hours, as the
     * wait for the cleaner executor in {@code stopPendingTasks()}.
     */
    private static final long DEFAULT_TIMEOUT = 1;
    /** Same value the {@code Builder} uses as the lower bound for {@code maxBytesPerSession}. */
    public static final long MIN_BYTES_PER_SESSION = 1048576;
    private final String id;
    /** Mailbox whose processing/pending message files are consumed and which gets cleaned. */
    private final Mailbox inputMailbox;
    /** Mailbox whose pending directory receives the session files written by {@code publish}. */
    private final Mailbox outputMailbox;
    private final StatefulScheduledService service;
    private final long maxBytesPerSession;
    /** May be null, in which case sessions are never closed because of their age. */
    private final TimePeriod sessionTimeout;
    /** May be null (see {@code Builder.withoutLockTimeout()}). */
    private final TimePeriod lockTimeout;
    private final TimePeriod processedFilesMaxAge;
    private final SessionMessagePipeline messagePipeline;
    private final LockFile lockFile;
    private volatile Session session;
    /** True while {@code executeMessagePipeline} is running a pipeline. */
    private final AtomicBoolean executingMessagePipeline;
    // Written by pause()/stopPendingTasks() and read by executeMessagePipeline(), which are
    // presumably invoked from different threads; volatile guarantees the reader sees the
    // latest future instead of a stale (possibly null) reference.
    private volatile CompletableFuture<Instant> pauseFuture;
    private volatile IndexFile currentIndexFile;
    private volatile ErrorFile currentErrorFile;
    /** Time of the last cleaning request; null until the first one has been issued. */
    private Instant cleanMailboxLastTime;
    private final ExecutorService cleanerThread;
    private final AtomicBoolean cleanerIsRunning;
    private static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.DAYS;
    public static final TimePeriod DEFAULT_MAX_PROCESSED_FILES_AGE = new TimePeriod(60, TimeUnit.DAYS);
    /** Minimum spacing between two mailbox cleaning requests. */
    private static final TimePeriod CLEAN_MAILBOX_MAX_TIME = new TimePeriod(2, TimeUnit.HOURS);

    /* loaded from: input_file:io/intino/alexandria/fsm/FileSessionManager$Builder.class */
    /**
     * Fluent builder for {@link FileSessionManager}.
     *
     * <p>Defaults: a random UUID as id, a 10 second read rate, a 3221225472-byte session limit,
     * a 30 second session timeout, a 5 minute lock timeout and
     * {@link FileSessionManager#DEFAULT_MAX_PROCESSED_FILES_AGE} as processed-files max age.
     * The mailboxes and the message pipeline have no default and must be supplied before
     * {@link #build()}, because the constructor rejects null values for them.
     */
    public static class Builder {
        private Mailbox input;
        private Mailbox output;
        private SessionMessagePipeline messagePipeline;
        private String id = UUID.randomUUID().toString();
        private TimePeriod readInputMailboxRate = new TimePeriod(10, TimeUnit.SECONDS);
        private long maxBytesPerSession = 3221225472L;
        private TimePeriod sessionTimeout = new TimePeriod(30, TimeUnit.SECONDS);
        private TimePeriod processedFilesMaxAge = FileSessionManager.DEFAULT_MAX_PROCESSED_FILES_AGE;
        private TimePeriod lockTimeout = new TimePeriod(5, TimeUnit.MINUTES);

        /** Sets the manager id (also used in log messages and thread names). */
        public Builder id(String str) {
            id = str;
            return this;
        }

        /** Sets the mailbox messages are consumed from. */
        public Builder readsFrom(Mailbox mailbox) {
            input = mailbox;
            return this;
        }

        /** Convenience overload wrapping {@code file} in a new {@link Mailbox}. */
        public Builder readsFrom(File file) {
            Mailbox mailbox = new Mailbox(file);
            return readsFrom(mailbox);
        }

        /** Sets the mailbox published sessions are written to. */
        public Builder writesTo(Mailbox mailbox) {
            output = mailbox;
            return this;
        }

        /** Convenience overload wrapping {@code file} in a new {@link Mailbox}. */
        public Builder writesTo(File file) {
            Mailbox mailbox = new Mailbox(file);
            return writesTo(mailbox);
        }

        /** Sets the period handed to the scheduled service. */
        public Builder atFixedRate(long j, TimeUnit timeUnit) {
            readInputMailboxRate = new TimePeriod(j, timeUnit);
            return this;
        }

        /** Sets the session size limit; values below the minimum are raised to it. */
        public Builder maxBytesPerSession(long j) {
            maxBytesPerSession = Math.max(j, FileSessionManager.MIN_BYTES_PER_SESSION);
            return this;
        }

        /** Sets the maximum age of a session before it is closed. */
        public Builder sessionTimeout(long j, TimeUnit timeUnit) {
            sessionTimeout = new TimePeriod(j, timeUnit);
            return this;
        }

        /** Sets how long to wait for the input mailbox lock to be released. */
        public Builder lockTimeout(long j, TimeUnit timeUnit) {
            lockTimeout = new TimePeriod(j, timeUnit);
            return this;
        }

        /** Clears the lock timeout (stored as null). */
        public Builder withoutLockTimeout() {
            lockTimeout = null;
            return this;
        }

        /** Sets the age limit passed to the mailbox cleaner. */
        public Builder processedFilesMaxAge(TimePeriod timePeriod) {
            processedFilesMaxAge = timePeriod;
            return this;
        }

        /**
         * Sets the pipeline that processes each message file.
         *
         * @throws NullPointerException if the pipeline is null
         */
        public Builder setMessagePipeline(SessionMessagePipeline sessionMessagePipeline) {
            messagePipeline = Objects.requireNonNull(sessionMessagePipeline);
            return this;
        }

        /**
         * Installs a pipeline that forwards every message to {@code messageConsumer}.
         * A null consumer leaves the builder unchanged.
         */
        public Builder onMessageProcess(final MessageConsumer messageConsumer) {
            if (messageConsumer == null) {
                return this;
            }
            SessionMessagePipeline pipeline = new SessionMessagePipeline() {
                @Override
                protected void processMessage(String str, FileSessionManager fileSessionManager) throws Throwable {
                    messageConsumer.accept(str);
                }
            };
            return setMessagePipeline(pipeline);
        }

        /**
         * Installs a pipeline that forwards every message, together with the manager, to
         * {@code messageBiConsumer}. A null consumer leaves the builder unchanged.
         */
        public Builder onMessageProcess(final MessageBiConsumer messageBiConsumer) {
            if (messageBiConsumer == null) {
                return this;
            }
            SessionMessagePipeline pipeline = new SessionMessagePipeline() {
                @Override
                protected void processMessage(String str, FileSessionManager fileSessionManager) throws Throwable {
                    messageBiConsumer.accept(str, fileSessionManager);
                }
            };
            return setMessagePipeline(pipeline);
        }

        /** Creates the manager from the values collected so far. */
        public FileSessionManager build() {
            return new FileSessionManager(
                    id,
                    input,
                    output,
                    readInputMailboxRate,
                    maxBytesPerSession,
                    sessionTimeout,
                    lockTimeout,
                    processedFilesMaxAge,
                    messagePipeline);
        }
    }

    /**
     * Callback that receives a message together with the manager processing it.
     * {@code Builder.onMessageProcess(MessageBiConsumer)} wraps it in a
     * {@link SessionMessagePipeline} whose {@code processMessage} delegates to {@link #accept}.
     */
    @FunctionalInterface
    /* loaded from: input_file:io/intino/alexandria/fsm/FileSessionManager$MessageBiConsumer.class */
    public interface MessageBiConsumer {
        /**
         * Handles one message.
         *
         * @param str the message
         * @param fileSessionManager the manager passed to the pipeline's {@code processMessage}
         * @throws Throwable any failure; it propagates out of the pipeline's {@code processMessage}
         */
        void accept(String str, FileSessionManager fileSessionManager) throws Throwable;
    }

    /**
     * Callback that receives a message on its own.
     * {@code Builder.onMessageProcess(MessageConsumer)} wraps it in a
     * {@link SessionMessagePipeline} whose {@code processMessage} delegates to {@link #accept}.
     */
    @FunctionalInterface
    /* loaded from: input_file:io/intino/alexandria/fsm/FileSessionManager$MessageConsumer.class */
    public interface MessageConsumer {
        /**
         * Handles one message.
         *
         * @param str the message
         * @throws Throwable any failure; it propagates out of the pipeline's {@code processMessage}
         */
        void accept(String str) throws Throwable;
    }

    /* loaded from: input_file:io/intino/alexandria/fsm/FileSessionManager$PublishResult.class */
    /** Outcome reported by {@code publish(String)}. */
    public enum PublishResult {
        /** The session's {@code write} call returned true. */
        Ok,
        /** The message's byte length alone exceeds the per-session byte limit. */
        MessageTooLong,
        /** No session was available ({@code getCurrentSession()} returned null). */
        SessionNotOpen,
        /** The session's {@code write} call returned false. */
        Error
    }

    public FileSessionManager(String str, Mailbox mailbox, Mailbox mailbox2, TimePeriod timePeriod, long j, TimePeriod timePeriod2, TimePeriod timePeriod3, TimePeriod timePeriod4, SessionMessagePipeline sessionMessagePipeline) {
        this.id = str;
        this.inputMailbox = (Mailbox) Objects.requireNonNull(mailbox);
        this.outputMailbox = (Mailbox) Objects.requireNonNull(mailbox2);
        this.lockTimeout = timePeriod3;
        this.processedFilesMaxAge = timePeriod4;
        if (mailbox.equals(mailbox2)) {
            throw new IllegalArgumentException("Input and output mailbox cannot be the same");
        }
        this.service = new StatefulScheduledService(str, timePeriod);
        if (j <= 0) {
            throw new IllegalArgumentException("maxBytesPerSession must be > 0");
        }
        this.maxBytesPerSession = j;
        this.sessionTimeout = timePeriod2;
        this.messagePipeline = (SessionMessagePipeline) Objects.requireNonNull(sessionMessagePipeline);
        this.lockFile = new LockFile(this);
        this.executingMessagePipeline = new AtomicBoolean(false);
        this.cleanerThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
            return new Thread(runnable, "FSM-" + str + "-Cleaner-Thread");
        });
        this.cleanerIsRunning = new AtomicBoolean();
        addShutdownHookToSaveState();
    }

    /** Returns the id this manager was created with. */
    public String id() {
        return id;
    }

    /**
     * Appends a message to the current output session, opening a session when none exists and
     * rolling over to a new one when the message would push the current session past the
     * per-session byte limit.
     *
     * @param str the message to write
     * @return {@code Ok} if the session accepted the bytes, {@code MessageTooLong} if the message
     *         alone exceeds the limit, {@code SessionNotOpen} if no session could be obtained
     *         (initially or after a roll-over), {@code Error} if the write failed
     * @throws NullPointerException if {@code str} is null
     */
    public synchronized PublishResult publish(String str) {
        Objects.requireNonNull(str, "Message cannot be null");
        Session currentSession = getCurrentSession();
        if (currentSession == null) {
            return PublishResult.SessionNotOpen;
        }
        // NOTE(review): getBytes() uses the platform default charset; whether readers of the
        // session file expect a fixed charset cannot be determined from this class.
        byte[] bytes = str.getBytes();
        if (bytes.length > this.maxBytesPerSession) {
            Logger.warn("Message is too long for this session's size limit: size: " + bytes.length + " bytes vs max: " + this.maxBytesPerSession + " bytes");
            return PublishResult.MessageTooLong;
        }
        if (currentSession.byteCount() + bytes.length > this.maxBytesPerSession) {
            closeSession();
            currentSession = getCurrentSession();
            if (currentSession == null) {
                // Opening the replacement session failed (getCurrentSession logs the cause);
                // report it instead of dereferencing null.
                return PublishResult.SessionNotOpen;
            }
        }
        return currentSession.write(bytes) ? PublishResult.Ok : PublishResult.Error;
    }

    /**
     * Starts the underlying scheduled service with this manager as its task.
     *
     * @return true if the service started, false otherwise
     */
    public boolean start() {
        boolean started = service.start(this);
        if (started) {
            Logger.info("FileSessionManager " + id + " started");
        }
        return started;
    }

    /** Task callback: consumes the input mailbox, but only while this manager owns its lock. */
    @Override // io.intino.alexandria.fsm.StatefulScheduledService.Task
    void onUpdate() {
        if (!validateMailboxOwnership()) {
            return;
        }
        consumeMessages();
    }

    /** Task callback: closes the output session and requests a mailbox cleaning when due. */
    @Override // io.intino.alexandria.fsm.StatefulScheduledService.Task
    void onFinally() {
        boolean sessionDue = shouldCloseSession();
        if (sessionDue) {
            closeSession();
        }
        boolean cleaningDue = shouldCleanMailbox();
        if (cleaningDue) {
            cleanMailbox();
        }
    }

    /**
     * Records the current time as the last cleaning request and hands the actual cleaning to
     * the cleaner executor. Scheduling is best effort: a failure is logged, never propagated.
     */
    private void cleanMailbox() {
        this.cleanMailboxLastTime = Instant.now();
        if (this.cleanerThread.isShutdown()) {
            // A shut-down executor rejects every submission, so skip it quietly.
            Logger.debug(this.id + ": mailbox cleaning skipped because the cleaner thread is shut down");
            return;
        }
        try {
            this.cleanerThread.submit(this::doCleanMailbox);
        } catch (RuntimeException e) {
            // Previously swallowed silently; keep the best-effort contract but leave a trace.
            Logger.error(e);
        }
    }

    /**
     * Tells whether a cleaning request is due: always before the first request, afterwards once
     * {@code CLEAN_MAILBOX_MAX_TIME} has elapsed since the previous one.
     */
    private boolean shouldCleanMailbox() {
        Instant lastRequest = cleanMailboxLastTime;
        if (lastRequest == null) {
            return true;
        }
        return timeout(lastRequest, CLEAN_MAILBOX_MAX_TIME);
    }

    /**
     * Runs the mailbox cleaner unless a cleaning is already in progress. The in-progress flag is
     * released only by the call that acquired it, and cleaner failures are logged because the
     * Future returned by the executor is never inspected.
     */
    private void doCleanMailbox() {
        if (!this.cleanerIsRunning.compareAndSet(false, true)) {
            // Another run owns the flag; leave it alone.
            return;
        }
        try {
            MailboxCleaner.clean(this.inputMailbox, this.processedFilesMaxAge);
        } catch (RuntimeException e) {
            Logger.error(e);
        } finally {
            this.cleanerIsRunning.set(false);
        }
    }

    /**
     * Lists the processing and pending message files of the input mailbox, consumes the
     * processing ones first (restricted to the OnProcessing and OnProcessed stages) and then the
     * pending ones (no stage restriction passed). While the service is still running afterwards,
     * the number of listed files is written to the lock file.
     */
    private void consumeMessages() {
        List<SessionMessageFile> inProgress = inputMailbox.listProcessingMessages();
        List<SessionMessageFile> pending = inputMailbox.listPendingMessages();
        consumeMessages(inProgress, SessionMessagePipeline.Stage.OnProcessing, SessionMessagePipeline.Stage.OnProcessed);
        consumeMessages(pending);
        if (service.state() != StatefulScheduledService.State.Running) {
            return;
        }
        logProgressInLockFile(inProgress.size() + pending.size());
    }

    /**
     * Writes a "sleeping" status line to the lock file.
     *
     * @param i number of message files to report
     */
    private void logProgressInLockFile(int i) {
        String status = "Sleeping (next iteration begins in " + service.period() + ")\n"
                + "Message Files Consumed = " + i + "\n";
        lockFile.write(status);
    }

    /**
     * Runs the pipeline on each file in order, stopping as soon as the service is no longer in
     * the Running state. The file being consumed is recorded in the lock file first.
     */
    private void consumeMessages(List<SessionMessageFile> list, SessionMessagePipeline.Stage... stageArr) {
        for (SessionMessageFile messageFile : list) {
            boolean running = service.state() == StatefulScheduledService.State.Running;
            if (!running) {
                break;
            }
            lockFile.write("Consuming " + messageFile);
            executeMessagePipeline(messageFile, stageArr);
        }
    }

    /**
     * Runs the message pipeline on one file. Pipeline failures are forwarded to the pipeline's
     * error handler; if the handler itself fails, the original failure is logged with the
     * handler failure attached as a suppressed exception. In every case the executing flag is
     * cleared and a pending pause future, if any, is completed with the current time.
     */
    private void executeMessagePipeline(SessionMessageFile sessionMessageFile, SessionMessagePipeline.Stage... stageArr) {
        this.executingMessagePipeline.set(true);
        try {
            this.messagePipeline.execute(sessionMessageFile, this, stageArr);
        } catch (Throwable th) {
            try {
                this.messagePipeline.onPipelineError(th, sessionMessageFile, this);
            } catch (Throwable th2) {
                if (th2 != th) {
                    // Keep the handler failure visible; self-suppression is not allowed.
                    th.addSuppressed(th2);
                }
                Logger.error(th);
            }
        } finally {
            this.executingMessagePipeline.set(false);
            // Read the field once: it can be reset to null by another thread between a null
            // check and the call.
            CompletableFuture<Instant> pending = this.pauseFuture;
            if (pending != null) {
                pending.complete(Instant.now());
            }
        }
    }

    /**
     * Validates the lock file. When validation fails the problem is logged and the result of
     * waiting for the lock to be released is returned instead.
     *
     * @return true if the lock is valid or was released in time
     */
    private boolean validateMailboxOwnership() {
        try {
            lockFile.validate();
        } catch (LockFile.LockFileException lockError) {
            Logger.error(getLockValidationErrorMessage(lockError));
            return waitForRelease();
        }
        return true;
    }

    /**
     * Waits for the lock file to be released within the lock timeout. If it is not released,
     * a warning is logged and this manager cancels itself.
     *
     * @return true if the lock was released, false if the manager was cancelled
     */
    private boolean waitForRelease() {
        if (lockFile.waitForRelease(lockTimeout)) {
            return true;
        }
        Logger.warn(id + ": " + inputMailbox.root().getPath() + " was not released after the established timeout. This FSM will cancel its consumer routine.");
        cancel();
        return false;
    }

    /**
     * Builds the log message for a failed lock validation; the timeout is mentioned only when
     * one is configured.
     */
    private String getLockValidationErrorMessage(LockFile.LockFileException lockFileException) {
        String timeoutInfo = lockTimeout == null ? "" : "(timeout=" + lockTimeout + ")";
        return lockFileException.getMessage()
                + ". Waiting until lock is released " + timeoutInfo
                + ". If not released, this FSM will cancel execution.";
    }

    /**
     * Creates an index file for the given message file in the input mailbox and remembers it as
     * the current one, so the shutdown hook can save it.
     * NOTE: the decompiler reports that this method was originally package-private.
     */
    public IndexFile createIndexFile(SessionMessageFile sessionMessageFile) {
        IndexFile created = new IndexFile(inputMailbox, sessionMessageFile);
        currentIndexFile = created;
        return created;
    }

    /**
     * Creates an error file in the input mailbox and remembers it as the current one, so the
     * shutdown hook can close it.
     * NOTE: the decompiler reports that this method was originally package-private.
     */
    public ErrorFile createErrorFile() {
        ErrorFile created = new ErrorFile(inputMailbox);
        currentErrorFile = created;
        return created;
    }

    /**
     * Pauses the service.
     *
     * @return a future that completes with the instant at which the pause became effective: at
     *         once when no pipeline is executing, otherwise when the running pipeline finishes.
     *         The future is already completed with null when the service refuses to pause, and
     *         a pending one is completed with null when the pause is interrupted by
     *         resume/cancel/terminate.
     */
    public Future<Instant> pause() {
        if (state() == StatefulScheduledService.State.Paused) {
            // Single read: the field may be reset to null concurrently.
            CompletableFuture<Instant> pending = this.pauseFuture;
            return pending != null ? pending : CompletableFuture.completedFuture(Instant.now());
        }
        if (!this.service.pause()) {
            return CompletableFuture.completedFuture(null);
        }
        this.lockFile.write("Paused");
        // Publish the future BEFORE looking at the executing flag. Checking first leaves a
        // window in which the pipeline finishes, sees no future, and the future created
        // afterwards is never completed.
        CompletableFuture<Instant> future = new CompletableFuture<>();
        this.pauseFuture = future;
        if (!this.executingMessagePipeline.get()) {
            // Nothing is running; complete() is a no-op if the pipeline already completed it.
            future.complete(Instant.now());
        }
        return future;
    }

    /**
     * Resumes the service after a pause. A pending pause future is completed with null and
     * forgotten. The cleaner executor is deliberately left running: shutting it down is
     * irreversible and would stop every later mailbox cleaning of the resumed manager.
     *
     * @return true if the service resumed, false otherwise
     */
    public boolean resume() {
        CompletableFuture<Instant> pending = this.pauseFuture;
        if (pending != null) {
            pending.complete(null);
            this.pauseFuture = null;
        }
        if (!this.service.resume()) {
            return false;
        }
        this.lockFile.write("Resumed");
        return true;
    }

    /**
     * Cancels this manager: pending tasks are stopped, the service is cancelled and the lock
     * file is deleted.
     */
    public void cancel() {
        stopPendingTasks();
        String label = "FileSessionManager " + id;
        Logger.info("Cancelling " + label);
        service.cancel();
        lockFile.delete();
        Logger.info(label + " cancelled");
    }

    /** Terminates this manager using the default timeout (1 day). */
    public void terminate() {
        long amount = DEFAULT_TIMEOUT;
        TimeUnit unit = DEFAULT_TIMEOUT_UNIT;
        terminate(amount, unit);
    }

    /**
     * Terminates this manager: pending tasks are stopped, the service is stopped with the given
     * timeout and the lock file is deleted.
     *
     * @param j timeout amount passed to the service's stop call
     * @param timeUnit unit of {@code j}
     */
    public void terminate(long j, TimeUnit timeUnit) {
        stopPendingTasks();
        String label = "FileSessionManager " + id;
        Logger.info("Terminating " + label);
        service.stop(j, timeUnit);
        lockFile.delete();
        Logger.info(label + " terminated");
    }

    /**
     * Completes a pending pause future with null and shuts the cleaner executor down, waiting
     * up to one hour for a running cleaning to finish. If the wait is interrupted, the thread's
     * interrupt status is restored so callers can still observe it.
     */
    private void stopPendingTasks() {
        // Single read: avoids a null dereference if the field is reset concurrently.
        CompletableFuture<Instant> pending = this.pauseFuture;
        if (pending != null) {
            pending.complete(null);
            this.pauseFuture = null;
        }
        Logger.debug("Waiting for cleaner thread to finish (1 hour timeout)...");
        this.cleanerThread.shutdown();
        try {
            if (!this.cleanerThread.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.HOURS)) {
                Logger.warn(this.id + ": cleaner thread did not finish within the timeout");
            }
        } catch (InterruptedException e) {
            Logger.error(e);
            Thread.currentThread().interrupt();
        }
        this.cleanerIsRunning.set(false);
    }

    /**
     * Tells whether the current output session must be closed: when the service is cancelled or
     * terminated, when the session reached the byte limit, or when a session timeout is
     * configured and has elapsed. Returns false when there is no session.
     */
    private boolean shouldCloseSession() {
        // Work on a snapshot: publish() may close the session (setting the field to null) from
        // another thread between the null check and the calls below.
        Session current = this.session;
        if (current == null) {
            return false;
        }
        StatefulScheduledService.State currentState = state();
        if (currentState == StatefulScheduledService.State.Cancelled || currentState == StatefulScheduledService.State.Terminated) {
            return true;
        }
        if (current.byteCount() >= this.maxBytesPerSession) {
            return true;
        }
        return this.sessionTimeout != null && timeout(current.creationTime(), this.sessionTimeout);
    }

    /**
     * Tells whether the current session is older than the session timeout; false when there is
     * no session. Callers must ensure a session timeout is configured.
     */
    private boolean sessionTimeout() {
        Session current = this.session;
        return current != null && timeout(current.creationTime(), this.sessionTimeout);
    }

    /**
     * Tells whether at least {@code timePeriod} separates {@code instant} from now. The absolute
     * difference is used, so an instant in the future counts the same way as one in the past.
     */
    private boolean timeout(Instant instant, TimePeriod timePeriod) {
        long elapsed = timePeriod.temporalUnit().between(instant, Instant.now());
        return Math.abs(elapsed) >= timePeriod.amount();
    }

    /**
     * Closes the current session, if any, and renames its file by dropping the temp extension
     * from the file NAME only. Earlier the replacement ran over the whole absolute path and so
     * also rewrote directory names that happened to contain ".temp". A failed rename is logged;
     * the session reference is cleared either way.
     */
    private synchronized void closeSession() {
        Session closing = this.session;
        if (closing == null) {
            return;
        }
        closing.close();
        File tempFile = closing.file().getAbsoluteFile();
        String tempName = tempFile.getName();
        // Normal case: strip the trailing extension. The fallback keeps the old replace-anywhere
        // behaviour for names that do not end with it, still restricted to the name.
        String finalName = tempName.endsWith(TEMP_EXTENSION)
                ? tempName.substring(0, tempName.length() - TEMP_EXTENSION.length())
                : tempName.replace(TEMP_EXTENSION, "");
        try {
            Files.move(tempFile.toPath(), new File(tempFile.getParentFile(), finalName).toPath());
        } catch (IOException e) {
            Logger.error(e);
        }
        this.session = null;
    }

    /**
     * Returns the open session, creating one backed by a new temp session file in the output
     * mailbox's pending location when none exists.
     *
     * @return the session, or null if creating it failed with an IOException (which is logged)
     */
    private Session getCurrentSession() {
        Session existing = session;
        if (existing != null) {
            return existing;
        }
        try {
            Session created = new Session(SessionHelper.newTempSessionFile(outputMailbox.pending(), LocalDateTime.now(), id));
            session = created;
            return created;
        } catch (IOException ioError) {
            Logger.error(ioError);
            return null;
        }
    }

    /** Returns the current state of the underlying scheduled service. */
    public StatefulScheduledService.State state() {
        return service.state();
    }

    /** Returns the mailbox messages are consumed from. */
    public Mailbox inputMailbox() {
        return inputMailbox;
    }

    /** Returns the mailbox published sessions are written to. */
    public Mailbox outputMailbox() {
        return outputMailbox;
    }

    /**
     * Registers a JVM shutdown hook that saves the current index file and closes the current
     * error file, when they exist.
     */
    private void addShutdownHookToSaveState() {
        Runnable saveState = () -> {
            Logger.trace(id + ": Closing index file and error files in shutdown-hook...");
            if (currentIndexFile != null) {
                currentIndexFile.save();
            }
            if (currentErrorFile != null) {
                currentErrorFile.close();
            }
        };
        Thread hook = new Thread(saveState, id + "_state_saver");
        Runtime.getRuntime().addShutdownHook(hook);
    }
}
