/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.dao.RqueueJobDao;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.listener.DefaultRqueuePoller;
import com.github.sonus21.rqueue.listener.MappingInformation;
import com.github.sonus21.rqueue.listener.MessageProcessorHandler;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.StrictPriorityPoller;
import com.github.sonus21.rqueue.listener.WeightedPriorityPoller;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.Concurrency;
import com.github.sonus21.rqueue.models.enums.PriorityMode;
import com.github.sonus21.rqueue.models.enums.RqueueMode;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

public class RqueueMessageListenerContainer
implements InitializingBean,
DisposableBean,
SmartLifecycle,
BeanNameAware {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueMessageListenerContainer.class);
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RqueueMessageListenerContainer.class);
    private final Object lifecycleMgr = new Object();
    private final RqueueMessageTemplate rqueueMessageTemplate;
    private final RqueueMessageHandler rqueueMessageHandler;
    private MessageProcessor discardMessageProcessor;
    private MessageProcessor deadLetterQueueMessageProcessor;
    private MessageProcessor manualDeletionMessageProcessor;
    private MessageProcessor postExecutionMessageProcessor;
    private MessageProcessor preExecutionMessageProcessor;
    private TaskExecutionBackOff taskExecutionBackOff = new FixedTaskExecutionBackOff();
    private PostProcessingHandler postProcessingHandler;
    private final Map<String, ThreadUtils.QueueThread> queueThreadMap = new ConcurrentHashMap<String, ThreadUtils.QueueThread>();
    private final Map<String, Boolean> queueRunningState = new ConcurrentHashMap<String, Boolean>();
    private final ConcurrentHashMap<String, Future<?>> scheduledFutureByQueue = new ConcurrentHashMap();
    @Autowired(required=false)
    private RqueueMetricsCounter rqueueMetricsCounter;
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    @Autowired
    private RqueueWebConfig rqueueWebConfig;
    @Autowired
    private RqueueConfig rqueueConfig;
    @Autowired
    private RqueueMessageMetadataService rqueueMessageMetadataService;
    @Autowired
    private RqueueSystemConfigDao rqueueSystemConfigDao;
    private AsyncTaskExecutor taskExecutor;
    @Autowired
    private RqueueJobDao rqueueJobDao;
    @Autowired
    private RqueueStringDao rqueueStringDao;
    private List<Middleware> middlewares;
    private Integer maxNumWorkers;
    private String beanName;
    private boolean defaultTaskExecutor = false;
    private boolean autoStartup = true;
    private boolean running = false;
    private long backOffTime = 5000L;
    private long maxWorkerWaitTime = 20000L;
    private long pollingInterval = 200L;
    private int phase = Integer.MAX_VALUE;
    private PriorityMode priorityMode;
    private Middleware middleware;

    public RqueueMessageListenerContainer(RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
        Assert.notNull((Object)((Object)rqueueMessageHandler), (String)"rqueueMessageHandler cannot be null");
        Assert.notNull((Object)rqueueMessageTemplate, (String)"rqueueMessageTemplate cannot be null");
        this.rqueueMessageHandler = rqueueMessageHandler;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.deadLetterQueueMessageProcessor = this.discardMessageProcessor = new MessageProcessor(){};
        this.manualDeletionMessageProcessor = this.discardMessageProcessor;
        this.postExecutionMessageProcessor = this.discardMessageProcessor;
        this.preExecutionMessageProcessor = this.discardMessageProcessor;
    }

    public RqueueMessageTemplate getRqueueMessageTemplate() {
        return this.rqueueMessageTemplate;
    }

    public long getMaxWorkerWaitTime() {
        return this.maxWorkerWaitTime;
    }

    public void setMaxWorkerWaitTime(long stopTime) {
        this.maxWorkerWaitTime = stopTime;
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public RqueueMessageHandler getRqueueMessageHandler() {
        return this.rqueueMessageHandler;
    }

    public Integer getMaxNumWorkers() {
        return this.maxNumWorkers;
    }

    public void setMaxNumWorkers(int maxNumWorkers) {
        this.maxNumWorkers = maxNumWorkers;
    }

    public long getBackOffTime() {
        return this.backOffTime;
    }

    public void setBackOffTime(long backOffTime) {
        this.backOffTime = backOffTime;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroy() throws Exception {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.stop();
            this.doDestroy();
        }
    }

    protected void doDestroy() {
        ThreadPoolTaskExecutor executor;
        HashSet<String> destroyedExecutors = new HashSet<String>();
        for (Map.Entry<String, ThreadUtils.QueueThread> entry : this.queueThreadMap.entrySet()) {
            ThreadPoolTaskExecutor executor2;
            String name;
            ThreadUtils.QueueThread queueThread = entry.getValue();
            if (!queueThread.isDefaultExecutor() || destroyedExecutors.contains(name = (executor2 = (ThreadPoolTaskExecutor)queueThread.getTaskExecutor()).getThreadNamePrefix())) continue;
            destroyedExecutors.add(name);
            executor2.destroy();
        }
        if (this.defaultTaskExecutor && this.taskExecutor != null && !destroyedExecutors.contains((executor = (ThreadPoolTaskExecutor)this.taskExecutor).getThreadNamePrefix())) {
            executor.destroy();
        }
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop(Runnable callback) {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.stop();
            callback.run();
        }
    }

    private void initializeQueue() {
        EndpointRegistry.delete();
        for (MappingInformation mappingInformation : this.rqueueMessageHandler.getHandlerMethodMap().keySet()) {
            for (String queue : mappingInformation.getQueueNames()) {
                for (QueueDetail queueDetail : this.getQueueDetail(queue, mappingInformation)) {
                    EndpointRegistry.register(queueDetail);
                }
            }
        }
        List<QueueDetail> queueDetails = EndpointRegistry.getActiveQueueDetails();
        if (queueDetails.isEmpty()) {
            return;
        }
        if (this.taskExecutor == null) {
            this.defaultTaskExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor(queueDetails);
        } else {
            this.initializeThreadMap(queueDetails, this.taskExecutor, false, queueDetails.size());
        }
        this.initializeRunningQueueState();
    }

    private void initialize() {
        this.initializeQueue();
        this.postProcessingHandler = new PostProcessingHandler(this.rqueueConfig, this.rqueueWebConfig, this.applicationEventPublisher, this.rqueueMessageTemplate, this.taskExecutionBackOff, new MessageProcessorHandler(this.manualDeletionMessageProcessor, this.deadLetterQueueMessageProcessor, this.discardMessageProcessor, this.postExecutionMessageProcessor), this.rqueueSystemConfigDao);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterPropertiesSet() throws Exception {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            if (RqueueMode.PRODUCER.equals((Object)this.rqueueConfig.getMode())) {
                log.info("Producer mode nothing to do...");
            } else {
                this.initialize();
            }
            this.lifecycleMgr.notifyAll();
        }
    }

    private void initializeThreadMap(List<QueueDetail> queueDetails, AsyncTaskExecutor taskExecutor, boolean defaultExecutor, int workersCount) {
        Semaphore semaphore = new Semaphore(workersCount);
        for (QueueDetail queueDetail : queueDetails) {
            this.queueThreadMap.put(queueDetail.getName(), new ThreadUtils.QueueThread(defaultExecutor, taskExecutor, semaphore, workersCount));
        }
    }

    private void initializeRunningQueueState() {
        for (String queue : EndpointRegistry.getActiveQueues()) {
            this.queueRunningState.put(queue, false);
        }
    }

    private int getWorkersCount(int queueCount) {
        return this.maxNumWorkers == null ? queueCount * 2 : this.maxNumWorkers;
    }

    private AsyncTaskExecutor createTaskExecutor(int corePoolSize, int maxPoolSize) {
        String name = this.getBeanName();
        String prefix = name != null ? name + "-" : DEFAULT_THREAD_NAME_PREFIX;
        return ThreadUtils.createTaskExecutor(DEFAULT_THREAD_NAME_PREFIX, prefix, corePoolSize, maxPoolSize);
    }

    private AsyncTaskExecutor createNonConcurrencyBasedExecutor(List<QueueDetail> queueDetails, int pollerCount) {
        int workersCount = this.getWorkersCount(queueDetails.size());
        int maxPoolSize = workersCount + pollerCount;
        int corePoolSize = queueDetails.size() + pollerCount;
        AsyncTaskExecutor executor = this.createTaskExecutor(corePoolSize, maxPoolSize);
        this.initializeThreadMap(queueDetails, executor, true, workersCount);
        return executor;
    }

    private void createExecutor(QueueDetail queueDetail) {
        Concurrency concurrency = queueDetail.getConcurrency();
        AsyncTaskExecutor executor = this.createTaskExecutor(queueDetail, concurrency.getMin(), concurrency.getMax());
        Semaphore semaphore = new Semaphore(concurrency.getMax());
        this.queueThreadMap.put(queueDetail.getName(), new ThreadUtils.QueueThread(true, executor, semaphore, concurrency.getMax()));
    }

    public AsyncTaskExecutor createDefaultTaskExecutor(List<QueueDetail> registeredActiveQueueDetail) {
        List queueDetails = registeredActiveQueueDetail.stream().filter(e -> !e.isSystemGenerated()).collect(Collectors.toList());
        ArrayList<QueueDetail> withoutConcurrency = new ArrayList<QueueDetail>();
        for (QueueDetail queueDetail : queueDetails) {
            if (queueDetail.getConcurrency().getMin() > 0) {
                this.createExecutor(queueDetail);
                continue;
            }
            withoutConcurrency.add(queueDetail);
        }
        return this.createNonConcurrencyBasedExecutor(withoutConcurrency, queueDetails.size());
    }

    private AsyncTaskExecutor createTaskExecutor(QueueDetail queueDetail, int corePoolSize, int maxPoolSize) {
        String name = ThreadUtils.getWorkerName(queueDetail.getName());
        return ThreadUtils.createTaskExecutor(name, name + "-", corePoolSize, maxPoolSize);
    }

    private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappingInformation) {
        QueueDetail queueDetail;
        int numRetry = mappingInformation.getNumRetry();
        if (!mappingInformation.getDeadLetterQueueName().isEmpty() && numRetry == -1) {
            log.warn("Dead letter queue {} is set but retry is not set", (Object)mappingInformation.getDeadLetterQueueName());
            numRetry = 3;
        } else if (numRetry == -1) {
            numRetry = Integer.MAX_VALUE;
        }
        String priorityGroup = mappingInformation.getPriorityGroup();
        Map<String, Integer> priority = mappingInformation.getPriority();
        if (StringUtils.isEmpty(priorityGroup) && priority.size() == 1) {
            priorityGroup = "Default";
        }
        if ((queueDetail = QueueDetail.builder().name(queue).queueName(this.rqueueConfig.getQueueName(queue)).processingQueueName(this.rqueueConfig.getProcessingQueueName(queue)).delayedQueueName(this.rqueueConfig.getDelayedQueueName(queue)).processingQueueChannelName(this.rqueueConfig.getProcessingQueueChannelName(queue)).delayedQueueChannelName(this.rqueueConfig.getDelayedQueueChannelName(queue)).deadLetterQueueName(mappingInformation.getDeadLetterQueueName()).visibilityTimeout(mappingInformation.getVisibilityTimeout()).deadLetterConsumerEnabled(mappingInformation.isDeadLetterConsumerEnabled()).concurrency(mappingInformation.getConcurrency()).active(mappingInformation.isActive()).numRetry(numRetry).priority(priority).priorityGroup(priorityGroup).build()).getPriority().size() <= 1) {
            return Collections.singletonList(queueDetail);
        }
        return queueDetail.expandQueueDetail(this.rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(), this.rqueueConfig.getDefaultQueueWithQueueLevelPriority());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        log.info("Starting Rqueue Message container");
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.running = true;
            this.doStart();
            this.applicationEventPublisher.publishEvent((ApplicationEvent)new RqueueBootstrapEvent("Container", true));
            this.lifecycleMgr.notifyAll();
        }
    }

    protected void doStart() {
        if (RqueueMode.PRODUCER.equals((Object)this.rqueueConfig.getMode())) {
            log.info("Producer mode nothing to do...");
            return;
        }
        HashMap<String, List> queueGroupToDetails = new HashMap<String, List>();
        for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
            int prioritySize = queueDetail.getPriority().size();
            if (prioritySize == 0) {
                this.startQueue(queueDetail.getName(), queueDetail);
                continue;
            }
            List queueDetails = queueGroupToDetails.getOrDefault(queueDetail.getPriorityGroup(), new ArrayList());
            queueDetails.add(queueDetail);
            queueGroupToDetails.put(queueDetail.getPriorityGroup(), queueDetails);
        }
        for (Map.Entry entry : queueGroupToDetails.entrySet()) {
            this.startGroup((String)entry.getKey(), (List)entry.getValue());
        }
    }

    private Map<String, ThreadUtils.QueueThread> getQueueThreadMap(String groupName, List<QueueDetail> queueDetails) {
        ThreadUtils.QueueThread queueThread = this.queueThreadMap.get(groupName);
        if (queueThread != null) {
            return queueDetails.stream().collect(Collectors.toMap(QueueDetail::getName, e -> queueThread));
        }
        return queueDetails.stream().collect(Collectors.toMap(QueueDetail::getName, e -> this.queueThreadMap.get(e.getName())));
    }

    protected void startGroup(String groupName, List<QueueDetail> queueDetails) {
        if (this.getPriorityMode() == null) {
            throw new IllegalStateException("Priority mode is not set");
        }
        for (QueueDetail queueDetail : queueDetails) {
            this.queueRunningState.put(queueDetail.getName(), true);
        }
        Map<String, ThreadUtils.QueueThread> queueThread = this.getQueueThreadMap(groupName, queueDetails);
        Future future = this.getPriorityMode() == PriorityMode.STRICT ? this.taskExecutor.submit((Runnable)new StrictPriorityPoller(StringUtils.groupName(groupName), this, queueDetails, queueThread, this.postProcessingHandler, this.rqueueConfig)) : this.taskExecutor.submit((Runnable)new WeightedPriorityPoller(StringUtils.groupName(groupName), this, queueDetails, queueThread, this.postProcessingHandler, this.rqueueConfig));
        this.scheduledFutureByQueue.put(groupName, future);
    }

    protected void startQueue(String queueName, QueueDetail queueDetail) {
        if (Boolean.TRUE.equals(this.queueRunningState.get(queueName))) {
            return;
        }
        this.queueRunningState.put(queueName, true);
        ThreadUtils.QueueThread queueThread = this.queueThreadMap.get(queueName);
        DefaultRqueuePoller messagePoller = new DefaultRqueuePoller(queueThread, queueDetail, this, this.postProcessingHandler, this.rqueueConfig);
        Future future = this.getTaskExecutor().submit((Runnable)messagePoller);
        this.scheduledFutureByQueue.put(queueName, future);
    }

    boolean isQueueActive(String queueName) {
        return this.queueRunningState.getOrDefault(queueName, false);
    }

    public AsyncTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        log.info("Stopping Rqueue Message container");
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.running = false;
            this.applicationEventPublisher.publishEvent((ApplicationEvent)new RqueueBootstrapEvent("Container", false));
            this.doStop();
            this.lifecycleMgr.notifyAll();
        }
    }

    protected void doStop() {
        if (RqueueMode.PRODUCER.equals((Object)this.rqueueConfig.getMode())) {
            log.info("Producer mode nothing to do...");
            return;
        }
        for (Map.Entry<String, Boolean> runningStateByQueue : this.queueRunningState.entrySet()) {
            if (!Boolean.TRUE.equals(runningStateByQueue.getValue())) continue;
            this.stopQueue(runningStateByQueue.getKey());
        }
        this.waitForRunningQueuesToStop();
    }

    private void waitForRunningQueuesToStop() {
        for (Map.Entry<String, Boolean> entry : this.queueRunningState.entrySet()) {
            String queueName = entry.getKey();
            Future<?> queueSpinningThread = this.scheduledFutureByQueue.get(queueName);
            ThreadUtils.waitForTermination(log, queueSpinningThread, this.getMaxWorkerWaitTime(), "An exception occurred while stopping queue '{}'", queueName);
        }
        if (!ThreadUtils.waitForWorkerTermination(this.queueThreadMap.values(), this.getMaxWorkerWaitTime())) {
            log.error("Some workers are not stopped within time");
        }
    }

    private void stopQueue(String queueName) {
        Assert.isTrue((boolean)this.queueRunningState.containsKey(queueName), (String)("Queue with name '" + queueName + "' does not exist"));
        this.queueRunningState.put(queueName, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            return this.running;
        }
    }

    public long getPollingInterval() {
        return this.pollingInterval;
    }

    public void setPollingInterval(long pollingInterval) {
        this.pollingInterval = pollingInterval;
    }

    public MessageProcessor getDiscardMessageProcessor() {
        return this.discardMessageProcessor;
    }

    public void setDiscardMessageProcessor(MessageProcessor discardMessageProcessor) {
        Assert.notNull((Object)discardMessageProcessor, (String)"discardMessageProcessor cannot be null");
        this.discardMessageProcessor = discardMessageProcessor;
    }

    public MessageProcessor getDeadLetterQueueMessageProcessor() {
        return this.deadLetterQueueMessageProcessor;
    }

    public void setDeadLetterQueueMessageProcessor(MessageProcessor deadLetterQueueMessageProcessor) {
        Assert.notNull((Object)deadLetterQueueMessageProcessor, (String)"deadLetterQueueMessageProcessor cannot be null");
        this.deadLetterQueueMessageProcessor = deadLetterQueueMessageProcessor;
    }

    public MessageProcessor getManualDeletionMessageProcessor() {
        return this.manualDeletionMessageProcessor;
    }

    public void setManualDeletionMessageProcessor(MessageProcessor manualDeletionMessageProcessor) {
        Assert.notNull((Object)manualDeletionMessageProcessor, (String)"manualDeletionMessageProcessor cannot be null");
        this.manualDeletionMessageProcessor = manualDeletionMessageProcessor;
    }

    public MessageProcessor getPostExecutionMessageProcessor() {
        return this.postExecutionMessageProcessor;
    }

    public void setPostExecutionMessageProcessor(MessageProcessor postExecutionMessageProcessor) {
        Assert.notNull((Object)postExecutionMessageProcessor, (String)"postExecutionMessageProcessor cannot be null");
        this.postExecutionMessageProcessor = postExecutionMessageProcessor;
    }

    public MessageProcessor getPreExecutionMessageProcessor() {
        return this.preExecutionMessageProcessor;
    }

    public void setPreExecutionMessageProcessor(MessageProcessor preExecutionMessageProcessor) {
        Assert.notNull((Object)preExecutionMessageProcessor, (String)"preExecutionMessageProcessor cannot be null");
        this.preExecutionMessageProcessor = preExecutionMessageProcessor;
    }

    public TaskExecutionBackOff getTaskExecutionBackOff() {
        return this.taskExecutionBackOff;
    }

    public void setTaskExecutionBackOff(TaskExecutionBackOff taskExecutionBackOff) {
        Assert.notNull((Object)taskExecutionBackOff, (String)"taskExecutionBackOff cannot be null");
        this.taskExecutionBackOff = taskExecutionBackOff;
    }

    public PriorityMode getPriorityMode() {
        return this.priorityMode;
    }

    public void setPriorityMode(PriorityMode priorityMode) {
        this.priorityMode = priorityMode;
    }

    public void setMiddlewares(List<Middleware> middlewares) {
        Assert.notEmpty(middlewares, (String)"middlewares cannot be null");
        this.middlewares = middlewares;
    }

    RqueueMessageMetadataService rqueueMessageMetadataService() {
        return this.rqueueMessageMetadataService;
    }

    RqueueMetricsCounter getRqueueMetricsCounter() {
        return this.rqueueMetricsCounter;
    }

    RqueueJobDao rqueueJobDao() {
        return this.rqueueJobDao;
    }

    RqueueStringDao rqueueStringDao() {
        return this.rqueueStringDao;
    }

    public List<Middleware> getMiddleWares() {
        return this.middlewares;
    }
}

