/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.slf4j.Logger;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.Topic;

class RedisScheduleTriggerHandler {
    private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
    private final RqueueSchedulerConfig rqueueSchedulerConfig;
    private final Logger logger;
    private final Function<String, Future<?>> scheduler;
    private final Function<String, String> channelNameProducer;
    private final List<String> queueNames;
    @VisibleForTesting
    Map<String, Long> queueNameToLastRunTime;
    @VisibleForTesting
    Map<String, Future<?>> queueNameToFuture;
    @VisibleForTesting
    Map<String, String> channelNameToQueueName;
    @VisibleForTesting
    MessageListener messageListener;

    RedisScheduleTriggerHandler(Logger logger, RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, RqueueSchedulerConfig rqueueSchedulerConfig, List<String> queueNames, Function<String, Future<?>> scheduler, Function<String, String> channelNameProducer) {
        this.queueNames = queueNames;
        this.rqueueSchedulerConfig = rqueueSchedulerConfig;
        this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
        this.logger = logger;
        this.scheduler = scheduler;
        this.channelNameProducer = channelNameProducer;
    }

    void initialize() {
        this.messageListener = new MessageSchedulerListener();
        this.channelNameToQueueName = new HashMap<String, String>(this.queueNames.size());
        this.queueNameToFuture = new ConcurrentHashMap(this.queueNames.size());
        this.queueNameToLastRunTime = new ConcurrentHashMap<String, Long>(this.queueNames.size());
    }

    void stop() {
        for (String queue : this.queueNames) {
            this.stopQueue(queue);
        }
    }

    void startQueue(String queueName) {
        this.queueNameToLastRunTime.put(queueName, 0L);
        this.subscribeToRedisTopic(queueName);
    }

    void stopQueue(String queueName) {
        Future<?> future = this.queueNameToFuture.get(queueName);
        ThreadUtils.waitForTermination(this.logger, future, this.rqueueSchedulerConfig.getTerminationWaitTime(), "An exception occurred while stopping scheduler queue '{}'", queueName);
        this.queueNameToLastRunTime.put(queueName, 0L);
        this.queueNameToFuture.remove(queueName);
        this.unsubscribeFromRedis(queueName);
    }

    private void unsubscribeFromRedis(String queueName) {
        String channelName = this.channelNameProducer.apply(queueName);
        this.logger.debug("Queue {} unsubscribe from channel {}", (Object)queueName, (Object)channelName);
        this.rqueueRedisListenerContainerFactory.removeMessageListener(this.messageListener, new ChannelTopic(channelName));
        this.channelNameToQueueName.put(channelName, queueName);
    }

    private void subscribeToRedisTopic(String queueName) {
        String channelName = this.channelNameProducer.apply(queueName);
        this.channelNameToQueueName.put(channelName, queueName);
        this.logger.debug("Queue {} subscribe to channel {}", (Object)queueName, (Object)channelName);
        this.rqueueRedisListenerContainerFactory.addMessageListener(this.messageListener, (Topic)new ChannelTopic(channelName));
    }

    protected long getMinDelay() {
        return this.rqueueSchedulerConfig.minMessageMoveDelay();
    }

    private class MessageSchedulerListener
    implements MessageListener {
        private MessageSchedulerListener() {
        }

        private void schedule(String queueName, long currentTime) {
            Future<?> future = RedisScheduleTriggerHandler.this.queueNameToFuture.get(queueName);
            if (future == null || future.isCancelled() || future.isDone()) {
                RedisScheduleTriggerHandler.this.queueNameToLastRunTime.put(queueName, currentTime);
                Future<?> newFuture = RedisScheduleTriggerHandler.this.scheduler.apply(queueName);
                RedisScheduleTriggerHandler.this.queueNameToFuture.put(queueName, newFuture);
            }
        }

        private void handleMessage(String queueName, long startTime) {
            long currentTime = System.currentTimeMillis();
            if (startTime > currentTime) {
                RedisScheduleTriggerHandler.this.logger.warn("Received message body is not correct queue: {}, time: {}", (Object)queueName, (Object)startTime);
                return;
            }
            long lastRunTime = RedisScheduleTriggerHandler.this.queueNameToLastRunTime.get(queueName);
            if (currentTime - lastRunTime < RedisScheduleTriggerHandler.this.getMinDelay()) {
                return;
            }
            this.schedule(queueName, currentTime);
        }

        public void onMessage(Message message, byte[] pattern) {
            if (message.getBody().length == 0 || message.getChannel().length == 0) {
                return;
            }
            String body = new String(message.getBody());
            String channel = new String(message.getChannel());
            RedisScheduleTriggerHandler.this.logger.trace("Body: {} Channel: {}", (Object)body, (Object)channel);
            try {
                long startTime = Long.parseLong(body);
                String queueName = RedisScheduleTriggerHandler.this.channelNameToQueueName.get(channel);
                if (queueName == null) {
                    RedisScheduleTriggerHandler.this.logger.warn("Unknown channel name {}", (Object)channel);
                    return;
                }
                this.handleMessage(queueName, startTime);
            }
            catch (Exception e) {
                RedisScheduleTriggerHandler.this.logger.error("Error occurred on a channel {}, body: {}", new Object[]{channel, body, e});
            }
        }
    }
}

