/*
 * Decompiled with CFR 0.152.
 */
package io.intino.konos.datalake.jms;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.MessageHandler;
import io.intino.konos.datalake.MessageTranslator;
import io.intino.konos.datalake.jms.JMSDatalake;
import io.intino.konos.jms.Consumer;
import io.intino.konos.jms.TopicConsumer;
import io.intino.konos.jms.TopicProducer;
import io.intino.ness.inl.Message;
import javax.jms.JMSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Datalake.Tank} implementation that talks to a {@link JMSDatalake}.
 *
 * <p>A tank is identified by its name; the three channel names it uses are
 * derived from it by prefixing {@code "flow."}, {@code "drop."} and
 * {@code "feed."}. Outgoing messages are sent through producers obtained from
 * the datalake, and incoming messages are received through a single
 * {@link TopicConsumer} that is tracked so it can be stopped later.
 *
 * <p>This class performs no synchronization of its own.
 */
public class JMSTank
implements Datalake.Tank {
    private static final Logger logger = LoggerFactory.getLogger(JMSTank.class);
    private final String name;
    private final JMSDatalake datalake;
    /** Consumer created by the most recent {@link #flow(String)} call, or {@code null} if none is active. */
    private TopicConsumer flow;
    /** Handler that {@link #handle(Message)} delegates to and that is attached to the flow consumer. */
    private MessageHandler handler;

    /**
     * Creates a tank bound to the given datalake.
     *
     * @param name     tank name, used as the suffix of every channel name
     * @param datalake datalake providing the session, producers and batch control
     */
    public JMSTank(String name, JMSDatalake datalake) {
        this.name = name;
        this.datalake = datalake;
    }

    /**
     * Sets the handler used by {@link #handle(Message)} and by subsequent
     * {@link #flow(String)} calls.
     *
     * @param handler the handler to store
     */
    @Override
    public void handler(MessageHandler handler) {
        this.handler = handler;
    }

    /**
     * Passes the message to the configured handler.
     *
     * @param message the message to handle
     * @throws NullPointerException if no handler has been set via {@link #handler(MessageHandler)}
     */
    @Override
    public void handle(Message message) {
        this.handler.handle(message);
    }

    /**
     * @return the tank name given at construction
     */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * @return {@code "flow."} followed by the tank name
     */
    @Override
    public String flowChannel() {
        return "flow." + this.name;
    }

    /**
     * @return {@code "drop."} followed by the tank name
     */
    @Override
    public String dropChannel() {
        return "drop." + this.name;
    }

    /**
     * @return {@code "feed."} followed by the tank name
     */
    @Override
    public String feedChannel() {
        return "feed." + this.name;
    }

    /**
     * Translates each message to its JMS form and produces it on the feed channel.
     *
     * @param messages the messages to send, in order
     */
    @Override
    public void feed(Message ... messages) {
        TopicProducer producer = this.datalake.newProducer(this.feedChannel());
        for (Message message : messages) {
            producer.produce(MessageTranslator.fromInlMessage(message));
        }
    }

    /**
     * Translates each message to its JMS form, flags it with the boolean
     * property {@code registerOnly=true}, and produces it on the drop channel.
     *
     * @param messages the messages to send, in order
     */
    @Override
    public void drop(Message ... messages) {
        TopicProducer producer = this.datalake.newProducer(this.dropChannel());
        for (Message message : messages) {
            producer.produce(this.onlyRegister(MessageTranslator.fromInlMessage(message)));
        }
    }

    /**
     * Asks the datalake to start a batch for this tank.
     *
     * @param blockSize block size forwarded unchanged to the datalake
     * @return this tank, for chaining
     */
    @Override
    public Datalake.Tank batchSession(int blockSize) {
        this.datalake.batch(this.name, blockSize);
        return this;
    }

    /**
     * Asks the datalake to end the batch for this tank.
     *
     * @return this tank, for chaining
     */
    @Override
    public Datalake.Tank endBatch() {
        this.datalake.endBatch(this.name);
        return this;
    }

    /**
     * Creates a consumer on the flow channel and attaches the current handler to it.
     *
     * <p>If a consumer from an earlier call is still tracked, it is stopped first:
     * only one consumer is remembered, so replacing it without stopping would leave
     * the old one running with no way for {@link #unregister()} to reach it.
     *
     * <p>A {@code null} datalake session is logged as an error; the consumer is
     * still created with it, as before.
     *
     * @param flowID identifier passed to the consumer's {@code listen} call, or
     *               {@code null} to listen without an identifier
     * @return the newly created consumer
     */
    @Override
    public TopicConsumer flow(String flowID) {
        if (this.datalake.session() == null) {
            logger.error("Session is null");
        }
        this.stopFlow();
        this.flow = new TopicConsumer(this.datalake.session(), this.flowChannel());
        if (flowID != null) {
            this.flow.listen((Consumer)this.handler, flowID);
        } else {
            this.flow.listen((Consumer)this.handler);
        }
        return this.flow;
    }

    /**
     * Stops the tracked flow consumer, if any, and forgets it.
     */
    @Override
    public void unregister() {
        this.stopFlow();
    }

    /** Stops and clears the tracked consumer; does nothing when none is tracked. */
    private void stopFlow() {
        if (this.flow != null) {
            this.flow.stop();
        }
        this.flow = null;
    }

    /**
     * Sets the boolean property {@code registerOnly} to {@code true} on the message.
     *
     * <p>A {@link JMSException} while setting the property is logged and the
     * message is returned without the flag, so the caller still sends it.
     *
     * @param message the JMS message to flag
     * @return the same message instance
     */
    private javax.jms.Message onlyRegister(javax.jms.Message message) {
        try {
            message.setBooleanProperty("registerOnly", true);
        }
        catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
        return message;
    }
}

