package io.intino.konos.datalake.jms;

import io.intino.konos.datalake.Datalake;
import io.intino.konos.datalake.MessageHandler;
import io.intino.konos.datalake.MessageTranslator;
import io.intino.konos.jms.TopicConsumer;
import io.intino.konos.jms.TopicProducer;
import io.intino.ness.inl.Message;
import javax.jms.JMSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/intino/konos/datalake/jms/JMSTank.class */
/**
 * JMS-backed implementation of {@link Datalake.Tank}.
 *
 * <p>A tank is identified by its name; the three channel names it uses are derived from that
 * name by prefixing it with {@code "flow."}, {@code "put."} and {@code "feed."}. Outgoing
 * messages are sent through producers obtained from the owning {@link JMSDatalake}; incoming
 * messages are received through a single tracked {@link TopicConsumer}.
 */
public class JMSTank implements Datalake.Tank {
    private static final Logger logger = LoggerFactory.getLogger(JMSTank.class);

    private final String name;
    private final JMSDatalake datalake;
    /** Consumer created by the most recent {@link #flow(String)} call, or {@code null} if none is active. */
    private TopicConsumer flow;
    /** Handler installed through {@link #handler(MessageHandler)}; {@code null} until one is set. */
    private MessageHandler handler;

    /**
     * Creates a tank bound to the given datalake.
     *
     * @param str         the tank name, used to derive the channel names
     * @param jMSDatalake the datalake that supplies the session and the producers
     */
    public JMSTank(String str, JMSDatalake jMSDatalake) {
        this.name = str;
        this.datalake = jMSDatalake;
    }

    /**
     * Installs the handler used by {@link #handle(Message)} and by consumers started afterwards
     * with {@link #flow(String)}.
     *
     * @param messageHandler the handler to install
     */
    @Override
    public void handler(MessageHandler messageHandler) {
        this.handler = messageHandler;
    }

    /**
     * Passes the message directly to the installed handler.
     *
     * @param message the message to handle
     * @throws NullPointerException if no handler has been installed yet
     */
    @Override
    public void handle(Message message) {
        this.handler.handle(message);
    }

    /** @return the tank name given at construction */
    @Override
    public String name() {
        return this.name;
    }

    /** @return the tank name prefixed with {@code "flow."} */
    @Override
    public String flowChannel() {
        return "flow." + this.name;
    }

    /** @return the tank name prefixed with {@code "put."} */
    @Override
    public String putChannel() {
        return "put." + this.name;
    }

    /** @return the tank name prefixed with {@code "feed."} */
    @Override
    public String feedChannel() {
        return "feed." + this.name;
    }

    /**
     * Translates each message to its JMS form and produces it on the feed channel, in order.
     *
     * @param messageArr the messages to send
     */
    @Override
    public void feed(Message... messageArr) {
        TopicProducer newProducer = this.datalake.newProducer(feedChannel());
        for (Message message : messageArr) {
            newProducer.produce(MessageTranslator.fromInlMessage(message));
        }
    }

    /**
     * Translates each message to its JMS form, marks it with the
     * {@link Datalake#REGISTER_ONLY} property and produces it on the put channel, in order.
     *
     * @param messageArr the messages to send
     */
    @Override
    public void put(Message... messageArr) {
        TopicProducer newProducer = this.datalake.newProducer(putChannel());
        for (Message message : messageArr) {
            newProducer.produce(onlyRegister(MessageTranslator.fromInlMessage(message)));
        }
    }

    /**
     * Delegates to {@code JMSDatalake.batch} for this tank.
     *
     * @param i value forwarded unchanged to the datalake (presumably the batch size; confirm
     *          against {@code JMSDatalake})
     * @return this tank, for chaining
     */
    @Override
    public Datalake.Tank batchSession(int i) {
        this.datalake.batch(this.name, i);
        return this;
    }

    /**
     * Delegates to {@code JMSDatalake.endBatch} for this tank.
     *
     * @return this tank, for chaining
     */
    @Override
    public Datalake.Tank endBatch() {
        this.datalake.endBatch(this.name);
        return this;
    }

    /**
     * Starts a consumer on the flow channel that listens with the currently installed handler.
     *
     * <p>Only one consumer is tracked per tank. If a previous call left a consumer active, it is
     * stopped first; otherwise it would keep running with no reference left to stop it.
     *
     * @param str optional identifier forwarded to {@code TopicConsumer.listen}; when {@code null}
     *            the single-argument {@code listen} overload is used instead
     * @return the newly created consumer
     */
    @Override
    public TopicConsumer flow(String str) {
        if (this.datalake.session() == null) {
            // Logged only: the consumer is still constructed with the null session, as before.
            logger.error("Session is null");
        }
        stopFlow();
        this.flow = new TopicConsumer(this.datalake.session(), flowChannel());
        if (str != null) {
            this.flow.listen(this.handler, str);
        } else {
            this.flow.listen(this.handler);
        }
        return this.flow;
    }

    /** Stops the tracked consumer, if any, and forgets it. Safe to call repeatedly. */
    @Override
    public void unregister() {
        stopFlow();
    }

    /** Stops and clears the tracked consumer when one is present. */
    private void stopFlow() {
        if (this.flow != null) {
            this.flow.stop();
        }
        this.flow = null;
    }

    /**
     * Sets the {@link Datalake#REGISTER_ONLY} boolean property to {@code true} on the message.
     *
     * <p>A {@link JMSException} raised while setting the property is logged and the message is
     * returned anyway, so in that case it goes out without the flag.
     *
     * @param message the JMS message to mark
     * @return the same message instance
     */
    private javax.jms.Message onlyRegister(javax.jms.Message message) {
        try {
            message.setBooleanProperty(Datalake.REGISTER_ONLY, true);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
        return message;
    }
}
