/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.LockManager;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.StringMessageTemplate;
import com.github.sonus21.rqueue.listener.ConsumerQueueDetail;
import com.github.sonus21.rqueue.listener.MappingInformation;
import com.github.sonus21.rqueue.listener.MessageExecutor;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

public class RqueueMessageListenerContainer
implements InitializingBean,
DisposableBean,
SmartLifecycle,
BeanNameAware {
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RqueueMessageListenerContainer.class);
    private static final int DEFAULT_WORKER_COUNT_PER_QUEUE = 2;
    private static Logger logger = LoggerFactory.getLogger(RqueueMessageListenerContainer.class);
    private final Object lifecycleMgr = new Object();
    private Integer maxNumWorkers;
    private String beanName;
    private RqueueMessageTemplate rqueueMessageTemplate;
    private RqueueMessageHandler rqueueMessageHandler;
    private boolean defaultTaskExecutor = false;
    private AsyncTaskExecutor taskExecutor;
    private AsyncTaskExecutor spinningTaskExecutor;
    private boolean active = false;
    private boolean autoStartup = true;
    private Map<String, ConsumerQueueDetail> registeredQueues = new ConcurrentHashMap<String, ConsumerQueueDetail>();
    private Map<String, Boolean> queueRunningState;
    private ConcurrentHashMap<String, Future<?>> scheduledFutureByQueue = new ConcurrentHashMap();
    private LockManager lockManager;
    private boolean running = false;
    private long backOffTime = 1000L;
    private long maxWorkerWaitTime = 200000L;
    private long delayedQueueSleepTime = 100L;
    private int phase = Integer.MAX_VALUE;

    public long getDelayedQueueSleepTime() {
        return this.delayedQueueSleepTime;
    }

    public void setDelayedQueueSleepTime(long delayedQueueSleepTime) {
        this.delayedQueueSleepTime = delayedQueueSleepTime;
    }

    public RqueueMessageListenerContainer(RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate, StringMessageTemplate stringMessageTemplate) {
        Assert.notNull((Object)((Object)rqueueMessageHandler), (String)"rqueueMessageHandler can not be null");
        Assert.notNull((Object)rqueueMessageTemplate, (String)"rqueueMessageTemplate can not be null");
        Assert.notNull((Object)stringMessageTemplate, (String)"stringMessageTemplate can not be null");
        this.rqueueMessageHandler = rqueueMessageHandler;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.lockManager = new LockManager(stringMessageTemplate);
    }

    public void setMaxWorkerWaitTime(long stopTime) {
        this.maxWorkerWaitTime = stopTime;
    }

    public long getMaxWorkerWaitTime() {
        return this.maxWorkerWaitTime;
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public RqueueMessageHandler getRqueueMessageHandler() {
        return this.rqueueMessageHandler;
    }

    public Integer getMaxNumWorkers() {
        return this.maxNumWorkers;
    }

    public void setMaxNumWorkers(int maxNumWorkers) {
        this.maxNumWorkers = maxNumWorkers;
    }

    public void setBackOffTime(long backOffTime) {
        this.backOffTime = backOffTime;
    }

    public long getBackoffTime() {
        return this.backOffTime;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroy() throws Exception {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.stop();
            this.active = false;
            this.doDestroy();
        }
    }

    protected void doDestroy() {
        if (this.defaultTaskExecutor && this.taskExecutor != null) {
            ((ThreadPoolTaskExecutor)this.taskExecutor).destroy();
        }
        if (this.spinningTaskExecutor != null) {
            ((ThreadPoolTaskExecutor)this.spinningTaskExecutor).destroy();
        }
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterPropertiesSet() throws Exception {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            for (MappingInformation mappingInformation : this.rqueueMessageHandler.getHandlerMethods().keySet()) {
                for (String queue : mappingInformation.getQueueNames()) {
                    ConsumerQueueDetail consumerQueueDetail = this.getConsumerQueueDetail(queue, mappingInformation);
                    this.registeredQueues.put(queue, consumerQueueDetail);
                }
            }
            this.active = true;
            this.lifecycleMgr.notifyAll();
        }
        if (this.taskExecutor == null) {
            this.defaultTaskExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor();
        } else {
            this.spinningTaskExecutor = this.createSpinningTaskExecutor();
        }
        this.initializeRunningQueueState();
    }

    protected AsyncTaskExecutor getSpinningTaskExecutor() {
        return this.spinningTaskExecutor;
    }

    private AsyncTaskExecutor createSpinningTaskExecutor() {
        String beanName = this.getBeanName();
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
        int messageMoverThreadsCount = this.getDelayedQueueCounts();
        int messagePullerThreadsCount = this.getRegisteredQueues().size();
        int spinningThreads = messageMoverThreadsCount + messagePullerThreadsCount;
        if (spinningThreads > 0) {
            threadPoolTaskExecutor.setCorePoolSize(spinningThreads);
            threadPoolTaskExecutor.setMaxPoolSize(2 * spinningThreads);
        }
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    private Map<String, ConsumerQueueDetail> getRegisteredQueues() {
        return Collections.unmodifiableMap(this.registeredQueues);
    }

    private void initializeRunningQueueState() {
        this.queueRunningState = new ConcurrentHashMap<String, Boolean>(this.getRegisteredQueues().size());
        for (String queue : this.registeredQueues.keySet()) {
            this.queueRunningState.put(queue, false);
        }
    }

    private int getDelayedQueueCounts() {
        return (int)this.getRegisteredQueues().values().stream().filter(ConsumerQueueDetail::isDelayedQueue).count();
    }

    public AsyncTaskExecutor createDefaultTaskExecutor() {
        String beanName = this.getBeanName();
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
        int messageMoverThreadsCount = this.getDelayedQueueCounts();
        int messagePullerThreadsCount = this.getRegisteredQueues().size();
        int spinningThreads = messageMoverThreadsCount + messagePullerThreadsCount;
        if (spinningThreads > 0) {
            threadPoolTaskExecutor.setCorePoolSize(spinningThreads + messagePullerThreadsCount);
            int maxWorkers = this.getMaxNumWorkers() == null ? messagePullerThreadsCount * 2 : this.getMaxNumWorkers();
            threadPoolTaskExecutor.setMaxPoolSize(spinningThreads + maxWorkers);
        }
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    private ConsumerQueueDetail getConsumerQueueDetail(String queue, MappingInformation mappingInformation) {
        return new ConsumerQueueDetail(queue, mappingInformation.getNumRetries(), mappingInformation.getDeadLaterQueueName(), mappingInformation.isDelayedQueue());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        logger.info("Starting Rqueue Message container");
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.running = true;
            this.lifecycleMgr.notifyAll();
        }
        this.doStart();
    }

    protected void doStart() {
        for (Map.Entry<String, ConsumerQueueDetail> registeredQueue : this.getRegisteredQueues().entrySet()) {
            this.startQueue(registeredQueue.getKey(), registeredQueue.getValue());
        }
    }

    private void startQueue(String queueName, ConsumerQueueDetail queueDetail) {
        if (this.queueRunningState.containsKey(queueName) && this.queueRunningState.get(queueName).booleanValue()) {
            return;
        }
        this.queueRunningState.put(queueName, true);
        AsynchronousMessageListener messageListener = new AsynchronousMessageListener(queueName, queueDetail);
        Future future = this.spinningTaskExecutor == null ? this.getTaskExecutor().submit((Runnable)messageListener) : this.spinningTaskExecutor.submit((Runnable)messageListener);
        this.scheduledFutureByQueue.put(queueName, future);
        if (queueDetail.isDelayedQueue()) {
            AsynchronousMessageMover messageMover = new AsynchronousMessageMover(queueDetail);
            future = this.spinningTaskExecutor == null ? this.getTaskExecutor().submit((Runnable)messageMover) : this.spinningTaskExecutor.submit((Runnable)messageMover);
            this.scheduledFutureByQueue.put(queueDetail.getZsetName(), future);
        }
    }

    private boolean isQueueActive(String queueName) {
        return this.queueRunningState.getOrDefault(queueName, false);
    }

    public AsyncTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        logger.info("Stopping Rqueue Message container");
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.running = false;
            this.lifecycleMgr.notifyAll();
        }
        this.doStop();
    }

    protected void doStop() {
        for (Map.Entry<String, Boolean> runningStateByQueue : this.queueRunningState.entrySet()) {
            if (!runningStateByQueue.getValue().booleanValue()) continue;
            this.stopQueue(runningStateByQueue.getKey());
        }
        this.waitForRunningQueuesToStop();
    }

    private void waitForRunningQueuesToStop() {
        for (Map.Entry<String, Boolean> queueRunningState : this.queueRunningState.entrySet()) {
            Future<?> messageMoverThread;
            String queueName = queueRunningState.getKey();
            ConsumerQueueDetail queueDetail = this.registeredQueues.get(queueName);
            Future<?> queueSpinningThread = this.scheduledFutureByQueue.get(queueName);
            if (queueSpinningThread != null) {
                try {
                    queueSpinningThread.get(this.getMaxWorkerWaitTime(), TimeUnit.MILLISECONDS);
                }
                catch (ExecutionException | TimeoutException e) {
                    logger.warn("An exception occurred while stopping queue '{}'", (Object)queueName, (Object)e);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (!queueDetail.isDelayedQueue() || (messageMoverThread = this.scheduledFutureByQueue.get(queueDetail.getZsetName())) == null) continue;
            try {
                messageMoverThread.get(this.getMaxWorkerWaitTime(), TimeUnit.MILLISECONDS);
            }
            catch (ExecutionException | TimeoutException e) {
                logger.warn("An exception occurred while stopping queue '{}'", (Object)queueName, (Object)e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void stopQueue(String queueName) {
        Assert.isTrue((boolean)this.queueRunningState.containsKey(queueName), (String)("Queue with name '" + queueName + "' does not exist"));
        this.queueRunningState.put(queueName, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            return this.running;
        }
    }

    public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    private class AsynchronousMessageListener
    implements Runnable {
        private final String queueName;
        private final ConsumerQueueDetail queueDetail;

        private RqueueMessage getMessage() {
            return RqueueMessageListenerContainer.this.rqueueMessageTemplate.lpop(this.queueName);
        }

        AsynchronousMessageListener(String queueName, ConsumerQueueDetail value) {
            this.queueName = queueName;
            this.queueDetail = value;
        }

        @Override
        public void run() {
            while (RqueueMessageListenerContainer.this.isQueueActive(this.queueName)) {
                try {
                    RqueueMessage message = this.getMessage();
                    if (message == null) continue;
                    if (RqueueMessageListenerContainer.this.isQueueActive(this.queueName)) {
                        RqueueMessageListenerContainer.this.getTaskExecutor().submit((Runnable)new MessageExecutor(message, this.queueDetail, RqueueMessageListenerContainer.this.getRqueueMessageHandler(), RqueueMessageListenerContainer.this.rqueueMessageTemplate, logger));
                        continue;
                    }
                    RqueueMessageListenerContainer.this.rqueueMessageTemplate.add(this.queueName, message);
                }
                catch (Exception e) {
                    logger.warn("Message listener failed for queue {}, it will be retried in {} Ms", new Object[]{this.queueName, RqueueMessageListenerContainer.this.getBackoffTime(), e});
                    try {
                        Thread.sleep(RqueueMessageListenerContainer.this.getBackoffTime());
                    }
                    catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

    private class AsynchronousMessageMover
    implements Runnable {
        private final ConsumerQueueDetail queueDetail;

        AsynchronousMessageMover(ConsumerQueueDetail consumerQueueDetail) {
            this.queueDetail = consumerQueueDetail;
        }

        @Override
        public void run() {
            while (RqueueMessageListenerContainer.this.isQueueActive(this.queueDetail.getQueueName())) {
                try {
                    RqueueMessage rqueueMessage = RqueueMessageListenerContainer.this.rqueueMessageTemplate.getFirstFromZset(this.queueDetail.getZsetName());
                    if (rqueueMessage != null && rqueueMessage.getProcessAt() < System.currentTimeMillis()) {
                        if (RqueueMessageListenerContainer.this.lockManager.acquireLock(rqueueMessage.getId())) {
                            RqueueMessageListenerContainer.this.rqueueMessageTemplate.add(this.queueDetail.getQueueName(), rqueueMessage);
                            RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(this.queueDetail.getZsetName(), rqueueMessage);
                        }
                        RqueueMessageListenerContainer.this.lockManager.releaseLock(rqueueMessage.getId());
                        continue;
                    }
                    try {
                        Thread.sleep(RqueueMessageListenerContainer.this.getDelayedQueueSleepTime());
                    }
                    catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                catch (Exception e) {
                    logger.warn("Message mover failed for queue {}, it will be retried in {} Ms", new Object[]{this.queueDetail.getQueueName(), RqueueMessageListenerContainer.this.getBackoffTime(), e});
                    try {
                        Thread.sleep(RqueueMessageListenerContainer.this.getBackoffTime());
                    }
                    catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }
}

