/*
 * Decompiled with CFR 0.152.
 */
package io.intino.alexandria.fsm;

import io.intino.alexandria.fsm.IndexFile;
import io.intino.alexandria.fsm.LockFile;
import io.intino.alexandria.fsm.Mailbox;
import io.intino.alexandria.fsm.MailboxCleaner;
import io.intino.alexandria.fsm.Session;
import io.intino.alexandria.fsm.SessionHelper;
import io.intino.alexandria.fsm.SessionMessageFile;
import io.intino.alexandria.fsm.SessionMessagePipeline;
import io.intino.alexandria.fsm.StatefulScheduledService;
import io.intino.alexandria.fsm.TimePeriod;
import io.intino.alexandria.logger.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileSessionManager
extends StatefulScheduledService.Task {
    /** Bytes in one kibibyte. (Used to hold 1 MiB, contradicting its name; it is not referenced anywhere in this class.) */
    private static final long KB = 1024L;
    /** Bytes in one mebibyte. (Used to hold 1 GiB, contradicting its name; it is not referenced anywhere in this class.) */
    private static final long MB = 1024L * KB;
    /** Extension of a closed session file. */
    public static final String MESSAGE_EXTENSION = ".session";
    public static final String ERROR_EXTENSION = ".errors";
    /** Marker appended to a session file while it is still being written. */
    public static final String TEMP_EXTENSION = ".temp";
    /** Default amount, expressed in {@link #DEFAULT_TIMEOUT_UNIT}, that {@code terminate()} waits for the service to stop. */
    private static final long DEFAULT_TIMEOUT = 1L;
    private static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.DAYS;
    /** Lower bound (1 MiB) that the builder enforces on the per-session size limit. */
    public static final long MIN_BYTES_PER_SESSION = MB;
    public static final TimePeriod DEFAULT_MAX_PROCESSED_FILES_AGE = new TimePeriod(60L, TimeUnit.DAYS);
    private final String id;
    private final Mailbox inputMailbox;
    private final Mailbox outputMailbox;
    private final StatefulScheduledService service;
    private final long maxBytesPerSession;
    /** May be null, in which case sessions are never closed because of inactivity. */
    private final TimePeriod sessionTimeout;
    /** May be null (see {@code Builder.withoutLockTimeout()}). */
    private final TimePeriod lockTimeout;
    private final TimePeriod processedFilesMaxAge;
    private final SessionMessagePipeline messagePipeline;
    private final LockFile lockFile;
    /** Session currently being written; null when no session is open. */
    private volatile Session session;
    /** True while a message pipeline is running for some message file. */
    private final AtomicBoolean executingMessagePipeline;
    /**
     * Future handed out by {@code pause()} while a pipeline is still running.
     * Volatile because it is written by the threads calling pause/resume/cancel/terminate
     * and read by the thread that executes the message pipeline.
     */
    private volatile CompletableFuture<Instant> pauseFuture;
    /** Index file most recently created through {@code createIndexFile}; saved by the JVM shutdown hook. */
    private volatile IndexFile currentIndexFile;

    /**
     * Creates a manager that consumes session files from {@code inputMailbox} and publishes
     * new sessions into {@code outputMailbox}.
     *
     * <p>All arguments are validated before the scheduler service and the lock file are
     * created, so an invalid configuration does not leave half-initialised collaborators behind.
     *
     * @param id                   identifier used in log messages, session file names and the shutdown-hook thread name
     * @param inputMailbox         mailbox to read from; must not be null
     * @param outputMailbox        mailbox to write to; must not be null and must differ from {@code inputMailbox}
     * @param readInputMailboxRate period handed to the scheduled service
     * @param maxBytesPerSession   size limit of a single session; must be {@code > 0}
     * @param sessionTimeout       inactivity period after which a session is closed; may be null
     * @param lockTimeout          time to wait for a foreign lock to be released; may be null
     * @param processedFilesMaxAge age limit handed to {@link MailboxCleaner#clean}
     * @param messagePipeline      pipeline executed for every consumed message file; must not be null
     * @throws NullPointerException     if a mailbox or the pipeline is null
     * @throws IllegalArgumentException if both mailboxes are equal or {@code maxBytesPerSession <= 0}
     */
    public FileSessionManager(String id, Mailbox inputMailbox, Mailbox outputMailbox, TimePeriod readInputMailboxRate, long maxBytesPerSession, TimePeriod sessionTimeout, TimePeriod lockTimeout, TimePeriod processedFilesMaxAge, SessionMessagePipeline messagePipeline) {
        // Validation first, in the same precedence as before, so callers see the same exception types.
        Objects.requireNonNull(inputMailbox, "inputMailbox");
        Objects.requireNonNull(outputMailbox, "outputMailbox");
        if (inputMailbox.equals(outputMailbox)) {
            throw new IllegalArgumentException("Input and output mailbox cannot be the same");
        }
        if (maxBytesPerSession <= 0L) {
            throw new IllegalArgumentException("maxBytesPerSession must be > 0");
        }
        Objects.requireNonNull(messagePipeline, "messagePipeline");

        this.id = id;
        this.inputMailbox = inputMailbox;
        this.outputMailbox = outputMailbox;
        this.maxBytesPerSession = maxBytesPerSession;
        this.sessionTimeout = sessionTimeout;
        this.lockTimeout = lockTimeout;
        this.processedFilesMaxAge = processedFilesMaxAge;
        this.messagePipeline = messagePipeline;
        this.executingMessagePipeline = new AtomicBoolean(false);
        this.service = new StatefulScheduledService(readInputMailboxRate);
        // Collaborators that receive `this` are created last, once every other field is assigned.
        this.lockFile = new LockFile(this);
        this.addShutdownHookToSaveCurrentIndexFile();
    }

    /**
     * Appends {@code message}, followed by a line feed, to the current output session.
     *
     * <p>If the message does not fit in the remaining space of the current session, that
     * session is closed and the message is written to a freshly opened one.
     *
     * @param message text to publish; must not be null
     * @return {@link PublishResult#Ok} when the session accepted the bytes,
     *         {@link PublishResult#MessageTooLong} when the encoded message alone exceeds the session size limit,
     *         {@link PublishResult#SessionNotOpen} when no session could be opened,
     *         {@link PublishResult#Error} when the session rejected the write
     * @throws NullPointerException if {@code message} is null
     */
    public synchronized PublishResult publish(String message) {
        if (message == null) {
            throw new NullPointerException("Message cannot be null");
        }
        Session current = this.getCurrentSession();
        if (current == null) {
            return PublishResult.SessionNotOpen;
        }
        // NOTE(review): encodes with the platform default charset; confirm session readers use the same one.
        byte[] bytes = message.concat("\n").getBytes();
        if ((long) bytes.length > this.maxBytesPerSession) {
            Logger.warn("Message is too long for this session's size limit: size: " + bytes.length + " bytes vs max: " + this.maxBytesPerSession + " bytes");
            return PublishResult.MessageTooLong;
        }
        // Widen before adding: the sum of two ints can overflow and wrongly look "small enough".
        if ((long) current.byteCount() + bytes.length > this.maxBytesPerSession) {
            this.closeSession();
            current = this.getCurrentSession();
            if (current == null) {
                // Opening the replacement session failed; report it instead of dereferencing null.
                return PublishResult.SessionNotOpen;
            }
        }
        return current.write(bytes) ? PublishResult.Ok : PublishResult.Error;
    }

    /**
     * Starts the scheduled service with this manager as its task.
     *
     * @return whatever the service reports for the start request; an info line is logged only on success
     */
    public boolean start() {
        boolean started = this.service.start(this);
        if (started) {
            Logger.info("FileSessionManager " + this.id + " started");
        }
        return started;
    }

    /** Scheduler callback: consumes the input mailbox, but only while this manager owns its lock. */
    @Override
    void onUpdate() {
        if (this.validateMailboxOwnership()) {
            this.consumeMessages();
        }
    }

    /** Scheduler callback: closes the output session when it is due and then cleans the input mailbox. */
    @Override
    void onFinally() {
        boolean sessionIsDue = this.shouldCloseSession();
        if (sessionIsDue) {
            this.closeSession();
        }
        MailboxCleaner.clean(this.inputMailbox, this.processedFilesMaxAge);
    }

    /**
     * Runs one consumption pass: first the files already in the processing state (restricted to the
     * OnProcessing and OnProcessed stages), then the pending files (no stage restriction passed).
     * Both listings are taken before anything is consumed. Unless the service is paused, the
     * lock file is then updated with a summary of the files of this pass.
     */
    private void consumeMessages() {
        List<SessionMessageFile> inProcessing = this.inputMailbox.listProcessingMessages();
        List<SessionMessageFile> pending = this.inputMailbox.listPendingMessages();

        this.consumeMessages(inProcessing, SessionMessagePipeline.Stage.OnProcessing, SessionMessagePipeline.Stage.OnProcessed);
        this.consumeMessages(pending, new SessionMessagePipeline.Stage[0]);

        if (this.service.state() == StatefulScheduledService.State.Paused) {
            return;
        }
        Stream<SessionMessageFile> allFiles = Stream.concat(inProcessing.stream(), pending.stream());
        this.logProgressInLockFile(allFiles.collect(Collectors.toList()));
    }

    /**
     * Writes a "sleeping" status into the lock file, listing the given message files one per line.
     *
     * @param messages files handled in the iteration that just finished
     */
    private void logProgressInLockFile(List<SessionMessageFile> messages) {
        String fileList = messages.stream()
                .map(SessionMessageFile::toString)
                .collect(Collectors.joining("\n\t"));
        String status = "Sleeping (next iteration begins in " + this.service.period() + ")\n"
                + "Message Files Consumed (" + messages.size() + "):\n\t"
                + fileList;
        this.lockFile.write(status);
    }

    /**
     * Executes the message pipeline for each file in order, stopping as soon as the service
     * is no longer in the Running state.
     *
     * @param files  message files to consume
     * @param stages stages forwarded to the pipeline
     */
    private void consumeMessages(List<SessionMessageFile> files, SessionMessagePipeline.Stage ... stages) {
        for (SessionMessageFile file : files) {
            boolean running = this.service.state() == StatefulScheduledService.State.Running;
            if (!running) {
                break;
            }
            this.lockFile.write("Consuming " + file);
            this.executeMessagePipeline(file, stages);
        }
    }

    /**
     * Runs the message pipeline for one file and never lets a failure escape.
     *
     * <p>A failure of the pipeline is handed to {@code onPipelineError}; if that handler fails
     * too, both the original failure and the handler's failure are logged. When the pipeline
     * ends, a pending pause future (if any) is completed with the current instant.
     *
     * @param messageFile file to process
     * @param stages      stages forwarded to the pipeline
     */
    private void executeMessagePipeline(SessionMessageFile messageFile, SessionMessagePipeline.Stage ... stages) {
        this.executingMessagePipeline.set(true);
        try {
            this.messagePipeline.execute(messageFile, this, stages);
        }
        catch (Throwable e) {
            try {
                this.messagePipeline.onPipelineError(e, messageFile, this);
            }
            catch (Throwable handlerFailure) {
                Logger.error(e);
                if (handlerFailure != e) {
                    // The handler's own failure used to be dropped silently.
                    Logger.error(handlerFailure);
                }
            }
        }
        finally {
            this.executingMessagePipeline.set(false);
            // Read the field once: resume()/cancel() may null it between the check and the call.
            CompletableFuture<Instant> pending = this.pauseFuture;
            if (pending != null) {
                pending.complete(Instant.now());
            }
        }
    }

    /**
     * Checks the lock file; when validation fails, logs the reason and waits for the lock to be released.
     *
     * @return true if the lock validated, otherwise the outcome of {@link #waitForRelease()}
     */
    private boolean validateMailboxOwnership() {
        boolean owned;
        try {
            this.lockFile.validate();
            owned = true;
        }
        catch (LockFile.LockFileException e) {
            Logger.error(this.getLockValidationErrorMessage(e));
            owned = this.waitForRelease();
        }
        return owned;
    }

    /**
     * Waits for the lock file to be released, using the configured lock timeout.
     * If it is not released, a warning is logged and this manager cancels itself.
     *
     * @return true if the lock was released
     */
    private boolean waitForRelease() {
        if (this.lockFile.waitForRelease(this.lockTimeout)) {
            return true;
        }
        String mailboxPath = this.inputMailbox.root().getPath();
        Logger.warn(this.id + ": " + mailboxPath + " was not released after the established timeout. This FSM will cancel its consumer routine.");
        this.cancel();
        return false;
    }

    /**
     * Builds the log text for a failed lock validation.
     *
     * @param e the validation failure whose message opens the text
     * @return the failure message followed by the waiting notice; the timeout is mentioned only when one is configured
     */
    private String getLockValidationErrorMessage(LockFile.LockFileException e) {
        String timeoutInfo = "";
        if (this.lockTimeout != null) {
            timeoutInfo = "(timeout=" + this.lockTimeout + ")";
        }
        StringBuilder text = new StringBuilder();
        text.append(e.getMessage());
        text.append(". Waiting until lock is released ");
        text.append(timeoutInfo);
        text.append(". If not released, this FSM will cancel execution.");
        return text.toString();
    }

    /**
     * Creates the index file for {@code messageFile} in the input mailbox and remembers it as
     * the current one, which is what the shutdown hook saves.
     *
     * @param messageFile message file the index belongs to
     * @return the newly created index file
     */
    IndexFile createIndexFile(SessionMessageFile messageFile) {
        IndexFile indexFile = new IndexFile(this.inputMailbox, messageFile);
        this.currentIndexFile = indexFile;
        return indexFile;
    }

    /**
     * Pauses the scheduled service.
     *
     * <p>The returned future completes with the instant at which the pause became effective:
     * immediately if no message pipeline is running, otherwise when the running pipeline
     * finishes. It completes with {@code null} if the service refused to pause, or if the
     * manager is resumed, cancelled or terminated before the pipeline finishes.
     *
     * @return a future signalling when consumption has actually stopped
     */
    public Future<Instant> pause() {
        if (this.state() == StatefulScheduledService.State.Paused) {
            CompletableFuture<Instant> pending = this.pauseFuture;
            return pending != null ? pending : CompletableFuture.completedFuture(Instant.now());
        }
        if (!this.service.pause()) {
            return CompletableFuture.completedFuture(null);
        }
        this.lockFile.write("Paused");
        // Publish the future BEFORE looking at the flag. Checking first left a window in which the
        // pipeline could finish without seeing the future, so the caller would wait forever.
        CompletableFuture<Instant> future = new CompletableFuture<>();
        this.pauseFuture = future;
        if (!this.executingMessagePipeline.get()) {
            // No-op if the pipeline completed it in the meantime.
            future.complete(Instant.now());
        }
        return future;
    }

    /**
     * Resumes the scheduled service, releasing any pending pause future first.
     *
     * @return whatever the service reports for the resume request; the lock file is updated only on success
     */
    public boolean resume() {
        this.cancelPauseFuture();
        boolean resumed = this.service.resume();
        if (resumed) {
            this.lockFile.write("Resumed");
        }
        return resumed;
    }

    /** Cancels the scheduled service and deletes the lock file, logging before and after. */
    public void cancel() {
        this.cancelPauseFuture();
        String label = "FileSessionManager " + this.id;
        Logger.info("Cancelling " + label);
        this.service.cancel();
        this.lockFile.delete();
        Logger.info(label + " cancelled");
    }

    /** Terminates this manager using the default timeout of one day. */
    public void terminate() {
        this.terminate(DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    /**
     * Stops the scheduled service with the given timeout and deletes the lock file, logging before and after.
     *
     * @param timeout  amount handed to the service's stop request
     * @param timeUnit unit of {@code timeout}
     */
    public void terminate(long timeout, TimeUnit timeUnit) {
        this.cancelPauseFuture();
        String label = "FileSessionManager " + this.id;
        Logger.info("Terminating " + label);
        this.service.stop(timeout, timeUnit);
        this.lockFile.delete();
        Logger.info(label + " terminated");
    }

    /** Completes a pending pause future with {@code null} and forgets it; does nothing when none is pending. */
    private void cancelPauseFuture() {
        if (this.pauseFuture == null) {
            return;
        }
        this.pauseFuture.complete(null);
        this.pauseFuture = null;
    }

    /**
     * Decides whether the current output session must be closed.
     *
     * <p>A session is due when the service has been cancelled or terminated, when it has reached
     * the size limit, or when nothing has been written to it for longer than the session timeout.
     *
     * @return true if there is an open session and it should be closed now
     */
    private boolean shouldCloseSession() {
        // Work on a snapshot: publish() may close the session (and null the field) concurrently.
        Session current = this.session;
        if (current == null) {
            return false;
        }
        StatefulScheduledService.State state = this.state();
        if (state == StatefulScheduledService.State.Cancelled || state == StatefulScheduledService.State.Terminated) {
            return true;
        }
        if ((long) current.byteCount() >= this.maxBytesPerSession) {
            return true;
        }
        return this.hasTimedOut(current);
    }

    /**
     * Tells whether the inactivity timeout of {@code target} has elapsed.
     *
     * <p>The previous check was negated, so it reported a timeout while the session was still
     * fresh and never once the deadline had actually passed.
     *
     * @return true if a timeout is configured, the session has been written to, and
     *         last-write + timeout lies in the past
     */
    private boolean hasTimedOut(Session target) {
        Instant lastWriting = target.lastWriting();
        if (lastWriting == null || this.sessionTimeout == null) {
            return false;
        }
        Instant deadline = lastWriting.plus(this.sessionTimeout.amount(), this.sessionTimeout.temporalUnit());
        return deadline.isBefore(Instant.now());
    }

    /**
     * Closes the current session and publishes its file by renaming it without the
     * {@link #TEMP_EXTENSION} marker. Does nothing when no session is open. A failed rename
     * is logged and the session is dropped anyway.
     */
    private synchronized void closeSession() {
        Session closing = this.session;
        if (closing == null) {
            return;
        }
        closing.close();
        File tempFile = closing.file();
        try {
            Files.move(tempFile.toPath(), FileSessionManager.finalSessionFile(tempFile).toPath());
        }
        catch (IOException e) {
            Logger.error(e);
        }
        this.session = null;
    }

    /**
     * Computes the published name of a temporary session file. Only the file name is touched;
     * replacing the marker in the whole absolute path would also rewrite any directory whose
     * name happens to contain it.
     */
    private static File finalSessionFile(File tempFile) {
        String tempName = tempFile.getName();
        String finalName = tempName.endsWith(TEMP_EXTENSION)
                ? tempName.substring(0, tempName.length() - TEMP_EXTENSION.length())
                // Marker not at the end: keep the old "remove it wherever it is" rule, limited to the name.
                : tempName.replace(TEMP_EXTENSION, "");
        return new File(tempFile.getAbsoluteFile().getParentFile(), finalName);
    }

    /**
     * Returns the open session, opening a new temporary session file in the output mailbox's
     * pending location when none is open.
     *
     * @return the current session, or null if a new one could not be created (the error is logged)
     */
    private Session getCurrentSession() {
        if (this.session == null) {
            try {
                this.session = new Session(SessionHelper.newTempSessionFile(this.outputMailbox.pending(), LocalDateTime.now(), this.id));
            }
            catch (IOException e) {
                Logger.error(e);
                return null;
            }
        }
        return this.session;
    }

    /** @return the identifier this manager was created with */
    public String id() {
        return id;
    }

    /** @return the current state of the underlying scheduled service */
    public StatefulScheduledService.State state() {
        return service.state();
    }

    /** @return the mailbox this manager reads message files from */
    public Mailbox inputMailbox() {
        return inputMailbox;
    }

    /** @return the mailbox this manager writes session files to */
    public Mailbox outputMailbox() {
        return outputMailbox;
    }

    /** Registers a JVM shutdown hook that saves the current index file, if there is one at that moment. */
    private void addShutdownHookToSaveCurrentIndexFile() {
        Runnable saveIndexFile = () -> {
            if (this.currentIndexFile == null) {
                return;
            }
            this.currentIndexFile.save();
        };
        String threadName = this.id + "_index_file_saver";
        Runtime.getRuntime().addShutdownHook(new Thread(saveIndexFile, threadName));
    }

    @FunctionalInterface
    public static interface MessageBiConsumer {
        public void accept(String var1, FileSessionManager var2) throws Throwable;
    }

    /** Callback that handles one message. */
    @FunctionalInterface
    public interface MessageConsumer {
        /**
         * @param message the message text
         * @throws Throwable any failure raised while handling the message
         */
        void accept(String message) throws Throwable;
    }

    /**
     * Fluent builder for {@link FileSessionManager}.
     *
     * <p>Defaults: random UUID as id, input read every 10 seconds, 3 GiB per session,
     * 30 second session timeout, 5 minute lock timeout and
     * {@link FileSessionManager#DEFAULT_MAX_PROCESSED_FILES_AGE} for processed files.
     * Both mailboxes and the message pipeline have no default; the constructor invoked by
     * {@link #build()} rejects them when they are null.
     */
    public static class Builder {
        private String id = UUID.randomUUID().toString();
        private Mailbox input;
        private Mailbox output;
        private TimePeriod readInputMailboxRate = new TimePeriod(10L, TimeUnit.SECONDS);
        private long maxBytesPerSession = 3L * 1024L * 1024L * 1024L;
        private TimePeriod sessionTimeout = new TimePeriod(30L, TimeUnit.SECONDS);
        private TimePeriod processedFilesMaxAge = DEFAULT_MAX_PROCESSED_FILES_AGE;
        private TimePeriod lockTimeout = new TimePeriod(5L, TimeUnit.MINUTES);
        private SessionMessagePipeline messagePipeline;

        /** Sets the manager id. */
        public Builder id(String id) {
            this.id = id;
            return this;
        }

        /** Sets the mailbox to read from. */
        public Builder readsFrom(Mailbox mailbox) {
            this.input = mailbox;
            return this;
        }

        /** Sets the mailbox to read from, wrapping the given file in a {@link Mailbox}. */
        public Builder readsFrom(File mailbox) {
            Mailbox wrapped = new Mailbox(mailbox);
            return this.readsFrom(wrapped);
        }

        /** Sets the mailbox to write to. */
        public Builder writesTo(Mailbox mailbox) {
            this.output = mailbox;
            return this;
        }

        /** Sets the mailbox to write to, wrapping the given file in a {@link Mailbox}. */
        public Builder writesTo(File mailbox) {
            Mailbox wrapped = new Mailbox(mailbox);
            return this.writesTo(wrapped);
        }

        /** Sets the period handed to the scheduled service that reads the input mailbox. */
        public Builder atFixedRate(long period, TimeUnit timeUnit) {
            this.readInputMailboxRate = new TimePeriod(period, timeUnit);
            return this;
        }

        /** Sets the session size limit; values below {@link FileSessionManager#MIN_BYTES_PER_SESSION} are raised to it. */
        public Builder maxBytesPerSession(long bytes) {
            this.maxBytesPerSession = Math.max(bytes, MIN_BYTES_PER_SESSION);
            return this;
        }

        /** Sets the session timeout. */
        public Builder sessionTimeout(long period, TimeUnit timeUnit) {
            this.sessionTimeout = new TimePeriod(period, timeUnit);
            return this;
        }

        /** Sets how long to wait for a lock file to be released. */
        public Builder lockTimeout(long period, TimeUnit timeUnit) {
            this.lockTimeout = new TimePeriod(period, timeUnit);
            return this;
        }

        /**
         * Clears the lock timeout (the manager then passes null to the lock file's wait call).
         * NOTE(review): presumably this means "wait without limit" - confirm against LockFile.
         */
        public Builder withoutLockTimeout() {
            this.lockTimeout = null;
            return this;
        }

        /** Sets the age limit handed to the mailbox cleaner. */
        public Builder processedFilesMaxAge(TimePeriod processedFilesMaxAge) {
            this.processedFilesMaxAge = processedFilesMaxAge;
            return this;
        }

        /**
         * Sets the message pipeline.
         *
         * @throws NullPointerException if {@code pipeline} is null
         */
        public Builder setMessagePipeline(SessionMessagePipeline pipeline) {
            this.messagePipeline = Objects.requireNonNull(pipeline);
            return this;
        }

        /** Installs a pipeline that forwards each message to {@code consumer}; a null consumer leaves the builder untouched. */
        public Builder onMessageProcess(final MessageConsumer consumer) {
            if (consumer != null) {
                SessionMessagePipeline pipeline = new SessionMessagePipeline(){

                    @Override
                    protected void processMessage(String message, FileSessionManager fsm) throws Throwable {
                        consumer.accept(message);
                    }
                };
                this.setMessagePipeline(pipeline);
            }
            return this;
        }

        /** Installs a pipeline that forwards each message and the manager to {@code consumer}; a null consumer leaves the builder untouched. */
        public Builder onMessageProcess(final MessageBiConsumer consumer) {
            if (consumer != null) {
                SessionMessagePipeline pipeline = new SessionMessagePipeline(){

                    @Override
                    protected void processMessage(String message, FileSessionManager fsm) throws Throwable {
                        consumer.accept(message, fsm);
                    }
                };
                this.setMessagePipeline(pipeline);
            }
            return this;
        }

        /** Creates the manager from the values collected so far. */
        public FileSessionManager build() {
            return new FileSessionManager(
                    this.id,
                    this.input,
                    this.output,
                    this.readInputMailboxRate,
                    this.maxBytesPerSession,
                    this.sessionTimeout,
                    this.lockTimeout,
                    this.processedFilesMaxAge,
                    this.messagePipeline);
        }
    }

    /** Outcome of a call to {@code publish}. */
    public enum PublishResult {
        /** The session accepted the message. */
        Ok,
        /** The encoded message alone is larger than the session size limit. */
        MessageTooLong,
        /** No session was available to write to. */
        SessionNotOpen,
        /** The session reported that the write failed. */
        Error
    }
}

