/*
 * Decompiled with CFR 0.152.
 */
package io.gitbub.devlibx.easy.helper.queue;

import io.gitbub.devlibx.easy.helper.queue.DefaultRateLimiter;
import io.gitbub.devlibx.easy.helper.queue.IProcessor;
import io.gitbub.devlibx.easy.helper.queue.IRateLimiter;
import io.gitbub.devlibx.easy.helper.queue.NoOpRateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessorQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(ProcessorQueue.class);
    private final int threadCount;
    private final BlockingQueue<T> queue;
    private final ExecutorService executorService;
    private final IProcessor<T> processor;
    private final AtomicBoolean STOP_PROCESSOR;
    private final int maxTimeToWaitForAItemToProcessInSec;
    private final CountDownLatch waitLatch;
    private final int maxRetryPerItem;
    private final int waitTimeToRetry;
    private final IRateLimiter rateLimiter;
    private final AtomicBoolean clientInitiatedAllEeventsArePosted = new AtomicBoolean(false);

    public ProcessorQueue(int threadCount, int queueBufferSize, int maxTimeToWaitForAItemToProcessInSec, int maxRetryPerItem, IRateLimiter.Config rateLimiterConfig, IProcessor<T> processor) {
        this.maxRetryPerItem = maxRetryPerItem <= 0 ? Integer.MAX_VALUE : maxRetryPerItem;
        this.waitTimeToRetry = maxTimeToWaitForAItemToProcessInSec <= 0 ? 1 : maxTimeToWaitForAItemToProcessInSec;
        this.STOP_PROCESSOR = new AtomicBoolean(false);
        this.waitLatch = new CountDownLatch(threadCount);
        this.processor = processor;
        this.threadCount = threadCount;
        this.queue = new ArrayBlockingQueue<T>(queueBufferSize);
        this.maxTimeToWaitForAItemToProcessInSec = maxTimeToWaitForAItemToProcessInSec <= 0 ? 10 : maxTimeToWaitForAItemToProcessInSec;
        this.executorService = Executors.newFixedThreadPool(threadCount);
        this.rateLimiter = rateLimiterConfig.limit > 0 ? new DefaultRateLimiter(rateLimiterConfig) : new NoOpRateLimiter();
    }

    public ProcessorQueue(int threadCount, int queueBufferSize, int maxTimeToWaitForAItemToProcessInSec, int maxRetryPerItem, int rateLimit, IProcessor<T> processor) {
        this(threadCount, queueBufferSize, maxTimeToWaitForAItemToProcessInSec, maxRetryPerItem, IRateLimiter.Config.builder().limit(rateLimit).build(), processor);
    }

    public void noMoreItemsToProcess() {
        this.clientInitiatedAllEeventsArePosted.set(true);
    }

    public CountDownLatch start() {
        for (int i = 0; i < this.threadCount; ++i) {
            this.executorService.submit(this.runnable(i));
        }
        new Thread(() -> {
            try {
                this.waitLatch.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.executorService.shutdown();
        }).start();
        return this.waitLatch;
    }

    public void processItem(T item) {
        while (true) {
            try {
                this.rateLimiter.execute(() -> this._processItem(item));
            }
            catch (RequestNotPermitted e) {
                try {
                    System.out.println("rate limit error - will repost it");
                    Thread.sleep(1L);
                }
                catch (InterruptedException interruptedException) {}
                continue;
            }
            break;
        }
    }

    private void _processItem(T item) {
        try {
            this.queue.put(item);
        }
        catch (InterruptedException e) {
            log.error("Failed to put items to queue for processing", (Throwable)e);
        }
    }

    private Runnable runnable(int threadId) {
        return () -> {
            block4: while (!this.STOP_PROCESSOR.get()) {
                try {
                    T item = this.queue.poll(this.maxTimeToWaitForAItemToProcessInSec, TimeUnit.SECONDS);
                    if (item == null && this.clientInitiatedAllEeventsArePosted.get()) {
                        log.info("Worker Id=" + threadId + " - Stop worker thread - did not get items for " + this.maxTimeToWaitForAItemToProcessInSec + " sec");
                        break;
                    }
                    if (item == null) {
                        Thread.sleep(10L);
                        log.debug("Worker Id=" + threadId + " did not find any work... Continue in loop and waiting for work...");
                        continue;
                    }
                    for (int temp = this.maxRetryPerItem; temp > 0; --temp) {
                        try {
                            this.processor.process(item);
                            continue block4;
                        }
                        catch (Throwable e) {
                            Thread.sleep(this.waitTimeToRetry);
                            log.info("Worker Id=" + threadId + " - Got error in items: " + item + " Retry=" + temp + " Wait for " + this.waitTimeToRetry + "ms");
                            continue;
                        }
                    }
                }
                catch (Throwable e) {
                    log.error("Worker Id=" + threadId + " - Got error: " + e.getMessage());
                }
            }
            log.info("Worker Id=" + threadId + " - Stopping the worker thread...");
            this.waitLatch.countDown();
        };
    }
}

