/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.queue.postgresql;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueues;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresqlDurableQueueConsumer
implements DurableQueueConsumer {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueueConsumer.class);
    private final RedeliveryPolicy redeliveryPolicy;
    private final QueueName queueName;
    private final QueuedMessageHandler queuedMessageHandler;
    private final ScheduledExecutorService scheduler;
    private final PostgresqlDurableQueues postgresqlDurableQueues;
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    private final int numberOfParallelMessageConsumers;
    private volatile boolean started;

    public PostgresqlDurableQueueConsumer(QueueName queueName, QueuedMessageHandler queuedMessageHandler, RedeliveryPolicy redeliveryPolicy, int numberOfParallelMessageConsumers, HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, PostgresqlDurableQueues postgresqlDurableQueues) {
        this.queueName = (QueueName)FailFast.requireNonNull((Object)queueName, (String)"queueName is missing");
        this.queuedMessageHandler = (QueuedMessageHandler)FailFast.requireNonNull((Object)queuedMessageHandler, (String)"You must specify a queuedMessageHandler");
        this.redeliveryPolicy = (RedeliveryPolicy)FailFast.requireNonNull((Object)redeliveryPolicy, (String)"You must specify a redelivery policy");
        this.unitOfWorkFactory = (HandleAwareUnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"You must specify a unitOfWorkFactory");
        this.postgresqlDurableQueues = (PostgresqlDurableQueues)FailFast.requireNonNull((Object)postgresqlDurableQueues, (String)"postgresqlDurableQueues is missing");
        FailFast.requireTrue((numberOfParallelMessageConsumers >= 1 ? 1 : 0) != 0, (String)"You must specify a number of parallelMessageConsumers >= 1");
        this.numberOfParallelMessageConsumers = numberOfParallelMessageConsumers;
        this.scheduler = Executors.newScheduledThreadPool(this.numberOfParallelMessageConsumers, new ThreadFactoryBuilder().nameFormat("Queue-" + queueName + "-Polling-%d").daemon(true).build());
    }

    public void start() {
        if (!this.started) {
            log.info("[{}] Starting {} DurableQueueConsumer threads with polling interval {} (based on initialRedeliveryDelay)", new Object[]{this.queueName, this.numberOfParallelMessageConsumers, this.redeliveryPolicy.initialRedeliveryDelay});
            for (int i = 0; i < this.numberOfParallelMessageConsumers; ++i) {
                if (i > 0) {
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                this.scheduler.scheduleAtFixedRate(this::pollQueue, this.redeliveryPolicy.initialRedeliveryDelay.toMillis(), this.redeliveryPolicy.initialRedeliveryDelay.toMillis(), TimeUnit.MILLISECONDS);
            }
            this.started = true;
        }
    }

    public void stop() {
        if (this.started) {
            log.info("[{}] Stopping DurableQueueConsumer", (Object)this.queueName);
            this.scheduler.shutdownNow();
            this.started = false;
            this.postgresqlDurableQueues.removeQueueConsumer(this);
            log.info("[{}] DurableQueueConsumer stopped", (Object)this.queueName);
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    public QueueName queueName() {
        return this.queueName;
    }

    public void cancel() {
        this.stop();
    }

    private void pollQueue() {
        log.trace("[{}] Polling Queue for the next message ready for delivery", (Object)this.queueName);
        this.unitOfWorkFactory.usingUnitOfWork(handleAwareUnitOfWork -> {
            try {
                this.postgresqlDurableQueues.getNextMessageReadyForDelivery(this.queueName).map(queuedMessage -> {
                    log.debug("[{}:{}] Delivering message. Total attempts: {}, Redelivery Attempts: {}", new Object[]{this.queueName, queuedMessage.id, queuedMessage.totalDeliveryAttempts, queuedMessage.redeliveryAttempts});
                    try {
                        this.queuedMessageHandler.handle(queuedMessage);
                        log.debug("[{}:{}] Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}", new Object[]{this.queueName, queuedMessage.id, queuedMessage.totalDeliveryAttempts, queuedMessage.redeliveryAttempts});
                        return this.postgresqlDurableQueues.deleteMessage(queuedMessage.id);
                    }
                    catch (Exception e) {
                        log.debug(MessageFormatter.msg((String)"[{}:{}] QueueMessageHandler for failed to handle: {}", (Object[])new Object[]{this.queueName, queuedMessage.id, queuedMessage}), (Throwable)e);
                        if (queuedMessage.totalDeliveryAttempts >= this.redeliveryPolicy.maximumNumberOfRedeliveries + 1) {
                            log.debug("[{}:{}] Marking Message as Dead Letter: {}", new Object[]{this.queueName, queuedMessage.id, queuedMessage});
                            return this.postgresqlDurableQueues.markAsDeadLetterMessage(queuedMessage.id, e);
                        }
                        Duration redeliveryDelay = this.redeliveryPolicy.calculateNextRedeliveryDelay(queuedMessage.redeliveryAttempts);
                        log.debug(MessageFormatter.msg((String)"[{}:{}] Using redeliveryDelay '{}' for QueueEntryId '{}' due to: {}", (Object[])new Object[]{this.queueName, queuedMessage.id, redeliveryDelay, queuedMessage.id, e.getMessage()}));
                        return this.postgresqlDurableQueues.retryMessage(queuedMessage.id, e, redeliveryDelay);
                    }
                });
            }
            catch (Exception e) {
                log.error(MessageFormatter.msg((String)"[{}] Error Polling Queue", (Object[])new Object[]{this.queueName}), (Throwable)e);
            }
        });
    }
}

