/*
 * Decompiled with CFR 0.152.
 */
package com.github.cafdataprocessing.utilities.queuehelper;

import com.github.cafdataprocessing.utilities.queuehelper.MessageHandler;
import com.github.cafdataprocessing.utilities.queuehelper.QueueConsumerImpl;
import com.github.cafdataprocessing.utilities.queuehelper.RabbitServices;
import com.google.common.base.Strings;
import com.hpe.caf.api.Codec;
import com.hpe.caf.api.CodecException;
import com.hpe.caf.api.worker.TaskMessage;
import com.hpe.caf.codec.JsonCodec;
import com.hpe.caf.configs.RabbitConfiguration;
import com.hpe.caf.util.rabbitmq.DefaultRabbitConsumer;
import com.hpe.caf.util.rabbitmq.QueueConsumer;
import com.hpe.caf.util.rabbitmq.RabbitUtil;
import com.hpe.caf.worker.queue.rabbit.RabbitWorkerQueueConfiguration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.MessageProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueManager
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueManager.class);
    private static QueueManager instance;
    private final DefaultRabbitConsumer consumer;
    private final Connection connection;
    private final Channel publisherChannel;
    private final Channel consumerChannel;
    private final RabbitServices services = RabbitServices.getInstance();
    private final List<String> consumerTags = new ArrayList<String>();
    private final Thread consumerThread;
    private final String publishQueue;
    private final List<String> consumeQueues;
    private final MessageHandler messageHandler = new MessageHandler();
    private final Codec codec = new JsonCodec();

    public static QueueManager getInstance() throws Exception {
        if (instance == null) {
            instance = new QueueManager();
        }
        return instance;
    }

    private QueueManager() throws Exception {
        int maxPriority;
        RabbitWorkerQueueConfiguration configuration = RabbitServices.getInstance().getRabbitQueueConfiguration();
        this.connection = QueueManager.createRabbitConnection(configuration);
        this.publishQueue = this.services.getRabbitProperties().getPublishQueue();
        this.consumeQueues = this.services.getRabbitProperties().getConsumeQueueNames();
        int n = maxPriority = this.services.getRabbitProperties().getMaxPriority() == null ? 0 : this.services.getRabbitProperties().getMaxPriority();
        if (Strings.isNullOrEmpty((String)this.publishQueue)) {
            LOGGER.debug("No RabbitMQ queue to publish to passed. Check your RabbitMQ properties if this is unexpected.");
            this.publisherChannel = null;
        } else {
            this.publisherChannel = this.connection.createChannel();
            RabbitUtil.declareWorkerQueue((Channel)this.publisherChannel, (String)this.publishQueue, (int)maxPriority);
        }
        if (this.consumeQueues == null || this.consumeQueues.isEmpty()) {
            LOGGER.debug("No RabbitMQ queues to consume message from set. Check your RabbitMQ properties if this is unexpected.");
            this.consumerChannel = null;
            this.consumer = null;
            this.consumerThread = null;
        } else {
            this.consumerChannel = this.connection.createChannel();
            for (String queue : this.consumeQueues) {
                RabbitUtil.declareWorkerQueue((Channel)this.consumerChannel, (String)queue, (int)maxPriority);
            }
            LinkedBlockingQueue conEvents = new LinkedBlockingQueue();
            this.consumer = new DefaultRabbitConsumer(conEvents, (QueueConsumer)new QueueConsumerImpl(this.consumerChannel, this.messageHandler, this.codec));
            for (String queue : this.consumeQueues) {
                this.consumerTags.add(this.consumerChannel.basicConsume(queue, true, (Consumer)this.consumer));
            }
            this.consumerThread = new Thread((Runnable)this.consumer);
            this.consumerThread.start();
        }
    }

    public void publishMessage(TaskMessage message) throws CodecException, IOException {
        if (Strings.isNullOrEmpty((String)this.publishQueue)) {
            LOGGER.warn("Attempted to publish message when publish queue is not set. Message not published.");
            return;
        }
        LOGGER.info("Publishing task message " + message.getTaskId());
        byte[] data = this.codec.serialise((Object)message);
        this.publisherChannel.basicPublish("", this.publishQueue, MessageProperties.TEXT_PLAIN, data);
    }

    @Override
    public void close() throws IOException {
        if (this.consumerTags != null) {
            for (String tag : this.consumerTags) {
                if (tag == null) continue;
                this.consumerChannel.basicCancel(tag);
            }
        }
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.consumerChannel != null) {
            try {
                this.consumerChannel.close();
            }
            catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (this.publisherChannel != null) {
            try {
                this.publisherChannel.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static Connection createRabbitConnection(RabbitWorkerQueueConfiguration configuration) throws IOException, TimeoutException {
        RabbitConfiguration rc = configuration.getRabbitConfiguration();
        ConnectionOptions lyraOpts = RabbitUtil.createLyraConnectionOptions((String)rc.getRabbitHost(), (int)rc.getRabbitPort(), (String)rc.getRabbitUser(), (String)rc.getRabbitPassword());
        Config lyraConfig = RabbitUtil.createLyraConfig((int)rc.getBackoffInterval(), (int)rc.getMaxBackoffInterval(), (int)rc.getMaxAttempts());
        Connection connection = RabbitUtil.createRabbitConnection((ConnectionOptions)lyraOpts, (Config)lyraConfig);
        return connection;
    }

    public MessageHandler getMessageHandler() {
        return this.messageHandler;
    }
}

