/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.queue.postgresql;

import dk.cloudcreate.essentials.components.common.transaction.HandleAwareUnitOfWork;
import dk.cloudcreate.essentials.components.common.transaction.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.queue.QueueName;
import dk.cloudcreate.essentials.components.queue.QueueRedeliveryPolicy;
import dk.cloudcreate.essentials.components.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueues;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DurableQueueConsumer} that periodically polls a single queue in a
 * {@link PostgresqlDurableQueues} instance and hands each message that is ready for
 * delivery to a {@link QueuedMessageHandler}.
 * <p>
 * Per polled message the outcome is one of:
 * <ul>
 *   <li>the handler returns normally: the message is deleted from the queue</li>
 *   <li>the handler throws and the redelivery budget of the {@link QueueRedeliveryPolicy}
 *       is exhausted: the message is marked as a dead letter message</li>
 *   <li>the handler throws and redeliveries remain: the message is scheduled for retry
 *       after the delay calculated by the {@link QueueRedeliveryPolicy}</li>
 * </ul>
 * The polling interval equals the policy's {@code initialRedeliveryDelay}.
 */
public class PostgresqlDurableQueueConsumer
implements DurableQueueConsumer {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueueConsumer.class);
    private final QueueRedeliveryPolicy redeliveryPolicy;
    private final QueueName queueName;
    private final QueuedMessageHandler queuedMessageHandler;
    private final ScheduledExecutorService scheduler;
    private final PostgresqlDurableQueues postgresqlDurableQueues;
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    private volatile boolean started;

    /**
     * Create a new consumer. The consumer does not poll until {@link #start()} is called.
     *
     * @param queueName                the name of the queue to poll (required)
     * @param queuedMessageHandler     the handler each polled message is delivered to (required)
     * @param redeliveryPolicy         the policy controlling the polling interval, redelivery delays
     *                                 and the maximum number of redeliveries (required)
     * @param parallelMessageConsumers the size of the polling thread pool; must be &gt;= 1
     * @param unitOfWorkFactory        the factory used to wrap each poll in a unit of work (required)
     * @param postgresqlDurableQueues  the queue store that is polled and updated (required)
     */
    public PostgresqlDurableQueueConsumer(QueueName queueName, QueuedMessageHandler queuedMessageHandler, QueueRedeliveryPolicy redeliveryPolicy, int parallelMessageConsumers, HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, PostgresqlDurableQueues postgresqlDurableQueues) {
        this.queueName = FailFast.requireNonNull(queueName, "queueName is missing");
        this.queuedMessageHandler = FailFast.requireNonNull(queuedMessageHandler, "You must specify a queuedMessageHandler");
        this.redeliveryPolicy = FailFast.requireNonNull(redeliveryPolicy, "You must specify a redelivery policy");
        this.unitOfWorkFactory = FailFast.requireNonNull(unitOfWorkFactory, "You must specify a unitOfWorkFactory");
        this.postgresqlDurableQueues = FailFast.requireNonNull(postgresqlDurableQueues, "postgresqlDurableQueues is missing");
        FailFast.requireTrue(parallelMessageConsumers >= 1, "You must specify a number of parallelMessageConsumers >= 1");
        this.scheduler = Executors.newScheduledThreadPool(parallelMessageConsumers,
                                                          new ThreadFactoryBuilder().nameFormat("Queue-" + queueName + "-Polling-%d")
                                                                                    .daemon(true)
                                                                                    .build());
    }

    /**
     * Start polling the queue at a fixed rate. Does nothing if the consumer is already started.
     */
    public void start() {
        if (!this.started) {
            log.info("[{}] Starting DurableQueueConsumer with polling interval {} (based on initialRedeliveryDelay)", this.queueName, this.redeliveryPolicy.initialRedeliveryDelay);
            long pollingIntervalMs = this.redeliveryPolicy.initialRedeliveryDelay.toMillis();
            // NOTE(review): a single fixed-rate task never runs concurrently with itself, so only one
            // poll is in flight at a time regardless of the pool size - confirm whether
            // parallelMessageConsumers was meant to schedule several polling tasks.
            this.scheduler.scheduleAtFixedRate(this::pollQueue, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);
            this.started = true;
        }
    }

    /**
     * Stop polling and deregister this consumer from the {@link PostgresqlDurableQueues} instance.
     * Does nothing if the consumer isn't started.
     * <p>
     * The underlying executor is shut down, and a shut down executor rejects new tasks, so a
     * stopped consumer cannot be started again.
     */
    public void stop() {
        if (this.started) {
            log.info("[{}] Stopping DurableQueueConsumer", this.queueName);
            this.scheduler.shutdownNow();
            this.started = false;
            this.postgresqlDurableQueues.removeQueueConsumer(this);
            log.info("[{}] DurableQueueConsumer stopped", this.queueName);
        }
    }

    /**
     * @return true if {@link #start()} has been called and {@link #stop()} hasn't been called since
     */
    public boolean isStarted() {
        return this.started;
    }

    /**
     * @return the name of the queue this consumer polls
     */
    @Override
    public QueueName queueName() {
        return this.queueName;
    }

    /**
     * Cancel this consumer; equivalent to {@link #stop()}.
     */
    @Override
    public void cancel() {
        this.stop();
    }

    /**
     * The scheduled polling task: within a unit of work, fetch the next message that is ready for
     * delivery (if any) and deliver it.
     * <p>
     * This method must never let an exception escape: a task scheduled with
     * {@code scheduleAtFixedRate} has all its subsequent executions suppressed as soon as one
     * execution throws, which would silently stop the consumer from polling.
     */
    private void pollQueue() {
        log.trace("[{}] Polling Queue for the next message ready for delivery", this.queueName);
        try {
            this.unitOfWorkFactory.usingUnitOfWork(handleAwareUnitOfWork -> {
                try {
                    this.postgresqlDurableQueues.getNextMessageReadyForDelivery(this.queueName)
                                                .map(this::deliver);
                }
                catch (Exception e) {
                    // Swallowed inside the unit of work on purpose, so the exception doesn't propagate
                    // through usingUnitOfWork
                    log.error(MessageFormatter.msg("[{}] Error Polling Queue", this.queueName), e);
                }
            });
        }
        catch (Exception e) {
            // Failures raised by the unit of work itself (i.e. outside the lambda above) must not
            // escape either, otherwise the scheduler cancels all future polls
            log.error(MessageFormatter.msg("[{}] Error Polling Queue", this.queueName), e);
        }
    }

    /**
     * Deliver a single message to the {@link QueuedMessageHandler} and record the outcome in the
     * queue store: delete on success, otherwise mark as dead letter or schedule a retry.
     *
     * @param queuedMessage the message to deliver
     * @return the result of the queue store operation that was performed; the caller ignores it
     */
    private Object deliver(QueuedMessage queuedMessage) {
        log.debug("[{}:{}] Delivering message. Total attempts: {}, Redelivery Attempts: {}", this.queueName, queuedMessage.id, queuedMessage.totalDeliveryAttempts, queuedMessage.redeliveryAttempts);
        try {
            this.queuedMessageHandler.handle(queuedMessage);
            log.debug("[{}:{}] Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}", this.queueName, queuedMessage.id, queuedMessage.totalDeliveryAttempts, queuedMessage.redeliveryAttempts);
            return this.postgresqlDurableQueues.deleteMessage(queuedMessage.id);
        }
        catch (Exception e) {
            log.debug(MessageFormatter.msg("[{}:{}] QueueMessageHandler for failed to handle: {}", this.queueName, queuedMessage.id, queuedMessage), e);
            // The first delivery plus maximumNumberOfRedeliveries redeliveries have all failed.
            // Written as '>' instead of '>= max + 1' so that a maximum of Integer.MAX_VALUE doesn't
            // overflow and dead-letter the message on its very first failure.
            if (queuedMessage.totalDeliveryAttempts > this.redeliveryPolicy.maximumNumberOfRedeliveries) {
                log.debug("[{}:{}] Marking Message as Dead Letter: {}", this.queueName, queuedMessage.id, queuedMessage);
                return this.postgresqlDurableQueues.markAsDeadLetterMessage(queuedMessage.id, e);
            }
            Duration redeliveryDelay = this.redeliveryPolicy.calculateNextRedeliveryDelay(queuedMessage.redeliveryAttempts);
            log.debug(MessageFormatter.msg("[{}:{}] Using redeliveryDelay '{}' for QueueEntryId '{}' due to: {}", this.queueName, queuedMessage.id, redeliveryDelay, queuedMessage.id, e.getMessage()));
            return this.postgresqlDurableQueues.retryMessage(queuedMessage.id, e, redeliveryDelay);
        }
    }
}

