/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.listener.RqueueMessagePoller;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.event.Level;

class StrictPriorityPoller
extends RqueueMessagePoller {
    private final Map<String, QueueDetail> queueNameToDetail;
    private final Map<String, ThreadUtils.QueueThread> queueNameToThread;
    private final Map<String, Long> queueDeactivationTime = new HashMap<String, Long>();
    private Map<String, Long> lastFetchedTime = new HashMap<String, Long>();

    StrictPriorityPoller(String groupName, RqueueMessageListenerContainer container, List<QueueDetail> queueDetails, Map<String, ThreadUtils.QueueThread> queueNameToThread, PostProcessingHandler postProcessingHandler, RqueueConfig rqueueConfig) {
        super("Strict-" + groupName, container, postProcessingHandler, rqueueConfig);
        ArrayList<QueueDetail> queueDetailList = new ArrayList<QueueDetail>(queueDetails);
        queueDetailList.sort((o1, o2) -> o2.getPriority().get("DEFAULT_PRIORITY") - o1.getPriority().get("DEFAULT_PRIORITY"));
        this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList());
        this.queues.forEach(queue -> this.lastFetchedTime.put((String)queue, 0L));
        this.queueNameToDetail = queueDetailList.stream().collect(Collectors.toMap(QueueDetail::getName, Function.identity()));
        this.queueNameToThread = queueNameToThread;
    }

    private String getQueueToPoll() {
        long now = System.currentTimeMillis();
        for (String queue : this.queues) {
            if (!this.isQueueActive(queue) || now - this.lastFetchedTime.get(queue) <= 60000L) continue;
            return queue;
        }
        for (String queue : this.queues) {
            if (!this.isQueueActive(queue)) continue;
            Long deactivationTime = this.queueDeactivationTime.get(queue);
            if (deactivationTime == null) {
                return queue;
            }
            if (now - deactivationTime <= this.getPollingInterval()) continue;
            return queue;
        }
        return null;
    }

    private String getQueueToPollOrWait() {
        String queueToPoll = this.getQueueToPoll();
        if (queueToPoll == null) {
            if (this.shouldExit()) {
                return null;
            }
            queueToPoll = "";
        }
        this.log(Level.DEBUG, "Queue to be poll : {}", null, queueToPoll);
        return queueToPoll;
    }

    @Override
    void start() {
        this.log(Level.DEBUG, "Running, Ordered Queues: {}", null, this.queues);
        while (true) {
            try {
                while (true) {
                    String queue;
                    if ((queue = this.getQueueToPollOrWait()) == null) {
                        return;
                    }
                    if (queue.equals("")) {
                        TimeoutUtils.sleepLog(this.getPollingInterval(), false);
                        continue;
                    }
                    this.lastFetchedTime.put(queue, System.currentTimeMillis());
                    ThreadUtils.QueueThread queueThread = this.queueNameToThread.get(queue);
                    QueueDetail queueDetail = this.queueNameToDetail.get(queue);
                    this.poll(-1, queue, queueDetail, queueThread);
                }
            }
            catch (Exception e) {
                this.log(Level.ERROR, "Exception in the poller {}", e, e.getMessage());
                if (!this.shouldExit()) continue;
                return;
            }
            break;
        }
    }

    @Override
    long getSemaphoreWaiTime() {
        return 20L;
    }

    @Override
    void deactivate(int index, String queue, RqueueMessagePoller.DeactivateType deactivateType) {
        if (deactivateType == RqueueMessagePoller.DeactivateType.POLL_FAILED) {
            TimeoutUtils.sleepLog(this.getBackOffTime(), false);
        } else {
            this.queueDeactivationTime.put(queue, System.currentTimeMillis());
        }
    }
}

