/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.listener.RqueueMessagePoller;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.slf4j.event.Level;
import org.springframework.messaging.MessageHeaders;

class DefaultRqueuePoller
extends RqueueMessagePoller {
    private Long lastNotAvailableAt;
    private final QueueDetail queueDetail;
    private final QueueThreadPool queueThreadPool;

    DefaultRqueuePoller(QueueDetail queueDetail, QueueThreadPool queueThreadPool, RqueueBeanProvider rqueueBeanProvider, RqueueMessageListenerContainer.QueueStateMgr queueStateMgr, List<Middleware> middlewares, long pollingInterval, long backoffTime, PostProcessingHandler postProcessingHandler, MessageHeaders messageHeaders) {
        super(queueDetail.getName(), rqueueBeanProvider, queueStateMgr, middlewares, pollingInterval, backoffTime, postProcessingHandler, messageHeaders);
        this.queueDetail = queueDetail;
        this.queueThreadPool = queueThreadPool;
        this.queues = Collections.singletonList(queueDetail.getName());
    }

    @Override
    long getSemaphoreWaitTime() {
        return this.pollingInterval;
    }

    @Override
    void deactivate(int index, String queue, RqueueMessagePoller.DeactivateType deactivateType) {
        if (deactivateType == RqueueMessagePoller.DeactivateType.SEMAPHORE_UNAVAILABLE || deactivateType == RqueueMessagePoller.DeactivateType.NO_MESSAGE) {
            TimeoutUtils.sleepLog(this.pollingInterval, false);
        } else if (deactivateType == RqueueMessagePoller.DeactivateType.POLL_FAILED) {
            TimeoutUtils.sleepLog(this.backoffTime, false);
        }
    }

    private void logNotAvailable() {
        long maxNotAvailableDelay = 600000L;
        if (Objects.isNull(this.lastNotAvailableAt)) {
            this.lastNotAvailableAt = System.currentTimeMillis();
        } else if (System.currentTimeMillis() - this.lastNotAvailableAt > maxNotAvailableDelay) {
            this.log(Level.ERROR, "deadlock?? frozen?? stuck?? No Threads are available in last {}", null, Duration.ofMillis(maxNotAvailableDelay));
        }
        this.log(Level.DEBUG, "No Threads are available sleeping {}Ms", null, this.pollingInterval);
    }

    void poll() {
        if (!this.hasAvailableThreads(this.queueDetail, this.queueThreadPool)) {
            this.logNotAvailable();
            TimeoutUtils.sleepLog(this.pollingInterval, false);
        } else {
            super.poll(-1, this.queueDetail.getName(), this.queueDetail, this.queueThreadPool);
            this.lastNotAvailableAt = null;
        }
    }

    @Override
    public void start() {
        this.log(Level.INFO, "poll starting", null, new Object[0]);
        while (true) {
            try {
                while (true) {
                    if (this.eligibleForPolling(this.queueDetail.getName())) {
                        this.poll();
                        continue;
                    }
                    if (this.shouldExit()) {
                        return;
                    }
                    this.deactivate(-1, this.queueDetail.getName(), RqueueMessagePoller.DeactivateType.NO_MESSAGE);
                }
            }
            catch (Throwable e) {
                this.log(Level.ERROR, "error in polling", e, new Object[0]);
                if (!this.shouldExit()) continue;
                return;
            }
            break;
        }
    }
}

