/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RedisScheduleTriggerHandler;
import com.github.sonus21.rqueue.core.RedisScriptFactory;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public abstract class MessageScheduler
implements DisposableBean,
ApplicationListener<RqueueBootstrapEvent> {
    private final Object monitor = new Object();
    @Autowired
    protected RqueueSchedulerConfig rqueueSchedulerConfig;
    @Autowired
    protected RqueueConfig rqueueConfig;
    private RedisScript<Long> redisScript;
    private DefaultScriptExecutor<String> defaultScriptExecutor;
    private Map<String, Boolean> queueRunningState;
    private Map<String, ScheduledFuture<?>> queueNameToScheduledTask;
    private Map<String, Long> queueNameToNextRunTime;
    @VisibleForTesting
    protected RedisScheduleTriggerHandler redisScheduleTriggerHandler;
    private ThreadPoolTaskScheduler scheduler;
    @Autowired
    private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
    @Autowired
    @Qualifier(value="rqueueRedisLongTemplate")
    private RedisTemplate<String, Long> redisTemplate;
    private Map<String, Integer> errorCount;

    protected abstract Logger getLogger();

    protected abstract long getNextScheduleTime(String var1, long var2, Long var4);

    protected abstract String getChannelName(String var1);

    protected abstract String getZsetName(String var1);

    protected abstract String getThreadNamePrefix();

    protected abstract int getThreadPoolSize();

    protected Duration getPeriod() {
        long delay = this.rqueueSchedulerConfig.getScheduledMessageTimeIntervalInMilli();
        if (delay <= 0L) {
            delay = 100L;
        }
        return Duration.ofMillis(delay);
    }

    protected abstract boolean isProcessingQueue();

    private void doStart() {
        for (String queueName : this.queueRunningState.keySet()) {
            this.startQueue(queueName);
        }
    }

    private MessageMoverTask task(String queueName, boolean periodic) {
        QueueDetail queueDetail = EndpointRegistry.get(queueName);
        String zsetName = this.getZsetName(queueName);
        return new MessageMoverTask(queueDetail, zsetName, periodic);
    }

    protected void schedule(String queueName) {
        MessageMoverTask task = this.task(queueName, true);
        ScheduledFuture future = this.scheduler.scheduleAtFixedRate((Runnable)task, this.getPeriod());
        this.queueNameToScheduledTask.put(queueName, future);
    }

    private void startQueue(String queueName) {
        if (Boolean.TRUE.equals(this.queueRunningState.get(queueName))) {
            return;
        }
        this.queueRunningState.put(queueName, true);
        if (this.rqueueSchedulerConfig.isAutoStart()) {
            this.schedule(queueName);
        }
        if (this.isRedisEnabled()) {
            this.redisScheduleTriggerHandler.startQueue(queueName);
        }
    }

    private void doStop() {
        if (CollectionUtils.isEmpty(this.queueRunningState)) {
            return;
        }
        for (Map.Entry<String, Boolean> runningStateByQueue : this.queueRunningState.entrySet()) {
            if (!Boolean.TRUE.equals(runningStateByQueue.getValue())) continue;
            this.stopQueue(runningStateByQueue.getKey());
        }
        this.waitForRunningQueuesToStop();
        this.queueNameToScheduledTask.clear();
        if (this.isRedisEnabled()) {
            this.redisScheduleTriggerHandler.stop();
        }
    }

    private void waitForRunningQueuesToStop() {
        for (Map.Entry<String, Boolean> runningState : this.queueRunningState.entrySet()) {
            String queueName = runningState.getKey();
            ScheduledFuture<?> scheduledFuture = this.queueNameToScheduledTask.get(queueName);
            ThreadUtils.waitForTermination(this.getLogger(), scheduledFuture, this.rqueueSchedulerConfig.getTerminationWaitTime(), "An exception occurred while stopping scheduler queue '{}'", queueName);
        }
    }

    private void stopQueue(String queueName) {
        Assert.isTrue((boolean)this.queueRunningState.containsKey(queueName), (String)("Queue with name '" + queueName + "' does not exist"));
        this.queueRunningState.put(queueName, false);
    }

    private boolean isRedisEnabled() {
        return this.rqueueSchedulerConfig.isRedisEnabled();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroy() throws Exception {
        Object object = this.monitor;
        synchronized (object) {
            this.doStop();
            if (this.scheduler != null) {
                this.scheduler.destroy();
            }
            this.monitor.notifyAll();
        }
    }

    private void createScheduler(int queueCount) {
        if (queueCount == 0) {
            return;
        }
        int threadPoolSize = Math.min(this.getThreadPoolSize(), queueCount);
        String threadNamePrefix = this.getThreadNamePrefix();
        int terminationTime = 60;
        this.scheduler = ThreadUtils.createTaskScheduler(threadPoolSize, threadNamePrefix, terminationTime);
    }

    private boolean isQueueActive(String queueName) {
        Boolean val = this.queueRunningState.get(queueName);
        if (val == null) {
            return false;
        }
        return val;
    }

    protected void initialize() {
        List<String> queueNames = EndpointRegistry.getActiveQueues();
        this.defaultScriptExecutor = new DefaultScriptExecutor(this.redisTemplate);
        this.redisScript = RedisScriptFactory.getScript(RedisScriptFactory.ScriptType.MOVE_EXPIRED_MESSAGE);
        this.queueRunningState = new ConcurrentHashMap<String, Boolean>(queueNames.size());
        this.queueNameToScheduledTask = new ConcurrentHashMap(queueNames.size());
        this.queueNameToNextRunTime = new ConcurrentHashMap<String, Long>(queueNames.size());
        this.errorCount = new ConcurrentHashMap<String, Integer>(queueNames.size());
        this.createScheduler(queueNames.size());
        for (String queueName : queueNames) {
            this.initQueue(queueName);
        }
        if (this.isRedisEnabled()) {
            this.redisScheduleTriggerHandler = new RedisScheduleTriggerHandler(this.getLogger(), this.rqueueRedisListenerContainerFactory, this.rqueueSchedulerConfig, queueNames, this::addTask, this::getChannelName);
            this.redisScheduleTriggerHandler.initialize();
        }
    }

    private void initQueue(String queueName) {
        this.queueRunningState.put(queueName, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Async
    public void onApplicationEvent(RqueueBootstrapEvent event) {
        Object object = this.monitor;
        synchronized (object) {
            this.doStop();
            if (!this.rqueueSchedulerConfig.isEnabled()) {
                this.getLogger().debug("Scheduler is not enabled");
                return;
            }
            if (this.rqueueConfig.isProducer()) {
                this.getLogger().debug("Producer mode");
                return;
            }
            if (event.isStartup()) {
                if (EndpointRegistry.getActiveQueueCount() == 0) {
                    this.getLogger().warn("No queues are configured");
                    return;
                }
                this.initialize();
                this.doStart();
            }
            this.monitor.notifyAll();
        }
    }

    protected Future<?> addTask(String queueName) {
        return this.scheduler.submit((Runnable)this.task(queueName, false));
    }

    private class MessageMoverTask
    implements Runnable {
        private final String id = UUID.randomUUID().toString();
        private final String name;
        private final String queueName;
        private final String zsetName;
        private final boolean processingQueue;
        private final boolean periodic;

        MessageMoverTask(QueueDetail queueDetail, String zsetName, boolean periodic) {
            this.name = queueDetail.getName();
            this.queueName = queueDetail.getQueueName();
            this.zsetName = zsetName;
            this.periodic = periodic;
            this.processingQueue = MessageScheduler.this.isProcessingQueue();
        }

        private long getNextScheduleTimeInternal(Long value, long currentTime, Exception e) {
            long nextTime;
            int errCount = 0;
            if (null != e) {
                errCount = MessageScheduler.this.errorCount.getOrDefault(this.name, 0) + 1;
                if (errCount % 3 == 0) {
                    MessageScheduler.this.getLogger().error("Message mover task is failing continuously queue: {}", (Object)this.name, (Object)e);
                }
                double delay = (double)MessageScheduler.this.rqueueSchedulerConfig.minMessageMoveDelay() * Math.pow(1.5, errCount);
                long maxDelay = Math.min((long)delay, MessageScheduler.this.rqueueSchedulerConfig.getMaxMessageMoverDelay());
                nextTime = currentTime + maxDelay;
            } else {
                nextTime = MessageScheduler.this.getNextScheduleTime(this.name, currentTime, value);
            }
            MessageScheduler.this.errorCount.put(this.name, errCount);
            return nextTime;
        }

        public String toString() {
            return String.format("MessageMoverTask(id=%s, queue=%s, periodic=%s)", this.id, this.name, this.periodic);
        }

        private long getMessageCount() {
            return MessageScheduler.this.rqueueSchedulerConfig.getMaxMessageCount();
        }

        private List<String> scriptKeys() {
            return Arrays.asList(this.queueName, this.zsetName);
        }

        private Object[] scriptArgs() {
            long currentTime = System.currentTimeMillis();
            return new Object[]{currentTime, this.getMessageCount(), this.processingQueue ? 1 : 0};
        }

        private boolean shouldSkip(long currentTime) {
            Long nextRunTime = MessageScheduler.this.queueNameToNextRunTime.get(this.queueName);
            return nextRunTime != null && nextRunTime > currentTime;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long currentTime = System.currentTimeMillis();
            if (this.shouldSkip(currentTime)) {
                MessageScheduler.this.getLogger().debug("Skipped {}", (Object)this);
                return;
            }
            MessageScheduler.this.getLogger().debug("Running {}", (Object)this);
            Long value = null;
            Throwable e = null;
            try {
                if (MessageScheduler.this.isQueueActive(this.name)) {
                    value = (Long)MessageScheduler.this.defaultScriptExecutor.execute(MessageScheduler.this.redisScript, this.scriptKeys(), this.scriptArgs());
                }
            }
            catch (RedisSystemException ex) {
                e = ex;
            }
            catch (Exception ex) {
                e = ex;
                MessageScheduler.this.getLogger().warn("Task execution failed for the queue: {}", (Object)this.getName(), (Object)e);
            }
            finally {
                long nextExecutionTime = this.getNextScheduleTimeInternal(value, currentTime, (Exception)e);
                MessageScheduler.this.queueNameToNextRunTime.put(this.queueName, nextExecutionTime);
            }
        }

        public String getName() {
            return this.name;
        }
    }
}

