/*
 * Decompiled with CFR 0.152.
 */
package dk.trustworks.essentials.components.queue.postgresql;

import dk.trustworks.essentials.components.foundation.IOExceptionUtil;
import dk.trustworks.essentials.components.foundation.json.JSONSerializationException;
import dk.trustworks.essentials.components.foundation.json.JSONSerializer;
import dk.trustworks.essentials.components.foundation.json.JacksonJSONSerializer;
import dk.trustworks.essentials.components.foundation.messaging.queue.BatchMessageFetchingCapableDurableQueues;
import dk.trustworks.essentials.components.foundation.messaging.queue.CentralizedMessageFetcher;
import dk.trustworks.essentials.components.foundation.messaging.queue.CentralizedMessageFetcherDurableQueueConsumer;
import dk.trustworks.essentials.components.foundation.messaging.queue.CentralizedQueuePollingOptimizer;
import dk.trustworks.essentials.components.foundation.messaging.queue.DefaultDurableQueueConsumer;
import dk.trustworks.essentials.components.foundation.messaging.queue.DefaultQueuedMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.trustworks.essentials.components.foundation.messaging.queue.DurableQueueDeserializationException;
import dk.trustworks.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.trustworks.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.trustworks.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.trustworks.essentials.components.foundation.messaging.queue.Message;
import dk.trustworks.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.trustworks.essentials.components.foundation.messaging.queue.NextQueuedMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueueName;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueuePollingOptimizer;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueuedMessageCounts;
import dk.trustworks.essentials.components.foundation.messaging.queue.SimpleQueuePollingOptimizer;
import dk.trustworks.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.AcknowledgeMessageAsHandled;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.DeleteMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessages;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetQueuedMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetQueuedMessageCountsFor;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetQueuedMessages;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetTotalDeadLetterMessagesQueuedFor;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.GetTotalMessagesQueuedFor;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.MarkAsDeadLetterMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.PurgeQueue;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.QueueMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.QueueMessageAsDeadLetterMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.QueueMessages;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.ResurrectDeadLetterMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.RetryMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.operations.StopConsumingFromQueue;
import dk.trustworks.essentials.components.foundation.postgresql.ListenNotify;
import dk.trustworks.essentials.components.foundation.postgresql.MultiTableChangeListener;
import dk.trustworks.essentials.components.foundation.postgresql.NotificationDuplicationFilter;
import dk.trustworks.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.trustworks.essentials.components.foundation.postgresql.TableChangeNotification;
import dk.trustworks.essentials.components.foundation.transaction.UnitOfWork;
import dk.trustworks.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.trustworks.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWork;
import dk.trustworks.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.trustworks.essentials.components.queue.postgresql.DurableQueuesSerialization;
import dk.trustworks.essentials.components.queue.postgresql.DurableQueuesSql;
import dk.trustworks.essentials.components.queue.postgresql.MessageMappingResult;
import dk.trustworks.essentials.components.queue.postgresql.PostgresqlDurableQueueConsumer;
import dk.trustworks.essentials.components.queue.postgresql.PostgresqlDurableQueuesBuilder;
import dk.trustworks.essentials.components.queue.postgresql.QueueNameDuplicationFilter;
import dk.trustworks.essentials.components.queue.postgresql.QueueTableNotification;
import dk.trustworks.essentials.components.queue.postgresql.QueuedMessageRowMapper;
import dk.trustworks.essentials.components.queue.postgresql.SingleOperationTransactionDurableQueuesInterceptor;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueEntryIdArgumentFactory;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueEntryIdColumnMapper;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueNameArgumentFactory;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueNameColumnMapper;
import dk.trustworks.essentials.reactive.AnnotatedEventHandler;
import dk.trustworks.essentials.reactive.EventHandler;
import dk.trustworks.essentials.reactive.Handler;
import dk.trustworks.essentials.shared.Exceptions;
import dk.trustworks.essentials.shared.FailFast;
import dk.trustworks.essentials.shared.MessageFormatter;
import dk.trustworks.essentials.shared.collections.Lists;
import dk.trustworks.essentials.shared.functional.QuadFunction;
import dk.trustworks.essentials.shared.functional.TripleFunction;
import dk.trustworks.essentials.shared.interceptor.DefaultInterceptorChain;
import dk.trustworks.essentials.shared.interceptor.InterceptorChain;
import dk.trustworks.essentials.types.CharSequenceType;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.argument.ArgumentFactory;
import org.jdbi.v3.core.mapper.ColumnMapper;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PostgresqlDurableQueues
implements BatchMessageFetchingCapableDurableQueues {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueues.class);
    public static final String DEFAULT_DURABLE_QUEUES_TABLE_NAME = "durable_queues";
    private static final Object NO_PAYLOAD = new Object();
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    private final JSONSerializer jsonSerializer;
    private final String sharedQueueTableName;
    private final ConcurrentMap<QueueName, CentralizedMessageFetcherDurableQueueConsumer> durableQueueConsumers = new ConcurrentHashMap<QueueName, CentralizedMessageFetcherDurableQueueConsumer>();
    private final ConcurrentMap<QueueName, PostgresqlDurableQueueConsumer> traditionalDurableQueueConsumers = new ConcurrentHashMap<QueueName, PostgresqlDurableQueueConsumer>();
    private final QueuedMessageRowMapper queuedMessageMapper;
    private final List<DurableQueuesInterceptor> interceptors = new CopyOnWriteArrayList<DurableQueuesInterceptor>();
    private final Optional<MultiTableChangeListener<TableChangeNotification>> multiTableChangeListener;
    private final Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory;
    private final TransactionalMode transactionalMode;
    private CentralizedMessageFetcher centralizedMessageFetcher;
    private final boolean useCentralizedMessageFetcher;
    private final boolean useOrderedUnorderedQuery;
    private final DurableQueuesSql durableQueuesSql;
    private final DurableQueuesSerialization durableQueuesSerialization;
    private Function<QueueName, QueuePollingOptimizer> centralizedQueuePollingOptimizerFactory;
    private int messageHandlingTimeoutMs;
    protected ConcurrentMap<QueueName, Instant> lastResetStuckMessagesCheckTimestamps = new ConcurrentHashMap<QueueName, Instant>();
    private volatile boolean started;

    public static PostgresqlDurableQueuesBuilder builder() {
        return new PostgresqlDurableQueuesBuilder();
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory) {
        this(unitOfWorkFactory, (JSONSerializer)new JacksonJSONSerializer(DurableQueuesSerialization.createDefaultObjectMapper()), DEFAULT_DURABLE_QUEUES_TABLE_NAME, null, null);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory) {
        this(unitOfWorkFactory, (JSONSerializer)new JacksonJSONSerializer(DurableQueuesSerialization.createDefaultObjectMapper()), DEFAULT_DURABLE_QUEUES_TABLE_NAME, null, queuePollingOptimizerFactory);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer) {
        this(unitOfWorkFactory, jsonSerializer, DEFAULT_DURABLE_QUEUES_TABLE_NAME, null, null);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory) {
        this(unitOfWorkFactory, jsonSerializer, DEFAULT_DURABLE_QUEUES_TABLE_NAME, null, queuePollingOptimizerFactory);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, String sharedQueueTableName, MultiTableChangeListener<TableChangeNotification> multiTableChangeListener, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory) {
        this(unitOfWorkFactory, jsonSerializer, sharedQueueTableName, multiTableChangeListener, queuePollingOptimizerFactory, TransactionalMode.FullyTransactional, null);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, String sharedQueueTableName, MultiTableChangeListener<TableChangeNotification> multiTableChangeListener, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory, TransactionalMode transactionalMode, Duration messageHandlingTimeout) {
        this(unitOfWorkFactory, jsonSerializer, sharedQueueTableName, multiTableChangeListener, queuePollingOptimizerFactory, transactionalMode, messageHandlingTimeout, true, Duration.ofMillis(20L), null, true);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, String sharedQueueTableName, MultiTableChangeListener<TableChangeNotification> multiTableChangeListener, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory, TransactionalMode transactionalMode, Duration messageHandlingTimeout, boolean useCentralizedMessageFetcher, Duration centralizedMessageFetcherPollingInterval, Function<QueueName, QueuePollingOptimizer> centralizedQueuePollingOptimizerFactory, boolean useOrderedUnorderedQuery) {
        this.unitOfWorkFactory = (HandleAwareUnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"No unitOfWorkFactory instance provided");
        this.jsonSerializer = (JSONSerializer)FailFast.requireNonNull((Object)jsonSerializer, (String)"No jsonSerializer");
        this.sharedQueueTableName = ((String)FailFast.requireNonNull((Object)sharedQueueTableName, (String)"No sharedQueueTableName provided")).toLowerCase(Locale.ROOT);
        PostgresqlUtil.checkIsValidTableOrColumnName((String)sharedQueueTableName);
        this.useCentralizedMessageFetcher = useCentralizedMessageFetcher;
        this.useOrderedUnorderedQuery = useOrderedUnorderedQuery;
        this.durableQueuesSql = new DurableQueuesSql(sharedQueueTableName);
        this.durableQueuesSerialization = new DurableQueuesSerialization(jsonSerializer);
        this.queuedMessageMapper = new QueuedMessageRowMapper((QuadFunction<QueueName, QueueEntryId, String, String, Object>)((QuadFunction & Serializable)this.durableQueuesSerialization::deserializeMessagePayload), (TripleFunction<QueueName, QueueEntryId, String, MessageMetaData>)((TripleFunction & Serializable)this.durableQueuesSerialization::deserializeMessageMetadata));
        this.multiTableChangeListener = Optional.ofNullable(multiTableChangeListener);
        this.queuePollingOptimizerFactory = queuePollingOptimizerFactory != null ? queuePollingOptimizerFactory : this::createQueuePollingOptimizerFor;
        this.transactionalMode = (TransactionalMode)FailFast.requireNonNull((Object)transactionalMode, (String)"No transactionalMode instance provided");
        if (useCentralizedMessageFetcher) {
            this.centralizedMessageFetcher = new CentralizedMessageFetcher((DurableQueues)this, ((Duration)FailFast.requireNonNull((Object)centralizedMessageFetcherPollingInterval, (String)"No centralizedMessageFetcherPollingInterval provided")).toMillis(), this.interceptors);
            Function<QueueName, QueuePollingOptimizer> function = this.centralizedQueuePollingOptimizerFactory = centralizedQueuePollingOptimizerFactory != null ? centralizedQueuePollingOptimizerFactory : this::createCentralizedQueuePollingOptimizerFor;
        }
        if (transactionalMode == TransactionalMode.SingleOperationTransaction) {
            this.messageHandlingTimeoutMs = (int)((Duration)FailFast.requireNonNull((Object)messageHandlingTimeout, (String)"No messageHandlingTimeout provided")).toMillis();
            this.addInterceptor(new SingleOperationTransactionDurableQueuesInterceptor(unitOfWorkFactory));
        }
        this.multiTableChangeListener.ifPresent(listener -> listener.addDuplicationFilterAsFirst((NotificationDuplicationFilter)new QueueNameDuplicationFilter()));
        this.initializeQueueTables();
    }

    private void initializeQueueTables() {
        PostgresqlUtil.checkIsValidTableOrColumnName((String)this.sharedQueueTableName);
        this.unitOfWorkFactory.usingUnitOfWork(handleAwareUnitOfWork -> {
            handleAwareUnitOfWork.handle().getJdbi().registerArgument((ArgumentFactory)new QueueNameArgumentFactory());
            handleAwareUnitOfWork.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueNameColumnMapper());
            handleAwareUnitOfWork.handle().getJdbi().registerArgument((ArgumentFactory)new QueueEntryIdArgumentFactory());
            handleAwareUnitOfWork.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueEntryIdColumnMapper());
            handleAwareUnitOfWork.handle().execute(this.durableQueuesSql.getCreateQueueTableSql(), new Object[0]);
            log.info("Ensured Durable Queues table '{}' exists", (Object)this.sharedQueueTableName);
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_queue_name", handleAwareUnitOfWork.handle());
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_next_delivery_ts", handleAwareUnitOfWork.handle());
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_is_dead_letter_message", handleAwareUnitOfWork.handle());
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_is_being_delivered", handleAwareUnitOfWork.handle());
            this.createIndex(this.durableQueuesSql.getCreateOrderedMessageIndexSql(), handleAwareUnitOfWork.handle());
            this.createIndex(this.durableQueuesSql.getCreateNextMessageIndexSql(), handleAwareUnitOfWork.handle());
            this.createIndex(this.durableQueuesSql.getCreateNextReadyMessageIndexSql(), handleAwareUnitOfWork.handle());
            this.createIndex(this.durableQueuesSql.getCreateOrderedMessageReadyIndexSql(), handleAwareUnitOfWork.handle());
            this.createIndex(this.durableQueuesSql.getCreateUnorderedMessageReadyIndexSql(), handleAwareUnitOfWork.handle());
            this.createIndex(this.durableQueuesSql.getCreateOrderedMessageHeadIndexSql(), handleAwareUnitOfWork.handle());
            this.multiTableChangeListener.ifPresent(listener -> ListenNotify.addChangeNotificationTriggerToTable((Handle)handleAwareUnitOfWork.handle(), (String)this.sharedQueueTableName, List.of(ListenNotify.SqlOperation.INSERT, ListenNotify.SqlOperation.UPDATE), (String[])new String[]{"id", "queue_name", "added_ts", "next_delivery_ts", "delivery_ts", "is_dead_letter_message", "is_being_delivered"}));
        });
    }

    private void createIndex(String indexStatement, Handle handle) {
        PostgresqlUtil.checkIsValidTableOrColumnName((String)this.sharedQueueTableName);
        handle.execute(MessageFormatter.bind((String)indexStatement, (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
    }

    private void dropIndex(String indexStatement, Handle handle) {
        PostgresqlUtil.checkIsValidTableOrColumnName((String)this.sharedQueueTableName);
        handle.execute(MessageFormatter.bind((String)indexStatement, (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
    }

    public String getSharedQueueTableName() {
        return this.sharedQueueTableName;
    }

    DurableQueuesSql getDurableQueuesSql() {
        return this.durableQueuesSql;
    }

    public List<DurableQueuesInterceptor> getInterceptors() {
        return Collections.unmodifiableList(this.interceptors);
    }

    public void start() {
        if (!this.started) {
            this.started = true;
            log.info("Starting PostgresqlDurableQueues");
            PostgresqlUtil.checkIsValidTableOrColumnName((String)this.sharedQueueTableName);
            this.interceptors.forEach(durableQueuesInterceptor -> durableQueuesInterceptor.setDurableQueues((DurableQueues)this));
            DefaultInterceptorChain.sortInterceptorsByOrder(this.interceptors);
            if (this.useCentralizedMessageFetcher) {
                this.centralizedMessageFetcher.start();
                this.durableQueueConsumers.values().forEach(CentralizedMessageFetcherDurableQueueConsumer::start);
            } else {
                this.traditionalDurableQueueConsumers.values().forEach(DefaultDurableQueueConsumer::start);
            }
            this.multiTableChangeListener.ifPresent(listener -> {
                listener.listenToNotificationsFor(this.sharedQueueTableName, QueueTableNotification.class);
                listener.getEventBus().addAsyncSubscriber((EventHandler)new AnnotatedEventHandler(){

                    @Handler
                    void handle(QueueTableNotification e) {
                        try {
                            log.trace("[{}] Received Message-Added {} with id '{}'", new Object[]{e.queueName, ((Object)((Object)e)).getClass().getSimpleName(), e.id});
                            QueueName queueName = QueueName.of((CharSequence)e.queueName);
                            if (PostgresqlDurableQueues.this.useCentralizedMessageFetcher) {
                                PostgresqlDurableQueues.this.durableQueueConsumers.values().stream().filter(durableQueueConsumer -> durableQueueConsumer.queueName().equals((Object)queueName)).forEach(durableQueueConsumer -> {
                                    DefaultQueuedMessage queuedMessage = PostgresqlDurableQueues.createDefaultQueuedMessage(e, queueName);
                                    durableQueueConsumer.messageAdded((QueuedMessage)queuedMessage);
                                });
                            } else {
                                PostgresqlDurableQueues.this.traditionalDurableQueueConsumers.values().stream().filter(durableQueueConsumer -> durableQueueConsumer.queueName().equals((Object)queueName)).forEach(durableQueueConsumer -> {
                                    DefaultQueuedMessage queuedMessage = PostgresqlDurableQueues.createDefaultQueuedMessage(e, queueName);
                                    durableQueueConsumer.messageAdded((QueuedMessage)queuedMessage);
                                });
                            }
                        }
                        catch (Exception ex) {
                            log.error("Error occurred while handling notification", (Throwable)ex);
                        }
                    }
                });
            });
            log.info("Started PostgresqlDurableQueues");
        }
    }

    private static DefaultQueuedMessage createDefaultQueuedMessage(QueueTableNotification e, QueueName queueName) {
        return new DefaultQueuedMessage(QueueEntryId.of((CharSequence)String.valueOf(e.id)), queueName, Message.of((Object)NO_PAYLOAD), e.addedTimestamp, e.nextDeliveryTimestamp, e.deliveryTimestamp, null, -1, -1, e.isDeadLetterMessage, e.isBeingDelivered);
    }

    public void stop() {
        if (this.started) {
            block6: {
                log.info("Stopping");
                PostgresqlUtil.checkIsValidTableOrColumnName((String)this.sharedQueueTableName);
                if (this.useCentralizedMessageFetcher) {
                    this.durableQueueConsumers.values().forEach(consumer -> {
                        try {
                            consumer.stop();
                        }
                        catch (Exception ex) {
                            if (IOExceptionUtil.isIOException((Throwable)ex)) {
                                log.debug("Error occurred while stopping CentralizedMessageFetcherDurableQueueConsumer", (Throwable)ex);
                            }
                            log.error("Error occurred while stopping CentralizedMessageFetcherDurableQueueConsumer", (Throwable)ex);
                        }
                    });
                    try {
                        this.centralizedMessageFetcher.stop();
                    }
                    catch (Exception ex) {
                        if (IOExceptionUtil.isIOException((Throwable)ex)) {
                            log.debug("Error occurred while stopping CentralizedMessageFetcher", (Throwable)ex);
                            break block6;
                        }
                        log.error("Error occurred while stopping CentralizedMessageFetcher", (Throwable)ex);
                    }
                } else {
                    this.traditionalDurableQueueConsumers.values().forEach(consumer -> {
                        try {
                            consumer.stop();
                        }
                        catch (Exception ex) {
                            if (IOExceptionUtil.isIOException((Throwable)ex)) {
                                log.debug("Error occurred while stopping PostgresqlDurableQueueConsumer", (Throwable)ex);
                            }
                            log.error("Error occurred while stopping PostgresqlDurableQueueConsumer", (Throwable)ex);
                        }
                    });
                }
            }
            this.multiTableChangeListener.ifPresent(listener -> {
                try {
                    listener.unlistenToNotificationsFor(this.sharedQueueTableName);
                }
                catch (Exception ex) {
                    if (IOExceptionUtil.isIOException((Throwable)ex)) {
                        log.debug("Error occurred while performing unlistenToNotificationsFor '{}'", (Object)this.sharedQueueTableName, (Object)ex);
                    }
                    log.error("Error occurred while performing unlistenToNotificationsFor '{}'", (Object)this.sharedQueueTableName, (Object)ex);
                }
            });
            this.started = false;
            log.info("Stopped");
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    public TransactionalMode getTransactionalMode() {
        return this.transactionalMode;
    }

    public Optional<UnitOfWorkFactory<? extends UnitOfWork>> getUnitOfWorkFactory() {
        return Optional.ofNullable(this.unitOfWorkFactory);
    }

    QueuedMessageRowMapper getQueuedMessageMapper() {
        return this.queuedMessageMapper;
    }

    public Set<QueueName> getQueueNames() {
        Set<QueueName> consumerQueueNames = this.getActiveQueueNames();
        Set dbQueueNames = (Set)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> handleAwareUnitOfWork.handle().createQuery(this.durableQueuesSql.getQueueNamesSql()).mapTo(QueueName.class).set());
        dbQueueNames.addAll(consumerQueueNames);
        return dbQueueNames;
    }

    public Set<QueueName> getActiveQueueNames() {
        HashSet<QueueName> allActiveQueueNames = new HashSet<QueueName>();
        allActiveQueueNames.addAll(this.durableQueueConsumers.keySet());
        allActiveQueueNames.addAll(this.traditionalDurableQueueConsumers.keySet());
        return allActiveQueueNames;
    }

    public DurableQueueConsumer consumeFromQueue(ConsumeFromQueue operation) {
        FailFast.requireNonNull((Object)operation, (String)"No operation provided");
        if (this.durableQueueConsumers.containsKey(operation.queueName) || this.traditionalDurableQueueConsumers.containsKey(operation.queueName)) {
            throw new DurableQueueException("There is already a DurableConsumer for this queue", operation.queueName);
        }
        operation.validate();
        if (this.useCentralizedMessageFetcher) {
            QueuePollingOptimizer queuePollingOptimizer = this.multiTableChangeListener.map(_ignore -> this.centralizedQueuePollingOptimizerFactory.apply(operation.getQueueName())).orElseGet(QueuePollingOptimizer::None);
            return (DurableQueueConsumer)this.durableQueueConsumers.computeIfAbsent(operation.queueName, _queueName -> {
                CentralizedMessageFetcherDurableQueueConsumer consumer = (CentralizedMessageFetcherDurableQueueConsumer)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> new CentralizedMessageFetcherDurableQueueConsumer(operation, (BatchMessageFetchingCapableDurableQueues)this, this::removeQueueConsumer, this.centralizedMessageFetcher, queuePollingOptimizer)).proceed();
                if (this.started) {
                    consumer.start();
                }
                log.info("[{}] {} - {} {}", new Object[]{operation.queueName, operation.consumerName, this.started ? "Started" : "Created", consumer.getClass().getSimpleName()});
                return consumer;
            });
        }
        return (DurableQueueConsumer)this.traditionalDurableQueueConsumers.computeIfAbsent(operation.queueName, _queueName -> {
            long pollingIntervalMs = operation.getPollingInterval().toMillis();
            QueuePollingOptimizer queuePollingOptimizer = this.multiTableChangeListener.map(_ignore -> this.queuePollingOptimizerFactory.apply(operation)).orElseGet(QueuePollingOptimizer::None);
            PostgresqlDurableQueueConsumer consumer = (PostgresqlDurableQueueConsumer)((Object)((Object)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> new PostgresqlDurableQueueConsumer(operation, this.unitOfWorkFactory, this, this::removeQueueConsumer, pollingIntervalMs, queuePollingOptimizer, this.interceptors)).proceed()));
            if (this.started) {
                consumer.start();
            }
            log.info("[{}] {} - {} {}", new Object[]{operation.queueName, operation.consumerName, this.started ? "Started" : "Created", ((Object)((Object)consumer)).getClass().getSimpleName()});
            return consumer;
        });
    }

    private QueuePollingOptimizer createQueuePollingOptimizerFor(ConsumeFromQueue operation) {
        long pollingIntervalMs = operation.getPollingInterval().toMillis();
        return new SimpleQueuePollingOptimizer(operation, (long)((double)pollingIntervalMs * 0.5), pollingIntervalMs * 20L);
    }

    private QueuePollingOptimizer createCentralizedQueuePollingOptimizerFor(QueueName queueName) {
        if (!this.useCentralizedMessageFetcher) {
            throw new IllegalStateException("The CentralizedMessageFetcher is not enabled, so a custom QueuePollingOptimizer must be provided");
        }
        long pollingIntervalMs = this.centralizedMessageFetcher.getPollingIntervalMs();
        return new CentralizedQueuePollingOptimizer(queueName, (long)((double)pollingIntervalMs * 0.5), pollingIntervalMs * 20L, 1.5, 0.1);
    }

    protected void removeQueueConsumer(DurableQueueConsumer durableQueueConsumer) {
        FailFast.requireNonNull((Object)durableQueueConsumer, (String)"You must provide a durableQueueConsumer");
        FailFast.requireFalse((boolean)durableQueueConsumer.isStarted(), (String)MessageFormatter.msg((String)"Cannot remove DurableQueueConsumer '{}' since it's started!", (Object[])new Object[]{durableQueueConsumer.queueName()}));
        StopConsumingFromQueue operation = new StopConsumingFromQueue(durableQueueConsumer);
        try {
            InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
                QueueName queueName = durableQueueConsumer.queueName();
                if (durableQueueConsumer instanceof CentralizedMessageFetcherDurableQueueConsumer) {
                    this.centralizedMessageFetcher.unregisterConsumer(queueName);
                    return (DurableQueueConsumer)this.durableQueueConsumers.remove(queueName);
                }
                return (DurableQueueConsumer)this.traditionalDurableQueueConsumers.remove(queueName);
            }).proceed();
        }
        catch (Exception e) {
            log.error(MessageFormatter.msg((String)"Failed to perform {}", (Object[])new Object[]{operation}), (Throwable)e);
        }
    }

    public QueueEntryId queueMessage(QueueMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessage instance");
        return (QueueEntryId)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queueMessage(operation.queueName, operation.getMessage(), false, operation.getCauseOfEnqueuing(), operation.getDeliveryDelay())).proceed();
    }

    public QueueEntryId queueMessageAsDeadLetterMessage(QueueMessageAsDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessageAsDeadLetterMessage instance");
        return (QueueEntryId)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queueMessage(operation.queueName, operation.getMessage(), true, Optional.ofNullable(operation.getCauseOfError()), Optional.empty())).proceed();
    }

    protected QueueEntryId queueMessage(QueueName queueName, Message message, boolean isDeadLetterMessage, Optional<Exception> causeOfEnqueuing, Optional<Duration> deliveryDelay) {
        String jsonPayload;
        FailFast.requireNonNull((Object)queueName, (String)"You must provide a queueName");
        FailFast.requireNonNull((Object)message, (String)"You must provide a message");
        FailFast.requireNonNull(causeOfEnqueuing, (String)"You must provide a causeOfEnqueuing option");
        FailFast.requireNonNull(deliveryDelay, (String)"You must provide a deliveryDelay option");
        QueueEntryId queueEntryId = QueueEntryId.random();
        Instant addedTimestamp = Instant.now();
        Instant nextDeliveryTimestamp = isDeadLetterMessage ? null : addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
        boolean isOrderedMessage = message instanceof OrderedMessage;
        log.trace("[{}:{}] Queuing {}{}message{} with nextDeliveryTimestamp {}", new Object[]{queueName, queueEntryId, isDeadLetterMessage ? "Dead Letter " : "", isOrderedMessage ? "Ordered " : "", isOrderedMessage ? MessageFormatter.msg((String)" {}:{}", (Object[])new Object[]{((OrderedMessage)message).getKey(), ((OrderedMessage)message).getOrder()}) : "", nextDeliveryTimestamp});
        try {
            jsonPayload = this.jsonSerializer.serialize(message.getPayload());
        }
        catch (JSONSerializationException e) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Failed to serialize message payload of type", (Object[])new Object[]{message.getPayload().getClass().getName()}), (Throwable)e, queueName);
        }
        if (this.transactionalMode == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory.getRequiredUnitOfWork();
        }
        this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            Update update = (Update)((Update)((Update)((Update)((Update)((Update)((Update)unitOfWork.handle().createUpdate(this.durableQueuesSql.getQueueMessageSql()).bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", message.getPayload().getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", isDeadLetterMessage);
            if (message instanceof OrderedMessage) {
                OrderedMessage orderedMessage = (OrderedMessage)message;
                FailFast.requireNonNull((Object)orderedMessage.getKey(), (String)"An OrderedMessage requires a non null key");
                FailFast.requireTrue((orderedMessage.getOrder() >= 0L ? 1 : 0) != 0, (String)"An OrderedMessage requires an order >= 0");
                ((Update)((Update)update.bind("deliveryMode", (Object)QueuedMessage.DeliveryMode.IN_ORDER)).bind("key", orderedMessage.getKey())).bind("order", orderedMessage.getOrder());
            } else {
                ((Update)((Update)update.bind("deliveryMode", (Object)QueuedMessage.DeliveryMode.NORMAL)).bindNull("key", 12)).bind("order", -1L);
            }
            try {
                String jsonMetaData = this.jsonSerializer.serialize((Object)message.getMetaData());
                update.bind("metaData", jsonMetaData);
            }
            catch (JSONSerializationException e) {
                throw new DurableQueueException("Failed to serialize message meta-data", (Throwable)e, queueName);
            }
            if (causeOfEnqueuing.isPresent()) {
                update.bind("lastDeliveryError", causeOfEnqueuing.map(Exceptions::getStackTrace).get());
            } else {
                update.bindNull("lastDeliveryError", 12);
            }
            int numberOfRowsUpdated = update.execute();
            if (numberOfRowsUpdated == 0) {
                throw new DurableQueueException("Failed to insert message", queueName);
            }
        });
        log.debug("[{}:{}] Queued {}{}message{} with nextDeliveryTimestamp {}", new Object[]{queueName, queueEntryId, isDeadLetterMessage ? "Dead Letter " : "", isOrderedMessage ? "Ordered " : "", isOrderedMessage ? MessageFormatter.msg((String)" {}:{}", (Object[])new Object[]{((OrderedMessage)message).getKey(), ((OrderedMessage)message).getOrder()}) : "", nextDeliveryTimestamp});
        return queueEntryId;
    }

    public List<QueueEntryId> queueMessages(QueueMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessages instance");
        operation.validate();
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            QueueName queueName = operation.getQueueName();
            Optional deliveryDelay = operation.getDeliveryDelay();
            List messages = operation.getMessages();
            Instant addedTimestamp = Instant.now();
            Instant nextDeliveryTimestamp = addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
            PreparedBatch batch = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().prepareBatch(this.durableQueuesSql.getQueueMessageSql());
            List queueEntryIds = Lists.toIndexedStream((List)messages).map(indexedMessage -> {
                String jsonPayload;
                Message message = (Message)indexedMessage._2;
                try {
                    jsonPayload = this.jsonSerializer.serialize(message.getPayload());
                }
                catch (JSONSerializationException e) {
                    throw new DurableQueueException(MessageFormatter.msg((String)"Failed to serialize message payload of type", (Object[])new Object[]{message.getPayload().getClass().getName()}), (Throwable)e, queueName);
                }
                QueueEntryId queueEntryId = QueueEntryId.random();
                ((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)batch.bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", message.getPayload().getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", false)).bind("isBeingDelivered", false)).bindNull("lastDeliveryError", 12);
                if (message instanceof OrderedMessage) {
                    OrderedMessage orderedMessage = (OrderedMessage)message;
                    FailFast.requireNonNull((Object)orderedMessage.getKey(), (String)MessageFormatter.msg((String)"[Index: {}] - OrderedMessage requires a non null key", (Object[])new Object[]{indexedMessage._1}));
                    FailFast.requireTrue((orderedMessage.getOrder() >= 0L ? 1 : 0) != 0, (String)MessageFormatter.msg((String)"[Index: {}] - OrderedMessage requires an order >= 0", (Object[])new Object[]{indexedMessage._1}));
                    ((PreparedBatch)((PreparedBatch)batch.bind("deliveryMode", (Object)QueuedMessage.DeliveryMode.IN_ORDER)).bind("key", orderedMessage.getKey())).bind("order", orderedMessage.getOrder());
                } else {
                    ((PreparedBatch)((PreparedBatch)batch.bind("deliveryMode", (Object)QueuedMessage.DeliveryMode.NORMAL)).bindNull("key", 12)).bind("order", -1L);
                }
                try {
                    String jsonMetaData = this.jsonSerializer.serialize((Object)message.getMetaData());
                    batch.bind("metaData", jsonMetaData);
                }
                catch (JSONSerializationException e) {
                    throw new DurableQueueException("Failed to serialize message meta-data", (Throwable)e, queueName);
                }
                batch.add();
                return queueEntryId;
            }).collect(Collectors.toList());
            int numberOfRowsUpdated = Arrays.stream(batch.execute()).reduce(Integer::sum).orElse(0);
            if (numberOfRowsUpdated != messages.size()) {
                throw new DurableQueueException(MessageFormatter.msg((String)"Attempted to queue {} messages but only inserted {} messages", (Object[])new Object[]{messages.size(), numberOfRowsUpdated}), queueName);
            }
            log.debug("[{}] Queued {} Messages with nextDeliveryTimestamp {} and entry-id's: {}", new Object[]{queueName, messages.size(), nextDeliveryTimestamp, queueEntryIds});
            return queueEntryIds;
        }).proceed();
    }

    public Optional<QueuedMessage> retryMessage(RetryMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a RetryMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            Instant nextDeliveryTimestamp = Instant.now().plus(operation.getDeliveryDelay());
            Optional result = ((Query)((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(this.durableQueuesSql.getRetryMessageSql()).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("lastDeliveryError", operation.getCauseForRetry() != null ? Exceptions.getStackTrace((Throwable)operation.getCauseForRetry()) : "Manually requested redelivery")).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                log.debug("Marked Message with id '{}' for Retry at {}. Message entry after update: {}", new Object[]{operation.queueEntryId, nextDeliveryTimestamp, result.get()});
                return result;
            }
            log.error("Failed to Mark Message with id '{}' for Retry", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    public Optional<QueuedMessage> markAsDeadLetterMessage(MarkAsDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a MarkAsDeadLetterMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            Optional result = ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(this.durableQueuesSql.getMarkAsDeadLetterMessageSql()).bind("lastDeliveryError", operation.getCauseForBeingMarkedAsDeadLetter())).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                log.debug("Marked message with id '{}' as Dead Letter Message. Message entry after update: {}", (Object)operation.queueEntryId, result.get());
                return result;
            }
            log.error("Failed to Mark as Message message with id '{}' as Dead Letter Message", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    public Optional<QueuedMessage> resurrectDeadLetterMessage(ResurrectDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a ResurrectDeadLetterMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            Instant nextDeliveryTimestamp = Instant.now().plus(operation.getDeliveryDelay());
            Optional result = ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(this.durableQueuesSql.getResurrectDeadLetterMessageSql()).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                QueuedMessage updateResult = (QueuedMessage)result.get();
                boolean isOrderedMessage = updateResult.getDeliveryMode() == QueuedMessage.DeliveryMode.IN_ORDER;
                log.debug("[{}] Resurrected Dead Letter {}Message with id '{}' {} and nextDeliveryTimestamp: {}. Message entry after update: {}", new Object[]{updateResult.getQueueName(), isOrderedMessage ? "Ordered " : "", operation.getQueueEntryId(), isOrderedMessage ? "(key: " + ((OrderedMessage)updateResult).getKey() + ", order: " + ((OrderedMessage)updateResult).getOrder() + ")" : "", nextDeliveryTimestamp, updateResult});
                return result;
            }
            log.error("Failed to resurrect Dead Letter Message with id '{}'", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    public boolean acknowledgeMessageAsHandled(AcknowledgeMessageAsHandled operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a AcknowledgeMessageAsHandled instance");
        return (Boolean)this.unitOfWorkFactory.withUnitOfWork(() -> (Boolean)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            log.debug("Acknowledging-Message-As-Handled regarding Message with id '{}'", (Object)operation.queueEntryId);
            QueueEntryId queueEntryId = operation.queueEntryId;
            int rowsUpdated = ((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(this.durableQueuesSql.getAcknowledgeMessageAsHandledSql()).bind("id", (Object)operation.queueEntryId)).execute();
            if (rowsUpdated == 1) {
                log.debug("Acknowledged message as handled and deleted it. Id: '{}'", (Object)queueEntryId);
                return true;
            }
            if (this.getDeadLetterMessage(new GetDeadLetterMessage(operation.queueEntryId)).isPresent()) {
                log.debug("Couldn't acknowledge message as it was marked as a Dead-Letter-Message during the message handling. Id: '{}'", (Object)queueEntryId);
                return true;
            }
            log.error("Couldn't Acknowledge with id '{}' - it may already have been deleted", (Object)queueEntryId);
            return false;
        }).proceed());
    }

    public boolean deleteMessage(DeleteMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a DeleteMessage instance");
        return (Boolean)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            int rowsUpdated = ((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(this.durableQueuesSql.getDeleteMessageSql()).bind("id", (Object)operation.queueEntryId)).execute();
            if (rowsUpdated == 1) {
                log.debug("Deleted Message with id '{}'", (Object)operation.queueEntryId);
                return true;
            }
            log.error("Couldn't Delete Message with id '{}' - it may already have been deleted", (Object)operation.queueEntryId);
            return false;
        }).proceed();
    }

    public Optional<QueuedMessage> getNextMessageReadyForDelivery(GetNextMessageReadyForDelivery operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a GetNextMessageReadyForDelivery instance");
        return this.getNextMessageReadyForDelivery(operation, this.useOrderedUnorderedQuery);
    }

    public Optional<QueuedMessage> getNextMessageReadyForDelivery(GetNextMessageReadyForDelivery operation, boolean useOrderedUnorderedQuery) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetNextMessageReadyForDelivery instance");
        log.trace("[{}] Entered GetNextMessageReadyForDelivery", (Object)operation.queueName);
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptors, interceptorChain) -> interceptors.intercept(operation, interceptorChain), () -> {
            this.resetMessagesStuckBeingDelivered(operation.queueName);
            Instant now = Instant.now();
            List<String> excludes = operation.getExcludeOrderedMessagesWithKey() != null ? operation.getExcludeOrderedMessagesWithKey() : List.of();
            try {
                if (useOrderedUnorderedQuery) {
                    return this.fetchNextMessageReadyForDeliveryOrderedUnordered(operation.queueName, excludes, now);
                }
                return this.fetchNextMessageReadyForDelivery(operation.queueName, excludes, now);
            }
            catch (DurableQueueDeserializationException e) {
                log.error("[{}] Marking Message as DeadLetterMessage due to DurableQueueDeserializationException while deserializing message with id '{}'", new Object[]{operation.queueName, e.queueEntryId.get(), e});
                this.markAsDeadLetterMessage((QueueEntryId)e.queueEntryId.get(), e);
                return Optional.empty();
            }
        }).proceed();
    }

    private Optional<QueuedMessage> fetchNextMessageReadyForDeliveryOrderedUnordered(QueueName queueName, Collection<String> excludes, Instant now) {
        Optional queuedMessage;
        boolean hasExcludes = !excludes.isEmpty();
        String orderedSql = this.durableQueuesSql.buildOrderedSqlStatement(hasExcludes);
        log.trace("[{}] Executing fetchNextMessageReadyForDeliveryOrderedUnordered sql", (Object)queueName);
        Handle handle = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle();
        Query orderedQuery = (Query)((Query)((Query)handle.createQuery(orderedSql).bind("queueName", (Object)queueName)).bind("now", (Object)now)).bind("limit", 1);
        if (hasExcludes) {
            orderedQuery.bindList("excludeKeys", excludes);
        }
        if ((queuedMessage = orderedQuery.map((RowMapper)this.queuedMessageMapper).findOne()).isPresent()) {
            return queuedMessage;
        }
        String unorderedSql = this.durableQueuesSql.buildUnorderedSqlStatement();
        return ((Query)((Query)((Query)handle.createQuery(unorderedSql).bind("queueName", (Object)queueName)).bind("now", (Object)now)).bind("limit", 1)).map((RowMapper)this.queuedMessageMapper).findOne();
    }

    private Optional<QueuedMessage> fetchNextMessageReadyForDelivery(QueueName queueName, Collection<String> excludes, Instant now) {
        String sql = this.durableQueuesSql.buildGetNextMessageReadyForDeliverySqlStatement(excludes);
        Query query = (Query)((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(sql).bind("queueName", (Object)queueName)).bind("now", (Object)now)).bind("limit", 1);
        if (!excludes.isEmpty()) {
            query.bindList("excludedKeys", excludes);
        }
        log.trace("[{}] Executing fetchNextMessageReadyForDelivery sql", (Object)queueName);
        return query.map((RowMapper)this.queuedMessageMapper).findOne();
    }

    void resetMessagesStuckBeingDelivered(QueueName queueName) {
        if (this.transactionalMode == TransactionalMode.SingleOperationTransaction) {
            Instant now = Instant.now();
            Instant lastStuckMessageResetTimestamp = (Instant)this.lastResetStuckMessagesCheckTimestamps.get(queueName);
            if (lastStuckMessageResetTimestamp == null || Duration.between(now, lastStuckMessageResetTimestamp).abs().toMillis() > (long)this.messageHandlingTimeoutMs) {
                int numberOfChanges;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Looking for messages stuck marked as isBeingDelivered. Last check was performed: {}", (Object)queueName, (Object)lastStuckMessageResetTimestamp);
                }
                if ((numberOfChanges = ((Update)((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(this.durableQueuesSql.getResetMessagesStuckBeingDeliveredSql()).bind("threshold", (Object)now.minusMillis(this.messageHandlingTimeoutMs))).bind("error", "Handler Processing of the Message was determined to have Timed Out")).bind("now", (Object)now)).execute()) > 0) {
                    log.debug("[{}] Reset {} messages stuck marked as isBeingDelivered", (Object)queueName, (Object)numberOfChanges);
                } else {
                    log.debug("[{}] Didn't find any messages being stuck marked as isBeingDelivered", (Object)queueName);
                }
                this.lastResetStuckMessagesCheckTimestamps.put(queueName, now);
            }
        }
    }

    public boolean hasMessagesQueuedFor(QueueName queueName) {
        return this.getTotalMessagesQueuedFor(queueName) > 0L;
    }

    public final boolean hasOrderedMessageQueuedForKey(QueueName queueName, String key) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)key, (String)"No key provided");
        return (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (Long)((Query)((Query)handleAwareUnitOfWork.handle().createQuery(this.durableQueuesSql.getHasOrderedMessageQueuedForKeySql()).bind("queueName", (Object)queueName)).bind("key", key)).mapTo(Long.class).one()) > 0L;
    }

    public final long getTotalMessagesQueuedFor(GetTotalMessagesQueuedFor operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetTotalMessagesQueuedFor instance");
        return (Long)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (Long)((Query)handleAwareUnitOfWork.handle().createQuery(this.durableQueuesSql.getGetTotalMessagesQueuedForSql()).bind("queueName", (Object)operation.queueName)).mapTo(Long.class).one())).proceed();
    }

    public QueuedMessageCounts getQueuedMessageCountsFor(GetQueuedMessageCountsFor operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetTotalMessagesQueuedFor instance");
        return (QueuedMessageCounts)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (QueuedMessageCounts)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (QueuedMessageCounts)((Query)handleAwareUnitOfWork.handle().createQuery(this.durableQueuesSql.getQueuedMessageCountsForSql()).bind("queueName", (Object)operation.queueName)).map((rs, ctx) -> new QueuedMessageCounts(operation.queueName, rs.getLong("regular_count"), rs.getLong("dead_letter_count"))).one())).proceed();
    }

    public final long getTotalDeadLetterMessagesQueuedFor(GetTotalDeadLetterMessagesQueuedFor operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetTotalDeadLetterMessagesQueuedFor instance");
        return (Long)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (Long)((Query)handleAwareUnitOfWork.handle().createQuery(this.durableQueuesSql.getGetTotalDeadLetterMessagesQueuedForSql()).bind("queueName", (Object)operation.queueName)).mapTo(Long.class).one())).proceed();
    }

    public final int purgeQueue(PurgeQueue operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a PurgeQueue instance");
        return (Integer)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (Integer)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Update)handleAwareUnitOfWork.handle().createUpdate(this.durableQueuesSql.getPurgeQueueSql()).bind("queueName", (Object)operation.queueName)).execute())).proceed();
    }

    public final List<NextQueuedMessage> queryForMessagesSoonReadyForDelivery(QueueName queueName, Instant withNextDeliveryTimestampAfter, int maxNumberOfMessagesToReturn) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)withNextDeliveryTimestampAfter, (String)"No withNextDeliveryTimestampAfter provided");
        return (List)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)((Query)handleAwareUnitOfWork.handle().createQuery(this.durableQueuesSql.getQueryForMessagesSoonReadyForDeliverySql()).bind("queueName", FailFast.requireNonNull((Object)queueName, (String)"No QueueName provided"))).bind("now", (Object)withNextDeliveryTimestampAfter)).bind("pageSize", maxNumberOfMessagesToReturn)).map((rs, ctx) -> new NextQueuedMessage(QueueEntryId.of((CharSequence)rs.getString("id")), queueName, rs.getObject("added_ts", OffsetDateTime.class).toInstant(), rs.getObject("next_delivery_ts", OffsetDateTime.class).toInstant())).list());
    }

    public final List<QueuedMessage> getQueuedMessages(GetQueuedMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetQueuedMessages instance");
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queryQueuedMessages(operation.queueName, operation.getQueueingSortOrder(), IncludeMessages.QUEUED_MESSAGES, operation.getStartIndex(), operation.getPageSize())).proceed();
    }

    public final List<QueuedMessage> getDeadLetterMessages(GetDeadLetterMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetDeadLetterMessages instance");
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queryQueuedMessages(operation.queueName, operation.getQueueingSortOrder(), IncludeMessages.DEAD_LETTER_MESSAGES, operation.getStartIndex(), operation.getPageSize())).proceed();
    }

    List<QueuedMessage> queryQueuedMessages(QueueName queueName, DurableQueues.QueueingSortOrder queueingSortOrder, IncludeMessages includeMessages, long startIndex, long pageSize) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)queueingSortOrder, (String)"No queueingOrder provided");
        FailFast.requireNonNull((Object)((Object)includeMessages), (String)"No includeMessages provided");
        Supplier<String> resolveIncludeMessagesSql = () -> {
            switch (includeMessages) {
                case ALL: {
                    return "";
                }
                case DEAD_LETTER_MESSAGES: {
                    return "AND is_dead_letter_message = TRUE\n";
                }
                case QUEUED_MESSAGES: {
                    return "AND is_dead_letter_message = FALSE\n";
                }
            }
            throw new IllegalArgumentException("Unsupported IncludeMessages value: " + String.valueOf((Object)includeMessages));
        };
        return (List)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT * FROM {:tableName} \n WHERE queue_name = :queueName\n{:includeMessages} LIMIT :pageSize \n OFFSET :offset", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName), MessageFormatter.NamedArgumentBinding.arg((String)"includeMessages", resolveIncludeMessagesSql.get())})).bind("queueName", FailFast.requireNonNull((Object)queueName, (String)"No QueueName provided"))).bind("offset", startIndex)).bind("pageSize", pageSize)).map((RowMapper)this.queuedMessageMapper).list());
    }

    public DurableQueues addInterceptor(DurableQueuesInterceptor interceptor) {
        FailFast.requireNonNull((Object)interceptor, (String)"No interceptor provided");
        log.info("Adding interceptor: {}", (Object)interceptor);
        interceptor.setDurableQueues((DurableQueues)this);
        this.interceptors.add(interceptor);
        DefaultInterceptorChain.sortInterceptorsByOrder(this.interceptors);
        return this;
    }

    public DurableQueues removeInterceptor(DurableQueuesInterceptor interceptor) {
        FailFast.requireNonNull((Object)interceptor, (String)"No interceptor provided");
        log.info("Removing interceptor: {}", (Object)interceptor);
        this.interceptors.remove(interceptor);
        DefaultInterceptorChain.sortInterceptorsByOrder(this.interceptors);
        return this;
    }

    public Optional<QueueName> getQueueNameFor(QueueEntryId queueEntryId) {
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT queue_name FROM {:tableName} WHERE \n id = :id", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"No queueEntryId provided"))).mapTo(QueueName.class).findOne());
    }

    public Optional<QueuedMessage> getDeadLetterMessage(GetDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetDeadLetterMessage instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.getQueuedMessage(operation.queueEntryId, true)).proceed();
    }

    public List<QueuedMessage> fetchNextBatchOfMessages(Collection<QueueName> queueNames, Map<QueueName, Set<String>> excludeKeysPerQueue, Map<QueueName, Integer> availableWorkerSlotsPerQueue) {
        FailFast.requireNonNull(queueNames, (String)"No queueNames provided");
        FailFast.requireNonNull(excludeKeysPerQueue, (String)"No excludeKeysPerQueue provided");
        FailFast.requireNonNull(availableWorkerSlotsPerQueue, (String)"No availableWorkerSlotsPerQueue provided");
        return this.fetchNextBatchOfMessages(queueNames, excludeKeysPerQueue, availableWorkerSlotsPerQueue, this.useOrderedUnorderedQuery);
    }

    public List<QueuedMessage> fetchNextBatchOfMessages(Collection<QueueName> queueNames, Map<QueueName, Set<String>> excludeKeysPerQueue, Map<QueueName, Integer> availableWorkerSlotsPerQueue, boolean useOrderedUnorderedQuery) {
        log.trace("Fetching batch of messages for queues: {}", queueNames);
        if (queueNames.isEmpty()) {
            return Collections.emptyList();
        }
        try {
            return (List)this.unitOfWorkFactory.withUnitOfWork(uow -> {
                this.resetMessagesStuckBeingDeliveredAcrossMultipleQueues(queueNames);
                Instant now = Instant.now();
                ArrayList allMessages = new ArrayList();
                for (QueueName queueName : queueNames) {
                    Integer availableWorkerSlotsForThisQueue = (Integer)availableWorkerSlotsPerQueue.get(queueName);
                    if (availableWorkerSlotsForThisQueue == null || availableWorkerSlotsForThisQueue <= 0) {
                        log.trace("[{}] Skipping queue as it has no available worker slots", (Object)queueName);
                        continue;
                    }
                    CentralizedMessageFetcherDurableQueueConsumer consumer = (CentralizedMessageFetcherDurableQueueConsumer)this.durableQueueConsumers.get(queueName);
                    if (consumer == null) {
                        log.trace("[{}] Skipping queue as it has no consumer", (Object)queueName);
                        continue;
                    }
                    QueuePollingOptimizer optimizer = consumer.getQueuePollingOptimizer();
                    if (optimizer.shouldSkipPolling()) {
                        log.trace("[{}] skipping centralized polling", (Object)queueName);
                        continue;
                    }
                    Set<String> excluded = excludeKeysPerQueue.getOrDefault(queueName, Collections.emptySet());
                    try {
                        List messagesForQueue;
                        if (useOrderedUnorderedQuery) {
                            String orderedSql = this.durableQueuesSql.buildOrderedSqlStatement(!excluded.isEmpty());
                            Query orderedQ = (Query)((Query)((Query)uow.handle().createQuery(orderedSql).bind("queueName", (Object)queueName)).bind("now", (Object)now)).bind("limit", availableWorkerSlotsForThisQueue);
                            if (!excluded.isEmpty()) {
                                orderedQ.bindList("excludeKeys", excluded);
                            }
                            if ((messagesForQueue = orderedQ.map((RowMapper)this.queuedMessageMapper).list()).isEmpty()) {
                                String unorderedSql = this.durableQueuesSql.buildUnorderedSqlStatement();
                                Query unorderedQ = (Query)((Query)((Query)uow.handle().createQuery(unorderedSql).bind("queueName", (Object)queueName)).bind("now", (Object)now)).bind("limit", availableWorkerSlotsForThisQueue);
                                messagesForQueue = unorderedQ.map((RowMapper)this.queuedMessageMapper).list();
                            }
                        } else {
                            String sql = this.durableQueuesSql.buildGetNextMessageReadyForDeliverySqlStatement(excluded);
                            Query query = (Query)((Query)((Query)uow.handle().createQuery(sql).bind("queueName", (Object)queueName)).bind("now", (Object)now)).bind("limit", availableWorkerSlotsForThisQueue);
                            if (!excluded.isEmpty()) {
                                query.bindList("excludedKeys", new ArrayList<String>(excluded));
                            }
                            messagesForQueue = query.map((RowMapper)this.queuedMessageMapper).list();
                        }
                        log.debug("[{}] Batch fetched {} messages with {} slots available", new Object[]{queueName, messagesForQueue.size(), availableWorkerSlotsForThisQueue});
                        if (messagesForQueue.isEmpty()) {
                            optimizer.queuePollingReturnedNoMessages();
                            log.trace("[{}] No messages fetched for this queue", (Object)queueName);
                        } else {
                            optimizer.queuePollingReturnedMessages(messagesForQueue);
                            log.trace("[{}] Fetched {} messages for this queue", (Object)queueName, (Object)messagesForQueue.size());
                        }
                        allMessages.addAll(messagesForQueue);
                    }
                    catch (DurableQueueDeserializationException e) {
                        log.error("[{}] Marking Message as DeadLetterMessage due to DurableQueueDeserializationException while deserializing message with id '{}'", new Object[]{queueName, e.queueEntryId.get(), e});
                        this.markAsDeadLetterMessage((QueueEntryId)e.queueEntryId.get(), e);
                    }
                }
                log.debug("Batch fetched {} messages for {} queues: {}", new Object[]{allMessages.size(), queueNames.size(), queueNames});
                return allMessages;
            });
        }
        catch (Exception e) {
            if (IOExceptionUtil.isIOException((Throwable)e)) {
                log.debug("Error in fetchNextBatchOfMessages: {}", (Object)e.getMessage(), (Object)e);
            } else {
                log.error("Error in fetchNextBatchOfMessages: {}", (Object)e.getMessage(), (Object)e);
            }
            return Collections.emptyList();
        }
    }

    public List<QueuedMessage> fetchNextBatchOfMessagesBatched(Collection<QueueName> queueNames, Map<QueueName, Set<String>> excludeKeysPerQueue, Map<QueueName, Integer> availableWorkerSlotsPerQueue) {
        log.trace("Fetching batch of messages for queues: {}", queueNames);
        List<QueueName> activeQueues = queueNames.stream().filter(queueName -> availableWorkerSlotsPerQueue.getOrDefault(queueName, 0) > 0).filter(queueName -> {
            CentralizedMessageFetcherDurableQueueConsumer durableQueueConsumer = (CentralizedMessageFetcherDurableQueueConsumer)this.durableQueueConsumers.get(queueName);
            if (durableQueueConsumer == null) {
                log.trace("[{}] Skipping queue as it has no consumer", queueName);
                return false;
            }
            QueuePollingOptimizer queuePollingOptimizer = durableQueueConsumer.getQueuePollingOptimizer();
            boolean skip = queuePollingOptimizer.shouldSkipPolling();
            if (skip) {
                log.trace("[{}] skipping due to backoff", queueName);
            }
            return !skip;
        }).toList();
        if (activeQueues.isEmpty()) {
            return Collections.emptyList();
        }
        try {
            return (List)this.unitOfWorkFactory.withUnitOfWork(uow -> {
                this.resetMessagesStuckBeingDeliveredAcrossMultipleQueues(activeQueues);
                Instant now = Instant.now();
                DurableQueuesSql.BatchedSqlResult batchedSqlResult = this.durableQueuesSql.buildBatchedSqlStatement(excludeKeysPerQueue, availableWorkerSlotsPerQueue, activeQueues);
                Query query = (Query)uow.handle().createQuery(batchedSqlResult.getSql()).bind("now", (Object)now);
                for (Map.Entry<String, String> entry : batchedSqlResult.getSingleValueBindings().entrySet()) {
                    query.bind(entry.getKey(), entry.getValue());
                }
                for (Map.Entry<String, Object> entry : batchedSqlResult.getListBindings().entrySet()) {
                    query.bindList(entry.getKey(), (Iterable)entry.getValue());
                }
                MessageMappingResult mappingResult = this.mapQueryResultsWithExceptionHandling(query);
                List<QueuedMessage> list = mappingResult.successfulMessages();
                if (!mappingResult.failedMappings().isEmpty()) {
                    log.warn("Failed to deserialize {} messages during batch fetch. Failed QueueEntryIds: {}", (Object)mappingResult.failedMappings().size(), (Object)mappingResult.failedMappings().stream().map(failed -> failed.queueEntryId().toString()).collect(Collectors.joining(", ")));
                    for (MessageMappingResult.FailedMessageMapping failedMapping : mappingResult.failedMappings()) {
                        log.error("[{}] Marking Message as DeadLetterMessage due to issues while deserializing message with id '{}'", new Object[]{failedMapping.queueName(), failedMapping.queueEntryId(), failedMapping.mappingException()});
                        this.markAsDeadLetterMessage(failedMapping.queueEntryId(), failedMapping.mappingException());
                    }
                }
                Map<QueueName, List<QueuedMessage>> byQueue = list.stream().collect(Collectors.groupingBy(QueuedMessage::getQueueName));
                for (QueueName queueName : activeQueues) {
                    CentralizedMessageFetcherDurableQueueConsumer durableQueueConsumer = (CentralizedMessageFetcherDurableQueueConsumer)this.durableQueueConsumers.get(queueName);
                    if (durableQueueConsumer == null) {
                        log.trace("[{}] Skipping queue as it has no consumer", (Object)queueName);
                        continue;
                    }
                    QueuePollingOptimizer queuePollingOptimizer = durableQueueConsumer.getQueuePollingOptimizer();
                    List messagesForQueue = byQueue.getOrDefault(queueName, Collections.emptyList());
                    if (messagesForQueue.isEmpty()) {
                        queuePollingOptimizer.queuePollingReturnedNoMessages();
                        continue;
                    }
                    queuePollingOptimizer.queuePollingReturnedMessages(messagesForQueue);
                }
                log.debug("Batch fetched {} messages across {} queues: {}", new Object[]{list.size(), activeQueues.size(), byQueue.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((List)e.getValue()).size()))});
                return list;
            });
        }
        catch (Exception e) {
            if (IOExceptionUtil.isIOException((Throwable)e)) {
                log.debug("Error in fetchNextBatchOfMessages: {}", (Object)e.getMessage(), (Object)e);
            } else {
                log.error("Error in fetchNextBatchOfMessages: {}", (Object)e.getMessage(), (Object)e);
            }
            return Collections.emptyList();
        }
    }

    private MessageMappingResult mapQueryResultsWithExceptionHandling(Query query) {
        ArrayList<QueuedMessage> successfulMessages = new ArrayList<QueuedMessage>();
        final ArrayList<MessageMappingResult.FailedMessageMapping> failedMappings = new ArrayList<MessageMappingResult.FailedMessageMapping>();
        RowMapper<QueuedMessage> customMapper = new RowMapper<QueuedMessage>(){

            public QueuedMessage map(ResultSet rs, StatementContext ctx) throws SQLException {
                try {
                    return PostgresqlDurableQueues.this.queuedMessageMapper.map(rs, ctx);
                }
                catch (Exception e) {
                    QueueName queueName = QueueName.of((CharSequence)rs.getString("queue_name"));
                    QueueEntryId queueEntryId = QueueEntryId.of((CharSequence)rs.getString("id"));
                    failedMappings.add(new MessageMappingResult.FailedMessageMapping(queueName, queueEntryId, e));
                    return null;
                }
            }
        };
        List allResults = query.map((RowMapper)customMapper).list();
        successfulMessages.addAll(allResults.stream().filter(Objects::nonNull).toList());
        return new MessageMappingResult(successfulMessages, failedMappings);
    }

    private void resetMessagesStuckBeingDeliveredAcrossMultipleQueues(Collection<QueueName> queueNames) {
        FailFast.requireNonNull(queueNames, (String)"No queueNames provided");
        if (this.transactionalMode != TransactionalMode.SingleOperationTransaction || queueNames.isEmpty()) {
            return;
        }
        log.trace("resetMultipleQueuesStuckBeingDelivered called for queues: {}", queueNames);
        Instant now = Instant.now();
        List<QueueName> queuesToReset = queueNames.stream().filter(queueName -> {
            Instant lastReset = (Instant)this.lastResetStuckMessagesCheckTimestamps.get(queueName);
            return lastReset == null || Duration.between(now, lastReset).abs().toMillis() > (long)this.messageHandlingTimeoutMs;
        }).collect(Collectors.toList());
        if (queuesToReset.isEmpty()) {
            log.trace("No stuck messages to reset across multiple queues: {}", queueNames);
            return;
        }
        log.debug("Looking for messages stuck marked as isBeingDelivered across queues: {}", queuesToReset);
        List<String> queueNamesForQuery = queuesToReset.stream().map(CharSequenceType::toString).toList();
        int numberOfChanges = ((Update)((Update)((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(this.durableQueuesSql.getResetMessagesStuckBeingDeliveredAcrossMultipleQueuesSql()).bind("threshold", (Object)now.minusMillis(this.messageHandlingTimeoutMs))).bind("error", "Handler Processing of the Message was determined to have Timed Out")).bind("now", (Object)now)).bindList("queueNames", queueNamesForQuery)).execute();
        if (numberOfChanges > 0) {
            log.debug("Reset {} messages stuck marked as isBeingDelivered across queues: {}", (Object)numberOfChanges, queuesToReset);
        } else {
            log.debug("No stuck messages found across queues: {}", queuesToReset);
        }
        queuesToReset.forEach(queueName -> this.lastResetStuckMessagesCheckTimestamps.put((QueueName)queueName, now));
    }

    public final Optional<QueuedMessage> getQueuedMessage(GetQueuedMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetQueuedMessage instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.getQueuedMessage(operation.queueEntryId, false)).proceed();
    }

    private Optional<QueuedMessage> getQueuedMessage(QueueEntryId queueEntryId, boolean isDeadLetterMessage) {
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT * FROM {:tableName} WHERE \n id = :id AND\n is_dead_letter_message = :isDeadLetterMessage", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"No queueEntryId provided"))).bind("isDeadLetterMessage", isDeadLetterMessage)).map((RowMapper)this.queuedMessageMapper).findOne());
    }

    protected static enum IncludeMessages {
        ALL,
        DEAD_LETTER_MESSAGES,
        QUEUED_MESSAGES;

    }
}

