/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.queue.postgresql;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dk.cloudcreate.essentials.components.foundation.IOExceptionUtil;
import dk.cloudcreate.essentials.components.foundation.json.JSONSerializationException;
import dk.cloudcreate.essentials.components.foundation.json.JSONSerializer;
import dk.cloudcreate.essentials.components.foundation.json.JacksonJSONSerializer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DefaultDurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DefaultQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueDeserializationException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.NextQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuePollingOptimizer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageCounts;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.AcknowledgeMessageAsHandled;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.DeleteMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetQueuedMessageCountsFor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetQueuedMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetTotalDeadLetterMessagesQueuedFor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetTotalMessagesQueuedFor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.MarkAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.PurgeQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessageAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ResurrectDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.RetryMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.StopConsumingFromQueue;
import dk.cloudcreate.essentials.components.foundation.postgresql.ListenNotify;
import dk.cloudcreate.essentials.components.foundation.postgresql.MultiTableChangeListener;
import dk.cloudcreate.essentials.components.foundation.postgresql.NotificationDuplicationFilter;
import dk.cloudcreate.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.cloudcreate.essentials.components.foundation.postgresql.TableChangeNotification;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueueConsumer;
import dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueuesBuilder;
import dk.cloudcreate.essentials.components.queue.postgresql.QueueNameDuplicationFilter;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueEntryIdArgumentFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueEntryIdColumnMapper;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueNameArgumentFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueNameColumnMapper;
import dk.cloudcreate.essentials.jackson.immutable.EssentialsImmutableJacksonModule;
import dk.cloudcreate.essentials.jackson.types.EssentialTypesJacksonModule;
import dk.cloudcreate.essentials.reactive.AnnotatedEventHandler;
import dk.cloudcreate.essentials.reactive.EventHandler;
import dk.cloudcreate.essentials.reactive.Handler;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.collections.Lists;
import dk.cloudcreate.essentials.shared.interceptor.DefaultInterceptorChain;
import dk.cloudcreate.essentials.shared.interceptor.InterceptorChain;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.argument.ArgumentFactory;
import org.jdbi.v3.core.mapper.ColumnMapper;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PostgresqlDurableQueues
implements DurableQueues {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueues.class);
    /** Table name used by the constructors that do not accept an explicit {@code sharedQueueTableName}. */
    public static final String DEFAULT_DURABLE_QUEUES_TABLE_NAME = "durable_queues";
    /** Placeholder payload for the messages that {@link #start()} builds from table-change notifications. */
    private static final Object NO_PAYLOAD = new Object();
    /** Factory used to run every database interaction of this class inside a unit of work. */
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    /** Serializer used for message payloads and message meta-data. */
    private final JSONSerializer jsonSerializer;
    /** Name of the queue table; stored lower-cased (see the primary constructor). */
    private final String sharedQueueTableName;
    /** Registered consumers keyed by the queue they consume from. */
    private final ConcurrentMap<QueueName, PostgresqlDurableQueueConsumer> durableQueueConsumers = new ConcurrentHashMap<QueueName, PostgresqlDurableQueueConsumer>();
    /** Row mapper instance created in the primary constructor. */
    private final QueuedMessageRowMapper queuedMessageMapper;
    /** Interceptors applied to the queue operations; sorted by order in {@link #start()}. */
    private final List<DurableQueuesInterceptor> interceptors = new CopyOnWriteArrayList<DurableQueuesInterceptor>();
    /** Optional listener for table-change notifications; empty when none was supplied to the constructor. */
    private final Optional<MultiTableChangeListener<TableChangeNotification>> multiTableChangeListener;
    /** Creates the polling optimizer for a consumer; defaults to {@code createQueuePollingOptimizerFor}. */
    private final Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory;
    /** Transactional mode chosen at construction time. */
    private final TransactionalMode transactionalMode;
    /** Message handling timeout in milliseconds; only assigned when the mode is {@code SingleOperationTransaction}. */
    private int messageHandlingTimeoutMs;
    // NOTE(review): per-queue timestamps, presumably of the last stuck-message reset check - the code using this map is not visible here; confirm.
    protected ConcurrentMap<QueueName, Instant> lastResetStuckMessagesCheckTimestamps = new ConcurrentHashMap<QueueName, Instant>();
    /** Lifecycle flag toggled by {@link #start()} and {@link #stop()}. */
    private volatile boolean started;

    /**
     * Creates a new {@link PostgresqlDurableQueuesBuilder}.
     *
     * @return a freshly created builder instance
     */
    public static PostgresqlDurableQueuesBuilder builder() {
        PostgresqlDurableQueuesBuilder freshBuilder = new PostgresqlDurableQueuesBuilder();
        return freshBuilder;
    }

    /**
     * Creates an instance using a Jackson based serializer built from {@code createDefaultObjectMapper()},
     * the {@link #DEFAULT_DURABLE_QUEUES_TABLE_NAME}, no table-change listener and the default polling optimizer factory.
     *
     * @param unitOfWorkFactory the unit of work factory used for all database access
     */
    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory) {
        this(unitOfWorkFactory,
             new JacksonJSONSerializer(createDefaultObjectMapper()),
             DEFAULT_DURABLE_QUEUES_TABLE_NAME,
             (MultiTableChangeListener<TableChangeNotification>) null,
             (Function<ConsumeFromQueue, QueuePollingOptimizer>) null);
    }

    /**
     * Creates an instance using a Jackson based serializer built from {@code createDefaultObjectMapper()},
     * the {@link #DEFAULT_DURABLE_QUEUES_TABLE_NAME} and no table-change listener.
     *
     * @param unitOfWorkFactory            the unit of work factory used for all database access
     * @param queuePollingOptimizerFactory factory for polling optimizers; {@code null} selects the default factory
     */
    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory) {
        this(unitOfWorkFactory,
             new JacksonJSONSerializer(createDefaultObjectMapper()),
             DEFAULT_DURABLE_QUEUES_TABLE_NAME,
             (MultiTableChangeListener<TableChangeNotification>) null,
             queuePollingOptimizerFactory);
    }

    /**
     * Creates an instance using the supplied serializer, the {@link #DEFAULT_DURABLE_QUEUES_TABLE_NAME},
     * no table-change listener and the default polling optimizer factory.
     *
     * @param unitOfWorkFactory the unit of work factory used for all database access
     * @param jsonSerializer    the serializer used for message payloads and meta-data
     */
    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer) {
        this(unitOfWorkFactory,
             jsonSerializer,
             DEFAULT_DURABLE_QUEUES_TABLE_NAME,
             (MultiTableChangeListener<TableChangeNotification>) null,
             (Function<ConsumeFromQueue, QueuePollingOptimizer>) null);
    }

    /**
     * Creates an instance using the supplied serializer and polling optimizer factory,
     * the {@link #DEFAULT_DURABLE_QUEUES_TABLE_NAME} and no table-change listener.
     *
     * @param unitOfWorkFactory            the unit of work factory used for all database access
     * @param jsonSerializer               the serializer used for message payloads and meta-data
     * @param queuePollingOptimizerFactory factory for polling optimizers; {@code null} selects the default factory
     */
    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory) {
        this(unitOfWorkFactory,
             jsonSerializer,
             DEFAULT_DURABLE_QUEUES_TABLE_NAME,
             (MultiTableChangeListener<TableChangeNotification>) null,
             queuePollingOptimizerFactory);
    }

    /**
     * Creates an instance in {@code TransactionalMode.FullyTransactional} mode without a message handling timeout.
     *
     * @param unitOfWorkFactory            the unit of work factory used for all database access
     * @param jsonSerializer               the serializer used for message payloads and meta-data
     * @param sharedQueueTableName         the name of the queue table
     * @param multiTableChangeListener     optional table-change listener; may be {@code null}
     * @param queuePollingOptimizerFactory factory for polling optimizers; {@code null} selects the default factory
     */
    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, String sharedQueueTableName, MultiTableChangeListener<TableChangeNotification> multiTableChangeListener, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory) {
        this(unitOfWorkFactory,
             jsonSerializer,
             sharedQueueTableName,
             multiTableChangeListener,
             queuePollingOptimizerFactory,
             TransactionalMode.FullyTransactional,
             (Duration) null);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, String sharedQueueTableName, MultiTableChangeListener<TableChangeNotification> multiTableChangeListener, Function<ConsumeFromQueue, QueuePollingOptimizer> queuePollingOptimizerFactory, TransactionalMode transactionalMode, Duration messageHandlingTimeout) {
        this.unitOfWorkFactory = (HandleAwareUnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"No unitOfWorkFactory instance provided");
        this.jsonSerializer = (JSONSerializer)FailFast.requireNonNull((Object)jsonSerializer, (String)"No jsonSerializer");
        this.sharedQueueTableName = ((String)FailFast.requireNonNull((Object)sharedQueueTableName, (String)"No sharedQueueTableName provided")).toLowerCase(Locale.ROOT);
        PostgresqlUtil.checkIsValidTableOrColumnName((String)sharedQueueTableName);
        this.queuedMessageMapper = new QueuedMessageRowMapper();
        this.multiTableChangeListener = Optional.ofNullable(multiTableChangeListener);
        this.queuePollingOptimizerFactory = queuePollingOptimizerFactory != null ? queuePollingOptimizerFactory : this::createQueuePollingOptimizerFor;
        this.transactionalMode = (TransactionalMode)FailFast.requireNonNull((Object)transactionalMode, (String)"No transactionalMode instance provided");
        if (transactionalMode == TransactionalMode.SingleOperationTransaction) {
            this.messageHandlingTimeoutMs = (int)((Duration)FailFast.requireNonNull((Object)messageHandlingTimeout, (String)"No messageHandlingTimeout provided")).toMillis();
            this.addInterceptor(new SingleOperationTransactionDurableQueuesInterceptor(unitOfWorkFactory));
        }
        this.multiTableChangeListener.ifPresent(listener -> listener.addDuplicationFilterAsFirst((NotificationDuplicationFilter)new QueueNameDuplicationFilter()));
        this.initializeQueueTables();
    }

    /**
     * Ensures the database schema is in place: registers the Jdbi argument factories and column mappers
     * for {@code QueueName} and {@code QueueEntryId}, creates the queue table if it does not exist,
     * drops four older single-column indexes, creates the current indexes and, when a table-change
     * listener is configured, adds the INSERT/UPDATE change-notification trigger to the table.
     */
    private void initializeQueueTables() {
        PostgresqlUtil.checkIsValidTableOrColumnName(this.sharedQueueTableName);
        this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            Handle handle = unitOfWork.handle();

            handle.getJdbi().registerArgument((ArgumentFactory) new QueueNameArgumentFactory());
            handle.getJdbi().registerColumnMapper((ColumnMapper) new QueueNameColumnMapper());
            handle.getJdbi().registerArgument((ArgumentFactory) new QueueEntryIdArgumentFactory());
            handle.getJdbi().registerColumnMapper((ColumnMapper) new QueueEntryIdColumnMapper());

            String createTableSql = MessageFormatter.bind(
                    "CREATE TABLE IF NOT EXISTS {:tableName} (\n"
                    + "  id                     TEXT PRIMARY KEY,\n"
                    + "  queue_name             TEXT NOT NULL,\n"
                    + "  message_payload        JSONB NOT NULL,\n"
                    + "  message_payload_type   TEXT NOT NULL,\n"
                    + "  added_ts               TIMESTAMPTZ NOT NULL,\n"
                    + "  next_delivery_ts       TIMESTAMPTZ,\n"
                    + "  delivery_ts            TIMESTAMPTZ DEFAULT NULL,\n"
                    + "  total_attempts         INTEGER DEFAULT 0,\n"
                    + "  redelivery_attempts    INTEGER DEFAULT 0,\n"
                    + "  last_delivery_error    TEXT DEFAULT NULL,\n"
                    + "  is_being_delivered     BOOLEAN DEFAULT FALSE,\n"
                    + "  is_dead_letter_message BOOLEAN NOT NULL DEFAULT FALSE,\n"
                    + "  meta_data              JSONB DEFAULT NULL,\n"
                    + "  delivery_mode          TEXT NOT NULL,\n"
                    + "  key                    TEXT DEFAULT NULL,\n"
                    + "  key_order              BIGINT DEFAULT -1\n"
                    + ")",
                    MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
            handle.execute(createTableSql);
            log.info("Ensured Durable Queues table '{}' exists", this.sharedQueueTableName);

            // Remove the older single-column indexes before creating the composite ones
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_queue_name", handle);
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_next_delivery_ts", handle);
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_is_dead_letter_message", handle);
            this.dropIndex("DROP INDEX IF EXISTS idx_{:tableName}_is_being_delivered", handle);

            this.createIndex("CREATE INDEX IF NOT EXISTS idx_{:tableName}_ordered_msg ON {:tableName} (queue_name, key, key_order)", handle);
            this.createIndex("CREATE INDEX IF NOT EXISTS idx_{:tableName}_next_msg ON {:tableName} (queue_name, is_dead_letter_message, is_being_delivered, next_delivery_ts)", handle);
            this.createIndex(
                    "CREATE INDEX IF NOT EXISTS idx_{:tableName}_ready ON {:tableName} (\n"
                    + "    queue_name,\n"
                    + "    next_delivery_ts,\n"
                    + "    key,\n"
                    + "    key_order\n"
                    + ")\n"
                    + "WHERE\n"
                    + "    is_dead_letter_message = FALSE\n"
                    + "    AND is_being_delivered = FALSE\n",
                    handle);

            this.multiTableChangeListener.ifPresent(changeListener ->
                    ListenNotify.addChangeNotificationTriggerToTable(
                            handle,
                            this.sharedQueueTableName,
                            List.of(ListenNotify.SqlOperation.INSERT, ListenNotify.SqlOperation.UPDATE),
                            new String[]{"id", "queue_name", "added_ts", "next_delivery_ts", "delivery_ts", "is_dead_letter_message", "is_being_delivered"}));
        });
    }

    /**
     * Resolves the {@code {:tableName}} placeholder in the given statement to the shared queue
     * table name and executes the result on the supplied handle.
     *
     * @param indexStatement the index creation statement template
     * @param handle         the Jdbi handle to execute the statement on
     */
    private void createIndex(String indexStatement, Handle handle) {
        PostgresqlUtil.checkIsValidTableOrColumnName(this.sharedQueueTableName);
        MessageFormatter.NamedArgumentBinding tableNameBinding = MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName);
        String resolvedStatement = MessageFormatter.bind(indexStatement, tableNameBinding);
        handle.execute(resolvedStatement);
    }

    /**
     * Resolves the {@code {:tableName}} placeholder in the given statement to the shared queue
     * table name and executes the result on the supplied handle.
     *
     * @param indexStatement the index removal statement template
     * @param handle         the Jdbi handle to execute the statement on
     */
    private void dropIndex(String indexStatement, Handle handle) {
        PostgresqlUtil.checkIsValidTableOrColumnName(this.sharedQueueTableName);
        MessageFormatter.NamedArgumentBinding tableNameBinding = MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName);
        String resolvedStatement = MessageFormatter.bind(indexStatement, tableNameBinding);
        handle.execute(resolvedStatement);
    }

    /**
     * @return the (lower-cased) name of the table that stores the queued messages
     */
    public final String getSharedQueueTableName() {
        return sharedQueueTableName;
    }

    /**
     * @return an unmodifiable view of the registered interceptors
     */
    public final List<DurableQueuesInterceptor> getInterceptors() {
        List<DurableQueuesInterceptor> readOnlyView = Collections.unmodifiableList(interceptors);
        return readOnlyView;
    }

    /**
     * Starts this instance unless it is already started: hands this instance to every interceptor,
     * sorts the interceptors by order, starts all registered consumers and, when a table-change
     * listener is configured, subscribes to notifications for the queue table so that matching
     * consumers are told about changed messages.
     */
    public final void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        log.info("Starting");
        PostgresqlUtil.checkIsValidTableOrColumnName(this.sharedQueueTableName);

        for (DurableQueuesInterceptor interceptor : this.interceptors) {
            interceptor.setDurableQueues(this);
        }
        DefaultInterceptorChain.sortInterceptorsByOrder(this.interceptors);

        for (PostgresqlDurableQueueConsumer consumer : this.durableQueueConsumers.values()) {
            consumer.start();
        }

        this.multiTableChangeListener.ifPresent(changeListener -> {
            changeListener.listenToNotificationsFor(this.sharedQueueTableName, QueueTableNotification.class);
            changeListener.getEventBus().addAsyncSubscriber((EventHandler) new AnnotatedEventHandler() {

                @Handler
                void handle(QueueTableNotification e) {
                    try {
                        log.trace("[{}] Received Message-Added {} with id '{}'", e.queueName, e.getClass().getSimpleName(), e.id);
                        QueueName queueName = QueueName.of((CharSequence) e.queueName);
                        for (PostgresqlDurableQueueConsumer consumer : PostgresqlDurableQueues.this.durableQueueConsumers.values()) {
                            if (!consumer.queueName.equals(queueName)) {
                                continue;
                            }
                            // The notification carries no payload, hence the NO_PAYLOAD placeholder
                            consumer.messageAdded((QueuedMessage) new DefaultQueuedMessage(QueueEntryId.of((CharSequence) String.valueOf(e.id)), queueName, Message.of(NO_PAYLOAD), e.addedTimestamp, e.nextDeliveryTimestamp, e.deliveryTimestamp, null, -1, -1, e.isDeadLetterMessage, e.isBeingDelivered));
                        }
                    } catch (Exception ex) {
                        log.error("Error occurred while handling notification", ex);
                    }
                }
            });
        });
        log.info("Started");
    }

    /**
     * Stops this instance if it is started: stops every registered consumer and, when a table-change
     * listener is configured, stops listening for notifications on the queue table.
     * <p>
     * Failures while stopping are logged and never propagated. IO related failures (as determined by
     * {@code IOExceptionUtil.isIOException}) are logged at DEBUG level only; all other failures are
     * logged at ERROR level.
     */
    public final void stop() {
        if (this.started) {
            log.info("Stopping");
            PostgresqlUtil.checkIsValidTableOrColumnName(this.sharedQueueTableName);
            this.durableQueueConsumers.values().forEach(postgresqlDurableQueueConsumer -> {
                try {
                    postgresqlDurableQueueConsumer.stop();
                } catch (Exception ex) {
                    // Log exactly once: DEBUG for IO problems, ERROR for everything else
                    if (IOExceptionUtil.isIOException((Throwable) ex)) {
                        log.debug("Error occurred while stopping DurableQueueConsumer", (Throwable) ex);
                    } else {
                        log.error("Error occurred while stopping DurableQueueConsumer", (Throwable) ex);
                    }
                }
            });
            this.multiTableChangeListener.ifPresent(listener -> {
                try {
                    listener.unlistenToNotificationsFor(this.sharedQueueTableName);
                } catch (Exception ex) {
                    // Log exactly once: DEBUG for IO problems, ERROR for everything else
                    if (IOExceptionUtil.isIOException((Throwable) ex)) {
                        log.debug("Error occurred while performing unlistenToNotificationsFor '{}'", (Object) this.sharedQueueTableName, (Object) ex);
                    } else {
                        log.error("Error occurred while performing unlistenToNotificationsFor '{}'", (Object) this.sharedQueueTableName, (Object) ex);
                    }
                }
            });
            this.started = false;
            log.info("Stopped");
        }
    }

    /**
     * @return {@code true} between a call to {@code start()} and the completion of {@code stop()}
     */
    public final boolean isStarted() {
        return started;
    }

    /**
     * @return the transactional mode this instance was constructed with
     */
    public final TransactionalMode getTransactionalMode() {
        return transactionalMode;
    }

    /**
     * @return the unit of work factory used by this instance, wrapped in an {@link Optional}
     */
    public final Optional<UnitOfWorkFactory<? extends UnitOfWork>> getUnitOfWorkFactory() {
        UnitOfWorkFactory<? extends UnitOfWork> factory = this.unitOfWorkFactory;
        return Optional.ofNullable(factory);
    }

    /**
     * Returns the union of the distinct queue names stored in the queue table and the names of the
     * queues that currently have a registered consumer.
     *
     * @return the combined set of queue names
     */
    public final Set<QueueName> getQueueNames() {
        Set<QueueName> allQueueNames = this.unitOfWorkFactory.withUnitOfWork(unitOfWork -> {
            String selectDistinctNamesSql = MessageFormatter.bind("SELECT distinct queue_name FROM {:tableName}",
                                                                 MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
            return unitOfWork.handle()
                             .createQuery(selectDistinctNamesSql)
                             .mapTo(QueueName.class)
                             .set();
        });
        // Queues with a consumer but no stored messages are included as well
        allQueueNames.addAll(this.durableQueueConsumers.keySet());
        return allQueueNames;
    }

    /**
     * Returns the names of the queues that currently have a registered consumer.
     * <p>
     * The result is a read-only view of the internal consumer registry: it reflects later
     * registrations and removals, but cannot be used to modify the registry, so consumers can
     * only be removed through their normal stop/remove path.
     *
     * @return an unmodifiable view of the queue names with a registered consumer
     */
    public Set<QueueName> getActiveQueueNames() {
        return Collections.unmodifiableSet(this.durableQueueConsumers.keySet());
    }

    /**
     * Registers a consumer for the queue named by the operation. The consumer is created through the
     * interceptor chain and is started immediately if this instance is already started.
     *
     * @param operation the consume operation; required
     * @return the consumer registered for the queue
     * @throws DurableQueueException if a consumer is already registered for the queue
     */
    public final DurableQueueConsumer consumeFromQueue(ConsumeFromQueue operation) {
        FailFast.requireNonNull(operation, "No operation provided");
        boolean alreadyConsumed = this.durableQueueConsumers.containsKey(operation.queueName);
        if (alreadyConsumed) {
            throw new DurableQueueException("There is already an DurableConsumer for this queue", operation.queueName);
        }
        operation.validate();

        DurableQueueConsumer registeredConsumer = this.durableQueueConsumers.computeIfAbsent(operation.queueName, ignoredQueueName -> {
            PostgresqlDurableQueueConsumer newConsumer = (PostgresqlDurableQueueConsumer) InterceptorChain.newInterceptorChainForOperation(
                    operation,
                    this.interceptors,
                    (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                    () -> {
                        long pollingIntervalMs = operation.getPollingInterval().toMillis();
                        // A polling optimizer is only created when a table-change listener is configured
                        QueuePollingOptimizer pollingOptimizer = this.multiTableChangeListener
                                .map(_ignore -> this.queuePollingOptimizerFactory.apply(operation))
                                .orElseGet(QueuePollingOptimizer::None);
                        DurableQueueConsumer created = new PostgresqlDurableQueueConsumer(operation, this.unitOfWorkFactory, this, this::removeQueueConsumer, pollingIntervalMs, pollingOptimizer, this.interceptors);
                        return created;
                    }).proceed();

            if (this.started) {
                newConsumer.start();
            }
            String lifecycleLabel = this.started ? "Started" : "Created";
            log.info("[{}] {} - {} {}", operation.queueName, operation.consumerName, lifecycleLabel, newConsumer.getClass().getSimpleName());
            return newConsumer;
        });
        return registeredConsumer;
    }

    /**
     * Default polling optimizer factory: builds a {@code SimpleQueuePollingOptimizer} from the
     * operation's polling interval, passing half the interval and twenty times the interval
     * (both in milliseconds) as its two numeric arguments.
     *
     * @param operation the consume operation the optimizer is created for
     * @return the polling optimizer for the operation
     */
    protected QueuePollingOptimizer createQueuePollingOptimizerFor(ConsumeFromQueue operation) {
        long pollingIntervalMs = operation.getPollingInterval().toMillis();
        long halfIntervalMs = (long) ((double) pollingIntervalMs * 0.5);
        long twentyIntervalsMs = pollingIntervalMs * 20L;
        return new QueuePollingOptimizer.SimpleQueuePollingOptimizer(operation, halfIntervalMs, twentyIntervalsMs);
    }

    /**
     * Removes a stopped consumer from the consumer registry, running the removal through the
     * interceptor chain as a {@code StopConsumingFromQueue} operation. Failures inside the chain are
     * logged and not propagated.
     *
     * @param durableQueueConsumer the consumer to remove; required and must not be started
     */
    final void removeQueueConsumer(DurableQueueConsumer durableQueueConsumer) {
        FailFast.requireNonNull(durableQueueConsumer, "You must provide a durableQueueConsumer");
        boolean consumerStillStarted = durableQueueConsumer.isStarted();
        String stillStartedMessage = MessageFormatter.msg("Cannot remove DurableQueueConsumer '{}' since it's started!", durableQueueConsumer.queueName());
        FailFast.requireFalse(consumerStillStarted, stillStartedMessage);

        StopConsumingFromQueue operation = new StopConsumingFromQueue(durableQueueConsumer);
        try {
            InterceptorChain.newInterceptorChainForOperation(
                    operation,
                    this.interceptors,
                    (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                    () -> {
                        DurableQueueConsumer removedConsumer = this.durableQueueConsumers.remove(durableQueueConsumer.queueName());
                        return removedConsumer;
                    }).proceed();
        } catch (Exception e) {
            log.error(MessageFormatter.msg("Failed to perform {}", operation), e);
        }
    }

    /**
     * Queues a single message, running the operation through the interceptor chain.
     *
     * @param operation the queue operation; required
     * @return the id assigned to the queued message
     */
    public final QueueEntryId queueMessage(QueueMessage operation) {
        FailFast.requireNonNull(operation, "You must provide a QueueMessage instance");
        QueueEntryId assignedId = (QueueEntryId) InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    // Read from the operation only when the default behaviour actually runs
                    QueueName targetQueue = operation.queueName;
                    Message message = operation.getMessage();
                    Optional<Exception> cause = operation.getCauseOfEnqueuing();
                    Optional<Duration> delay = operation.getDeliveryDelay();
                    return this.queueMessage(targetQueue, message, false, cause, delay);
                }).proceed();
        return assignedId;
    }

    /**
     * Queues a message directly as a dead letter message, running the operation through the
     * interceptor chain. No delivery delay is applied.
     *
     * @param operation the queue operation; required
     * @return the id assigned to the queued message
     */
    public final QueueEntryId queueMessageAsDeadLetterMessage(QueueMessageAsDeadLetterMessage operation) {
        FailFast.requireNonNull(operation, "You must provide a QueueMessageAsDeadLetterMessage instance");
        QueueEntryId assignedId = (QueueEntryId) InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    // Read from the operation only when the default behaviour actually runs
                    QueueName targetQueue = operation.queueName;
                    Message message = operation.getMessage();
                    Optional<Exception> cause = Optional.ofNullable(operation.getCauseOfError());
                    Optional<Duration> noDelay = Optional.empty();
                    return this.queueMessage(targetQueue, message, true, cause, noDelay);
                }).proceed();
        return assignedId;
    }

    /**
     * Serializes the message and inserts it as a single row into the queue table.
     * <p>
     * In {@code TransactionalMode.FullyTransactional} mode an existing unit of work is required
     * (checked via {@code getRequiredUnitOfWork()}) before the insert is performed.
     *
     * @param queueName           the queue to add the message to; required
     * @param message             the message to queue; required. An {@link OrderedMessage} must have
     *                            a non-null key and an order &gt;= 0
     * @param isDeadLetterMessage {@code true} to store the message as a dead letter message, in which
     *                            case no next delivery timestamp is stored
     * @param causeOfEnqueuing    optional cause; its stack trace is stored as the last delivery error
     * @param deliveryDelay       optional delay added to the current time to form the next delivery
     *                            timestamp (ignored for dead letter messages)
     * @return the randomly generated id of the queued message
     * @throws DurableQueueException if the payload or meta-data cannot be serialized, or if no row
     *                               was inserted
     */
    protected final QueueEntryId queueMessage(QueueName queueName, Message message, boolean isDeadLetterMessage, Optional<Exception> causeOfEnqueuing, Optional<Duration> deliveryDelay) {
        String jsonPayload;
        FailFast.requireNonNull(queueName, "You must provide a queueName");
        FailFast.requireNonNull(message, "You must provide a message");
        FailFast.requireNonNull(causeOfEnqueuing, "You must provide a causeOfEnqueuing option");
        FailFast.requireNonNull(deliveryDelay, "You must provide a deliveryDelay option");

        QueueEntryId queueEntryId = QueueEntryId.random();
        Instant addedTimestamp = Instant.now();
        // Dead letter messages are not scheduled for delivery
        Instant nextDeliveryTimestamp = isDeadLetterMessage ? null : addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));

        // Log fragments shared by the trace and debug statements below
        boolean isOrderedMessage = message instanceof OrderedMessage;
        String deadLetterLabel = isDeadLetterMessage ? "Dead Letter " : "";
        String orderedLabel = isOrderedMessage ? "Ordered " : "";
        String keyAndOrderLabel = isOrderedMessage
                                  ? MessageFormatter.msg(" {}:{}", ((OrderedMessage) message).getKey(), ((OrderedMessage) message).getOrder())
                                  : "";
        log.trace("[{}:{}] Queuing {}{}message{} with nextDeliveryTimestamp {}", queueName, queueEntryId, deadLetterLabel, orderedLabel, keyAndOrderLabel, nextDeliveryTimestamp);

        try {
            jsonPayload = this.jsonSerializer.serialize(message.getPayload());
        } catch (JSONSerializationException e) {
            // The placeholder is required for the payload type to appear in the message
            throw new DurableQueueException(MessageFormatter.msg("Failed to serialize message payload of type '{}'", message.getPayload().getClass().getName()), (Throwable) e, queueName);
        }

        if (this.transactionalMode == TransactionalMode.FullyTransactional) {
            // Fails if the caller has not started a unit of work
            this.unitOfWorkFactory.getRequiredUnitOfWork();
        }

        this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            String insertSql = MessageFormatter.bind(
                    "INSERT INTO {:tableName} (\n"
                    + "       id,\n"
                    + "       queue_name,\n"
                    + "       message_payload,\n"
                    + "       message_payload_type,\n"
                    + "       added_ts,\n"
                    + "       next_delivery_ts,\n"
                    + "       last_delivery_error,\n"
                    + "       is_dead_letter_message,\n"
                    + "       meta_data,\n"
                    + "       delivery_mode,\n"
                    + "       key,\n"
                    + "       key_order\n"
                    + "   ) VALUES (\n"
                    + "       :id,\n"
                    + "       :queueName,\n"
                    + "       :message_payload::jsonb,\n"
                    + "       :message_payload_type,\n"
                    + "       :addedTimestamp,\n"
                    + "       :nextDeliveryTimestamp,\n"
                    + "       :lastDeliveryError,\n"
                    + "       :isDeadLetterMessage,\n"
                    + "       :metaData::jsonb,\n"
                    + "       :deliveryMode,\n"
                    + "       :key,\n"
                    + "       :order\n"
                    + "   )",
                    MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));

            // The (Object) casts keep the binding on the generic bind(String, Object) overload
            Update update = unitOfWork.handle().createUpdate(insertSql)
                                      .bind("id", (Object) queueEntryId)
                                      .bind("queueName", (Object) queueName)
                                      .bind("message_payload", jsonPayload)
                                      .bind("message_payload_type", message.getPayload().getClass().getName())
                                      .bind("addedTimestamp", (Object) addedTimestamp)
                                      .bind("nextDeliveryTimestamp", (Object) nextDeliveryTimestamp)
                                      .bind("isDeadLetterMessage", isDeadLetterMessage);

            if (message instanceof OrderedMessage) {
                OrderedMessage orderedMessage = (OrderedMessage) message;
                FailFast.requireNonNull(orderedMessage.getKey(), "An OrderedMessage requires a non null key");
                FailFast.requireTrue(orderedMessage.getOrder() >= 0L, "An OrderedMessage requires an order >= 0");
                update.bind("deliveryMode", (Object) QueuedMessage.DeliveryMode.IN_ORDER)
                      .bind("key", orderedMessage.getKey())
                      .bind("order", orderedMessage.getOrder());
            } else {
                update.bind("deliveryMode", (Object) QueuedMessage.DeliveryMode.NORMAL)
                      .bindNull("key", java.sql.Types.VARCHAR)
                      .bind("order", -1L);
            }

            try {
                String jsonMetaData = this.jsonSerializer.serialize((Object) message.getMetaData());
                update.bind("metaData", jsonMetaData);
            } catch (JSONSerializationException e) {
                throw new DurableQueueException("Failed to serialize message meta-data", (Throwable) e, queueName);
            }

            if (causeOfEnqueuing.isPresent()) {
                update.bind("lastDeliveryError", causeOfEnqueuing.map(Exceptions::getStackTrace).get());
            } else {
                update.bindNull("lastDeliveryError", java.sql.Types.VARCHAR);
            }

            int numberOfRowsUpdated = update.execute();
            if (numberOfRowsUpdated == 0) {
                throw new DurableQueueException("Failed to insert message", queueName);
            }
        });

        log.debug("[{}:{}] Queued {}{}message{} with nextDeliveryTimestamp {}", queueName, queueEntryId, deadLetterLabel, orderedLabel, keyAndOrderLabel, nextDeliveryTimestamp);
        return queueEntryId;
    }

    /**
     * Queues every message carried by the supplied {@link QueueMessages} operation using one JDBI batch insert.
     * <p>
     * The operation is validated and then run through the interceptor chain. All messages get the same
     * added timestamp and the same next-delivery timestamp (added timestamp + the operation's delivery delay,
     * or + zero when no delay is present). The insert is executed on the handle of the required unit of work.
     *
     * @param operation the operation describing queue name, delivery delay and messages; must not be null
     * @return one generated {@link QueueEntryId} per queued message
     * @throws DurableQueueException if a payload or its meta-data cannot be serialized, or if the number of
     *                               inserted rows differs from the number of messages
     */
    public final List<QueueEntryId> queueMessages(QueueMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessages instance");
        operation.validate();
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            QueueName queueName = operation.getQueueName();
            Optional deliveryDelay = operation.getDeliveryDelay();
            List messages = operation.getMessages();
            Instant addedTimestamp = Instant.now();
            Instant nextDeliveryTimestamp = addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
            PreparedBatch batch = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().prepareBatch(MessageFormatter.bind((String)"INSERT INTO {:tableName} (\n       id,\n       queue_name,\n       message_payload,\n       message_payload_type,\n       added_ts,\n       next_delivery_ts,\n       last_delivery_error,\n       is_dead_letter_message,\n       is_being_delivered,\n       meta_data,\n       delivery_mode,\n       key,\n       key_order\n   ) VALUES (\n       :id,\n       :queueName,\n       :message_payload::jsonb,\n       :message_payload_type,\n       :addedTimestamp,\n       :nextDeliveryTimestamp,\n       :lastDeliveryError,\n       :isDeadLetterMessage,\n       :isBeingDelivered,\n       :metaData::jsonb,\n       :deliveryMode,\n       :key,\n       :order\n   )", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}));
            // Each stream element binds one row on the shared batch and adds it; the mapped value is the new entry id
            List queueEntryIds = Lists.toIndexedStream((List)messages).map(indexedMessage -> {
                String jsonPayload;
                Message message = (Message)indexedMessage._2;
                try {
                    jsonPayload = this.jsonSerializer.serialize(message.getPayload());
                }
                catch (JSONSerializationException e) {
                    // The '{}' placeholder is required for the payload type name to end up in the message
                    throw new DurableQueueException(MessageFormatter.msg((String)"Failed to serialize message payload of type {}", (Object[])new Object[]{message.getPayload().getClass().getName()}), (Throwable)e, queueName);
                }
                QueueEntryId queueEntryId = QueueEntryId.random();
                // 12 == java.sql.Types.VARCHAR
                ((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)batch.bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", message.getPayload().getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", false)).bind("isBeingDelivered", false)).bindNull("lastDeliveryError", 12);
                if (message instanceof OrderedMessage) {
                    OrderedMessage orderedMessage = (OrderedMessage)message;
                    FailFast.requireNonNull((Object)orderedMessage.getKey(), (String)MessageFormatter.msg((String)"[Index: {}] - OrderedMessage requires a non null key", (Object[])new Object[]{indexedMessage._1}));
                    FailFast.requireTrue((orderedMessage.getOrder() >= 0L ? 1 : 0) != 0, (String)MessageFormatter.msg((String)"[Index: {}] - OrderedMessage requires an order >= 0", (Object[])new Object[]{indexedMessage._1}));
                    ((PreparedBatch)((PreparedBatch)batch.bind("deliveryMode", (Object)QueuedMessage.DeliveryMode.IN_ORDER)).bind("key", orderedMessage.getKey())).bind("order", orderedMessage.getOrder());
                } else {
                    // Messages that aren't ordered are stored with a NULL key and an order of -1
                    ((PreparedBatch)((PreparedBatch)batch.bind("deliveryMode", (Object)QueuedMessage.DeliveryMode.NORMAL)).bindNull("key", 12)).bind("order", -1L);
                }
                try {
                    String jsonMetaData = this.jsonSerializer.serialize((Object)message.getMetaData());
                    batch.bind("metaData", jsonMetaData);
                }
                catch (JSONSerializationException e) {
                    throw new DurableQueueException("Failed to serialize message meta-data", (Throwable)e, queueName);
                }
                batch.add();
                return queueEntryId;
            }).collect(Collectors.toList());
            // Sum the per-statement update counts and verify that every message resulted in an inserted row
            int numberOfRowsUpdated = Arrays.stream(batch.execute()).reduce(Integer::sum).orElse(0);
            if (numberOfRowsUpdated != messages.size()) {
                throw new DurableQueueException(MessageFormatter.msg((String)"Attempted to queue {} messages but only inserted {} messages", (Object[])new Object[]{messages.size(), numberOfRowsUpdated}), queueName);
            }
            log.debug("[{}] Queued {} Messages with nextDeliveryTimestamp {} and entry-id's: {}", new Object[]{queueName, messages.size(), nextDeliveryTimestamp, queueEntryIds});
            return queueEntryIds;
        }).proceed();
    }

    /**
     * Schedules the message identified by the {@link RetryMessage} operation for redelivery.
     * <p>
     * The row's next delivery timestamp becomes now + the operation's delivery delay, the redelivery counter
     * is incremented, the "being delivered" flag and the delivery timestamp are cleared, and the last delivery
     * error is set to the stack trace of the retry cause (or a fixed text when the operation carries no cause).
     * The update is executed on the handle of the required unit of work.
     *
     * @param operation the retry operation; must not be null
     * @return the message as returned by the update, or an empty Optional if no row matched the id
     */
    public final Optional<QueuedMessage> retryMessage(RetryMessage operation) {
        FailFast.requireNonNull(operation, "You must provide a RetryMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    Instant redeliverAt = Instant.now().plus(operation.getDeliveryDelay());
                    String lastDeliveryError = operation.getCauseForRetry() == null
                                               ? "Manually requested redelivery"
                                               : Exceptions.getStackTrace((Throwable)operation.getCauseForRetry());
                    String updateSql = MessageFormatter.bind("UPDATE {:tableName} SET\n" +
                                                             "     next_delivery_ts = :nextDeliveryTimestamp,\n" +
                                                             "     last_delivery_error = :lastDeliveryError,\n" +
                                                             "     redelivery_attempts = redelivery_attempts + 1,\n" +
                                                             "     is_being_delivered = FALSE,\n" +
                                                             "     delivery_ts = NULL\n" +
                                                             " WHERE id = :id\n" +
                                                             " RETURNING *",
                                                             MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));

                    Query retryQuery = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(updateSql);
                    retryQuery.bind("nextDeliveryTimestamp", (Object)redeliverAt);
                    retryQuery.bind("lastDeliveryError", lastDeliveryError);
                    retryQuery.bind("id", (Object)operation.queueEntryId);
                    Optional updatedMessage = retryQuery.map((RowMapper)this.queuedMessageMapper).findOne();

                    if (!updatedMessage.isPresent()) {
                        log.error("Failed to Mark Message with id '{}' for Retry", operation.queueEntryId);
                        return Optional.empty();
                    }
                    log.debug("Marked Message with id '{}' for Retry at {}. Message entry after update: {}", operation.queueEntryId, redeliverAt, updatedMessage.get());
                    return updatedMessage;
                }).proceed();
    }

    /**
     * Marks the message identified by the {@link MarkAsDeadLetterMessage} operation as a Dead Letter Message.
     * <p>
     * Only rows that are not already dead-letter messages are updated: the next delivery timestamp and the
     * delivery timestamp are cleared, the "being delivered" flag is reset and the last delivery error is set
     * to the operation's cause. The update is executed on the handle of the required unit of work.
     *
     * @param operation the operation; must not be null
     * @return the message as returned by the update, or an empty Optional if no row was updated
     *         (unknown id, or the message was already a Dead Letter Message)
     */
    public final Optional<QueuedMessage> markAsDeadLetterMessage(MarkAsDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a MarkAsDeadLetterMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            Optional result = ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = NULL,\n     last_delivery_error = :lastDeliveryError,\n     is_dead_letter_message = TRUE,\n     is_being_delivered = FALSE,\n     delivery_ts = NULL\n WHERE id = :id AND is_dead_letter_message = FALSE\n RETURNING *", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("lastDeliveryError", operation.getCauseForBeingMarkedAsDeadLetter())).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                log.debug("Marked message with id '{}' as Dead Letter Message. Message entry after update: {}", (Object)operation.queueEntryId, result.get());
                return result;
            }
            // Previously logged the garbled text "Failed to Mark as Message message with id ..."
            log.error("Failed to Mark Message with id '{}' as Dead Letter Message", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    /**
     * Turns a Dead Letter Message back into a regular queued message.
     * <p>
     * Only rows currently flagged as dead-letter messages are updated: the flag is cleared and the next
     * delivery timestamp is set to now + the operation's delivery delay. The update is executed on the handle
     * of the required unit of work.
     *
     * @param operation the operation; must not be null
     * @return the message as returned by the update, or an empty Optional if no dead-letter row matched the id
     */
    public final Optional<QueuedMessage> resurrectDeadLetterMessage(ResurrectDeadLetterMessage operation) {
        FailFast.requireNonNull(operation, "You must provide a ResurrectDeadLetterMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    Instant redeliverAt = Instant.now().plus(operation.getDeliveryDelay());
                    String updateSql = MessageFormatter.bind("UPDATE {:tableName} SET\n" +
                                                             "     next_delivery_ts = :nextDeliveryTimestamp,\n" +
                                                             "     is_dead_letter_message = FALSE\n" +
                                                             " WHERE id = :id AND is_dead_letter_message = TRUE\n" +
                                                             " RETURNING *",
                                                             MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));

                    Query resurrectQuery = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(updateSql);
                    resurrectQuery.bind("nextDeliveryTimestamp", (Object)redeliverAt);
                    resurrectQuery.bind("id", (Object)operation.queueEntryId);
                    Optional updated = resurrectQuery.map((RowMapper)this.queuedMessageMapper).findOne();

                    if (!updated.isPresent()) {
                        log.error("Failed to resurrect Dead Letter Message with id '{}'", operation.queueEntryId);
                        return Optional.empty();
                    }

                    QueuedMessage resurrected = (QueuedMessage)updated.get();
                    String orderedLabel = "";
                    String orderedDetails = "";
                    if (resurrected.getDeliveryMode() == QueuedMessage.DeliveryMode.IN_ORDER) {
                        // NOTE(review): the QueuedMessage itself is cast to OrderedMessage here - confirm that
                        // IN_ORDER QueuedMessage instances really are OrderedMessage's, otherwise this cast fails
                        OrderedMessage ordered = (OrderedMessage)resurrected;
                        orderedLabel = "Ordered ";
                        orderedDetails = "(key: " + ordered.getKey() + ", order: " + ordered.getOrder() + ")";
                    }
                    log.debug("[{}] Resurrected Dead Letter {}Message with id '{}' {} and nextDeliveryTimestamp: {}. Message entry after update: {}",
                              resurrected.getQueueName(),
                              orderedLabel,
                              operation.getQueueEntryId(),
                              orderedDetails,
                              redeliverAt,
                              resurrected);
                    return updated;
                }).proceed();
    }

    /**
     * Acknowledges a message as handled by deleting its row, provided it is not a Dead Letter Message.
     * <p>
     * The interceptor chain is run inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     *
     * @param operation the operation; must not be null
     * @return {@code true} if exactly one row was deleted, or if nothing was deleted but the message exists as a
     *         Dead Letter Message; {@code false} otherwise
     */
    public final boolean acknowledgeMessageAsHandled(AcknowledgeMessageAsHandled operation) {
        FailFast.requireNonNull(operation, "You must provide a AcknowledgeMessageAsHandled instance");
        return (Boolean)this.unitOfWorkFactory.withUnitOfWork(() -> (Boolean)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    log.debug("Acknowledging-Message-As-Handled regarding Message with id '{}'", operation.queueEntryId);
                    QueueEntryId entryId = operation.queueEntryId;
                    String deleteSql = MessageFormatter.bind("DELETE FROM {:tableName} WHERE id = :id AND is_dead_letter_message = FALSE",
                                                             MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
                    Update deleteStatement = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(deleteSql);
                    deleteStatement.bind("id", (Object)operation.queueEntryId);
                    int deletedRows = deleteStatement.execute();

                    if (deletedRows != 1) {
                        // Nothing deleted: either the message became a Dead Letter Message in the meantime or it is gone
                        boolean isDeadLetterMessage = this.getDeadLetterMessage(new GetDeadLetterMessage(operation.queueEntryId)).isPresent();
                        if (!isDeadLetterMessage) {
                            log.error("Couldn't Acknowledge with id '{}' - it may already have been deleted", entryId);
                            return false;
                        }
                        log.debug("Couldn't acknowledge message as it was marked as a Dead-Letter-Message during the message handling. Id: '{}'", entryId);
                        return true;
                    }
                    log.debug("Acknowledged message as handled and deleted it. Id: '{}'", entryId);
                    return true;
                }).proceed());
    }

    /**
     * Deletes the row with the operation's queue entry id, irrespective of whether it is a Dead Letter Message.
     * The delete is executed on the handle of the required unit of work.
     *
     * @param operation the operation; must not be null
     * @return {@code true} if exactly one row was deleted, otherwise {@code false}
     */
    public final boolean deleteMessage(DeleteMessage operation) {
        FailFast.requireNonNull(operation, "You must provide a DeleteMessage instance");
        return (Boolean)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    String deleteSql = MessageFormatter.bind("DELETE FROM {:tableName} WHERE id = :id",
                                                             MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
                    Update deleteStatement = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(deleteSql);
                    deleteStatement.bind("id", (Object)operation.queueEntryId);
                    int deletedRows = deleteStatement.execute();

                    if (deletedRows != 1) {
                        log.error("Couldn't Delete Message with id '{}' - it may already have been deleted", operation.queueEntryId);
                        return false;
                    }
                    log.debug("Deleted Message with id '{}'", operation.queueEntryId);
                    return true;
                }).proceed();
    }

    /**
     * Claims the next message that is ready for delivery on the operation's queue.
     * <p>
     * First gives {@link #resetMessagesStuckBeingDelivered(QueueName)} a chance to run. Then a single statement
     * selects (with {@code FOR UPDATE SKIP LOCKED}) one row of the queue that is neither a Dead Letter Message
     * nor being delivered, whose next delivery timestamp is due, and for which no row with the same key and
     * queue name has a lower key order. Candidates are sorted by key order and then next delivery timestamp.
     * The selected row is flagged as being delivered, its total attempts counter is incremented, its delivery
     * timestamp is set to now and its next delivery timestamp is cleared.
     * <p>
     * When the operation supplies keys to exclude, rows having one of those keys are skipped. Rows with a
     * {@code NULL} key stay eligible: a bare {@code key NOT IN (...)} evaluates to NULL for a NULL key,
     * which would filter such rows out as well.
     *
     * @param operation the operation; must not be null
     * @return the claimed message, or an empty Optional if no message was ready, or if the claimed message could
     *         not be deserialized (in which case it is marked as a Dead Letter Message)
     */
    public final Optional<QueuedMessage> getNextMessageReadyForDelivery(GetNextMessageReadyForDelivery operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetNextMessageReadyForDelivery instance");
        log.trace("[{}] Entered GetNextMessageReadyForDelivery", (Object)operation.queueName);
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            log.trace("[{}] Handling GetNextMessageReadyForDelivery: {}", (Object)operation.queueName, (Object)operation);
            this.resetMessagesStuckBeingDelivered(operation.queueName);
            Instant now = Instant.now();
            Collection excludedKeys = operation.getExcludeOrderedMessagesWithKey() != null ? operation.getExcludeOrderedMessagesWithKey() : List.of();
            String excludeKeysLimitSql = "";
            if (!excludedKeys.isEmpty()) {
                // 'key IS NULL OR' keeps rows without a key eligible; NULL NOT IN (...) would otherwise exclude them
                excludeKeysLimitSql = "        AND (key IS NULL OR key NOT IN (<excludedKeys>))\n";
            }
            String sql = MessageFormatter.bind((String)"WITH queued_message_ready_for_delivery AS (\n    SELECT id FROM {:tableName} q1\n    WHERE\n        queue_name = :queueName AND\n        is_dead_letter_message = FALSE AND\n        is_being_delivered = FALSE AND\n        next_delivery_ts <= :now AND\n        NOT EXISTS (SELECT 1 FROM {:tableName} q2 WHERE q2.key = q1.key AND q2.queue_name = q1.queue_name AND q2.key_order < q1.key_order)\n            {:excludeKeys}\n            ORDER BY key_order ASC, next_delivery_ts ASC\n            LIMIT 1\n            FOR UPDATE SKIP LOCKED\n        )\n        UPDATE {:tableName} queued_message SET\n            total_attempts = total_attempts + 1,\n            next_delivery_ts = NULL,\n            is_being_delivered = TRUE,\n            delivery_ts = :now\n        FROM queued_message_ready_for_delivery\n        WHERE queued_message.id = queued_message_ready_for_delivery.id\n        RETURNING\n            queued_message.id,\n            queued_message.queue_name,\n            queued_message.message_payload,\n            queued_message.message_payload_type,\n            queued_message.added_ts,\n            queued_message.next_delivery_ts,\n            queued_message.delivery_ts,\n            queued_message.last_delivery_error,\n            queued_message.total_attempts,\n            queued_message.redelivery_attempts,\n            queued_message.is_dead_letter_message,\n            queued_message.is_being_delivered,\n            queued_message.meta_data,\n            queued_message.delivery_mode,\n            queued_message.key,\n            queued_message.key_order\n", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName), MessageFormatter.NamedArgumentBinding.arg((String)"excludeKeys", (Object)excludeKeysLimitSql)});
            log.trace("[{}] Querying Postgresql: {}", (Object)operation.queueName, (Object)operation);
            Query query = (Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(sql).bind("queueName", (Object)operation.queueName)).bind("now", (Object)now);
            if (!excludedKeys.isEmpty()) {
                // Expands the <excludedKeys> placeholder that was added to the SQL above
                query.bindList("excludedKeys", excludedKeys);
            }
            Optional result;
            try {
                result = query.map((RowMapper)this.queuedMessageMapper).findOne();
            }
            catch (DurableQueueDeserializationException e) {
                // The row was claimed but can't be deserialized - park it as a Dead Letter Message
                log.error("[{}] Marking Message as DeadLetterMessage due to DurableQueueDeserializationException while deserializing message with id '{}'", new Object[]{operation.queueName, e.queueEntryId.get(), e});
                this.markAsDeadLetterMessage((QueueEntryId)e.queueEntryId.get(), e);
                return Optional.empty();
            }
            log.trace("[{}] Completed GetNextMessageReadyForDelivery: {}", (Object)operation.queueName, (Object)operation);
            return result;
        }).proceed();
    }

    /**
     * Resets messages that have been flagged as being delivered for longer than the message handling timeout.
     * <p>
     * Does nothing unless the transactional mode is {@code SingleOperationTransaction}. The check is throttled
     * per queue name: it only runs if no check is recorded for {@code queueName} or the last recorded check is
     * more than {@code messageHandlingTimeoutMs} away from now. Note that the UPDATE itself carries no
     * {@code queue_name} condition, so it applies to every matching row of the shared table.
     * <p>
     * Matching rows get their "being delivered" flag and delivery timestamp cleared, their redelivery counter
     * incremented, their next delivery timestamp set to now and a timeout text stored as last delivery error.
     *
     * @param queueName the queue name used for throttling and logging
     */
    protected final void resetMessagesStuckBeingDelivered(QueueName queueName) {
        if (this.transactionalMode != TransactionalMode.SingleOperationTransaction) {
            return;
        }

        Instant checkTime = Instant.now();
        Instant previousCheck = (Instant)this.lastResetStuckMessagesCheckTimestamps.get(queueName);
        if (previousCheck != null) {
            long millisSincePreviousCheck = Duration.between(checkTime, previousCheck).abs().toMillis();
            if (millisSincePreviousCheck <= (long)this.messageHandlingTimeoutMs) {
                return;
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Looking for messages stuck marked as isBeingDelivered. Last check was performed: {}", queueName, previousCheck);
        }

        String resetSql = MessageFormatter.bind("UPDATE {:tableName} SET\n" +
                                                "     is_being_delivered = FALSE,\n" +
                                                "     delivery_ts = NULL,\n" +
                                                "     redelivery_attempts = redelivery_attempts + 1,\n" +
                                                "     next_delivery_ts = :now,\n" +
                                                "     last_delivery_error = :error\n" +
                                                " WHERE is_being_delivered = TRUE\n" +
                                                " AND delivery_ts <= :threshold\n",
                                                MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
        Update resetStatement = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(resetSql);
        resetStatement.bind("threshold", (Object)checkTime.minusMillis(this.messageHandlingTimeoutMs));
        resetStatement.bind("error", "Handler Processing of the Message was determined to have Timed Out");
        resetStatement.bind("now", (Object)checkTime);
        int resetCount = resetStatement.execute();

        if (resetCount <= 0) {
            log.debug("[{}] Didn't find any messages being stuck marked as isBeingDelivered", queueName);
        } else {
            log.debug("[{}] Reset {} messages stuck marked as isBeingDelivered", queueName, resetCount);
        }
        this.lastResetStuckMessagesCheckTimestamps.put(queueName, checkTime);
    }

    /**
     * Tells whether the total message count reported for the given queue is greater than zero.
     *
     * @param queueName the name of the queue
     * @return {@code true} if {@code getTotalMessagesQueuedFor(queueName)} is greater than zero
     */
    public final boolean hasMessagesQueuedFor(QueueName queueName) {
        long queuedMessages = this.getTotalMessagesQueuedFor(queueName);
        return queuedMessages > 0L;
    }

    /**
     * Counts the rows of the operation's queue that are not Dead Letter Messages.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     *
     * @param operation the operation; must not be null
     * @return the number of matching rows
     */
    public final long getTotalMessagesQueuedFor(GetTotalMessagesQueuedFor operation) {
        FailFast.requireNonNull(operation, "You must specify a GetTotalMessagesQueuedFor instance");
        return (Long)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
                    String countSql = MessageFormatter.bind("SELECT count(*) FROM {:tableName} \n" +
                                                            " WHERE \n" +
                                                            "    queue_name = :queueName AND\n" +
                                                            "    is_dead_letter_message = FALSE",
                                                            MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
                    Query countQuery = handleAwareUnitOfWork.handle().createQuery(countSql);
                    countQuery.bind("queueName", (Object)operation.queueName);
                    return (Long)countQuery.mapTo(Long.class).one();
                })).proceed();
    }

    /**
     * Counts, in one query, both the regular and the Dead Letter Messages stored for the operation's queue.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     *
     * @param operation the operation; must not be null
     * @return a {@link QueuedMessageCounts} built from the queue name, the count of rows that are not
     *         Dead Letter Messages and the count of rows that are
     */
    public QueuedMessageCounts getQueuedMessageCountsFor(GetQueuedMessageCountsFor operation) {
        // The message previously named GetTotalMessagesQueuedFor (copy/paste from the sibling method)
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetQueuedMessageCountsFor instance");
        return (QueuedMessageCounts)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (QueuedMessageCounts)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (QueuedMessageCounts)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT \n    COUNT(*) FILTER (WHERE is_dead_letter_message = FALSE) AS regular_count,\n    COUNT(*) FILTER (WHERE is_dead_letter_message = TRUE) AS dead_letter_count\nFROM {:tableName} \n WHERE \n    queue_name = :queueName", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", (Object)operation.queueName)).map((rs, ctx) -> new QueuedMessageCounts(operation.queueName, rs.getLong("regular_count"), rs.getLong("dead_letter_count"))).one())).proceed();
    }

    /**
     * Counts the rows of the operation's queue that are flagged as Dead Letter Messages.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     *
     * @param operation the operation; must not be null
     * @return the number of matching rows
     */
    public final long getTotalDeadLetterMessagesQueuedFor(GetTotalDeadLetterMessagesQueuedFor operation) {
        FailFast.requireNonNull(operation, "You must specify a GetTotalDeadLetterMessagesQueuedFor instance");
        return (Long)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
                    String countSql = MessageFormatter.bind("SELECT count(*) FROM {:tableName} \n" +
                                                            " WHERE \n" +
                                                            "    queue_name = :queueName AND\n" +
                                                            "    is_dead_letter_message = TRUE",
                                                            MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
                    Query countQuery = handleAwareUnitOfWork.handle().createQuery(countSql);
                    countQuery.bind("queueName", (Object)operation.queueName);
                    return (Long)countQuery.mapTo(Long.class).one();
                })).proceed();
    }

    /**
     * Deletes every row stored for the operation's queue name (the statement has no dead-letter condition).
     * The delete runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     *
     * @param operation the operation; must not be null
     * @return the update count reported by the DELETE statement
     */
    public final int purgeQueue(PurgeQueue operation) {
        FailFast.requireNonNull(operation, "You must specify a PurgeQueue instance");
        return (Integer)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> (Integer)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
                    String purgeSql = MessageFormatter.bind("DELETE FROM {:tableName} WHERE queue_name = :queueName",
                                                            MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
                    Update purgeStatement = handleAwareUnitOfWork.handle().createUpdate(purgeSql);
                    purgeStatement.bind("queueName", (Object)operation.queueName);
                    return purgeStatement.execute();
                })).proceed();
    }

    /**
     * Lists messages of a queue whose next delivery timestamp lies after the supplied instant.
     * <p>
     * Only rows that are neither Dead Letter Messages nor being delivered are considered. Results are sorted
     * by next delivery timestamp (ascending) and limited to {@code maxNumberOfMessagesToReturn} rows.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     *
     * @param queueName                      the queue to query; must not be null
     * @param withNextDeliveryTimestampAfter exclusive lower bound for the next delivery timestamp; must not be null
     * @param maxNumberOfMessagesToReturn    the row limit
     * @return id, queue name, added timestamp and next delivery timestamp of each matching row
     */
    public final List<NextQueuedMessage> queryForMessagesSoonReadyForDelivery(QueueName queueName, Instant withNextDeliveryTimestampAfter, int maxNumberOfMessagesToReturn) {
        FailFast.requireNonNull(queueName, "No queueName provided");
        FailFast.requireNonNull(withNextDeliveryTimestampAfter, "No withNextDeliveryTimestampAfter provided");
        return (List)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
            String selectSql = MessageFormatter.bind("SELECT id, added_ts, next_delivery_ts FROM {:tableName} \n" +
                                                     " WHERE queue_name = :queueName\n" +
                                                     " AND is_dead_letter_message = FALSE\n" +
                                                     " AND is_being_delivered = FALSE\n" +
                                                     " AND next_delivery_ts > :now\n" +
                                                     " ORDER BY next_delivery_ts ASC\n" +
                                                     " LIMIT :pageSize",
                                                     MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
            Query selectQuery = handleAwareUnitOfWork.handle().createQuery(selectSql);
            // queueName was already verified to be non-null on entry
            selectQuery.bind("queueName", (Object)queueName);
            selectQuery.bind("now", (Object)withNextDeliveryTimestampAfter);
            selectQuery.bind("pageSize", maxNumberOfMessagesToReturn);
            return selectQuery.map((rs, ctx) -> new NextQueuedMessage(QueueEntryId.of((CharSequence)rs.getString("id")),
                                                                      queueName,
                                                                      rs.getObject("added_ts", OffsetDateTime.class).toInstant(),
                                                                      rs.getObject("next_delivery_ts", OffsetDateTime.class).toInstant()))
                              .list();
        });
    }

    /**
     * Returns one page of the messages of a queue that are not Dead Letter Messages.
     * Delegates, through the interceptor chain, to {@code queryQueuedMessages} with
     * {@code IncludeMessages.QUEUED_MESSAGES}.
     *
     * @param operation the operation carrying queue name, sort order, start index and page size; must not be null
     * @return the messages returned by {@code queryQueuedMessages}
     */
    public final List<QueuedMessage> getQueuedMessages(GetQueuedMessages operation) {
        FailFast.requireNonNull(operation, "You must specify a GetQueuedMessages instance");
        return (List)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    return this.queryQueuedMessages(operation.queueName,
                                                    operation.getQueueingSortOrder(),
                                                    IncludeMessages.QUEUED_MESSAGES,
                                                    operation.getStartIndex(),
                                                    operation.getPageSize());
                }).proceed();
    }

    /**
     * Returns one page of the Dead Letter Messages of a queue.
     * Delegates, through the interceptor chain, to {@code queryQueuedMessages} with
     * {@code IncludeMessages.DEAD_LETTER_MESSAGES}.
     *
     * @param operation the operation carrying queue name, sort order, start index and page size; must not be null
     * @return the messages returned by {@code queryQueuedMessages}
     */
    public final List<QueuedMessage> getDeadLetterMessages(GetDeadLetterMessages operation) {
        FailFast.requireNonNull(operation, "You must specify a GetDeadLetterMessages instance");
        return (List)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    return this.queryQueuedMessages(operation.queueName,
                                                    operation.getQueueingSortOrder(),
                                                    IncludeMessages.DEAD_LETTER_MESSAGES,
                                                    operation.getStartIndex(),
                                                    operation.getPageSize());
                }).proceed();
    }

    /**
     * Loads one page of the rows stored for a queue, optionally restricted to regular or Dead Letter Messages.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}.
     * <p>
     * NOTE(review): {@code queueingSortOrder} is null-checked but not otherwise used - the SQL below has no
     * ORDER BY clause, so paging relies on whatever row order the database happens to return. Confirm whether
     * a sort on the queueing order was intended.
     *
     * @param queueName         the queue to query; must not be null
     * @param queueingSortOrder the requested sort order; must not be null (see note above)
     * @param includeMessages   which kind of messages to include; must not be null
     * @param startIndex        bound as the SQL OFFSET
     * @param pageSize          bound as the SQL LIMIT
     * @return the rows mapped by the queued message row mapper
     * @throws IllegalArgumentException (raised inside the unit of work) for an unsupported {@code includeMessages} value
     */
    protected final List<QueuedMessage> queryQueuedMessages(QueueName queueName, DurableQueues.QueueingSortOrder queueingSortOrder, IncludeMessages includeMessages, long startIndex, long pageSize) {
        FailFast.requireNonNull(queueName, "No queueName provided");
        FailFast.requireNonNull(queueingSortOrder, "No queueingOrder provided");
        FailFast.requireNonNull((Object)includeMessages, "No includeMessages provided");
        return (List)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
            // Translate the requested message kind into an extra WHERE condition
            String includeMessagesSql;
            if (includeMessages == IncludeMessages.ALL) {
                includeMessagesSql = "";
            } else if (includeMessages == IncludeMessages.DEAD_LETTER_MESSAGES) {
                includeMessagesSql = "AND is_dead_letter_message = TRUE\n";
            } else if (includeMessages == IncludeMessages.QUEUED_MESSAGES) {
                includeMessagesSql = "AND is_dead_letter_message = FALSE\n";
            } else {
                throw new IllegalArgumentException("Unsupported IncludeMessages value: " + String.valueOf((Object)includeMessages));
            }

            String pageSql = MessageFormatter.bind("SELECT * FROM {:tableName} \n" +
                                                   " WHERE queue_name = :queueName\n" +
                                                   "{:includeMessages} LIMIT :pageSize \n" +
                                                   " OFFSET :offset",
                                                   MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName),
                                                   MessageFormatter.NamedArgumentBinding.arg("includeMessages", includeMessagesSql));
            Query pageQuery = handleAwareUnitOfWork.handle().createQuery(pageSql);
            // queueName was already verified to be non-null on entry
            pageQuery.bind("queueName", (Object)queueName);
            pageQuery.bind("offset", startIndex);
            pageQuery.bind("pageSize", pageSize);
            return pageQuery.map((RowMapper)this.queuedMessageMapper).list();
        });
    }

    /**
     * Registers an interceptor: hands this instance to the interceptor, appends it to the interceptor list
     * and re-sorts the list by interceptor order.
     *
     * @param interceptor the interceptor to add; must not be null
     * @return this instance
     */
    public final DurableQueues addInterceptor(DurableQueuesInterceptor interceptor) {
        FailFast.requireNonNull(interceptor, "No interceptor provided");
        log.info("Adding interceptor: {}", interceptor);

        interceptor.setDurableQueues(this);
        this.interceptors.add(interceptor);
        DefaultInterceptorChain.sortInterceptorsByOrder(this.interceptors);

        return this;
    }

    /**
     * Removes an interceptor from the interceptor list and re-sorts the list by interceptor order.
     *
     * @param interceptor the interceptor to remove; must not be null
     * @return this instance
     */
    public final DurableQueues removeInterceptor(DurableQueuesInterceptor interceptor) {
        FailFast.requireNonNull(interceptor, "No interceptor provided");
        log.info("Removing interceptor: {}", interceptor);

        this.interceptors.remove(interceptor);
        DefaultInterceptorChain.sortInterceptorsByOrder(this.interceptors);

        return this;
    }

    /**
     * Looks up the queue name stored for a queue entry id.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}; the null check on
     * {@code queueEntryId} happens there as well, while the id is being bound.
     *
     * @param queueEntryId the id of the queue entry; must not be null
     * @return the queue name, or an empty Optional if no row has that id
     */
    public final Optional<QueueName> getQueueNameFor(QueueEntryId queueEntryId) {
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
            String lookupSql = MessageFormatter.bind("SELECT queue_name FROM {:tableName} WHERE \n" +
                                                     " id = :id",
                                                     MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
            Query lookupQuery = handleAwareUnitOfWork.handle().createQuery(lookupSql);
            lookupQuery.bind("id", FailFast.requireNonNull((Object)queueEntryId, "No queueEntryId provided"));
            return lookupQuery.mapTo(QueueName.class).findOne();
        });
    }

    /**
     * Loads the message with the operation's queue entry id, provided it is a Dead Letter Message.
     * Delegates, through the interceptor chain, to the private {@code getQueuedMessage(id, true)} lookup.
     *
     * @param operation the operation; must not be null
     * @return the Dead Letter Message, or an empty Optional if no such row exists
     */
    public final Optional<QueuedMessage> getDeadLetterMessage(GetDeadLetterMessage operation) {
        FailFast.requireNonNull(operation, "You must specify a GetDeadLetterMessage instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    return this.getQueuedMessage(operation.queueEntryId, true);
                }).proceed();
    }

    /**
     * Loads the message with the operation's queue entry id, provided it is not a Dead Letter Message.
     * Delegates, through the interceptor chain, to the private {@code getQueuedMessage(id, false)} lookup.
     *
     * @param operation the operation; must not be null
     * @return the queued message, or an empty Optional if no such row exists
     */
    public final Optional<QueuedMessage> getQueuedMessage(GetQueuedMessage operation) {
        FailFast.requireNonNull(operation, "You must specify a GetQueuedMessage instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation(
                operation,
                this.interceptors,
                (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain),
                () -> {
                    return this.getQueuedMessage(operation.queueEntryId, false);
                }).proceed();
    }

    /**
     * Loads a single row by id, matching only rows whose dead-letter flag equals {@code isDeadLetterMessage}.
     * The query runs inside {@code unitOfWorkFactory.withUnitOfWork(...)}; the null check on
     * {@code queueEntryId} happens there as well, while the id is being bound.
     *
     * @param queueEntryId        the id of the queue entry; must not be null
     * @param isDeadLetterMessage the required value of the row's dead-letter flag
     * @return the mapped message, or an empty Optional if no row matched
     */
    private Optional<QueuedMessage> getQueuedMessage(QueueEntryId queueEntryId, boolean isDeadLetterMessage) {
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> {
            String lookupSql = MessageFormatter.bind("SELECT * FROM {:tableName} WHERE \n" +
                                                     " id = :id AND\n" +
                                                     " is_dead_letter_message = :isDeadLetterMessage",
                                                     MessageFormatter.NamedArgumentBinding.arg("tableName", this.sharedQueueTableName));
            Query lookupQuery = handleAwareUnitOfWork.handle().createQuery(lookupSql);
            lookupQuery.bind("id", FailFast.requireNonNull((Object)queueEntryId, "No queueEntryId provided"));
            lookupQuery.bind("isDeadLetterMessage", isDeadLetterMessage);
            return lookupQuery.map((RowMapper)this.queuedMessageMapper).findOne();
        });
    }

    /**
     * Deserializes a stored JSON payload into an object of the stored payload type using the JSON serializer.
     *
     * @param queueName          the queue the message belongs to (used for error reporting); must not be null
     * @param queueEntryId       the id of the message (used for error reporting); must not be null
     * @param messagePayload     the serialized payload; must not be null
     * @param messagePayloadType the payload type name handed to the serializer; must not be null
     * @return the deserialized payload
     * @throws DurableQueueDeserializationException if deserialization fails with anything that
     *                                              {@code Exceptions.rethrowIfCriticalError} does not rethrow
     */
    private Object deserializeMessagePayload(QueueName queueName, QueueEntryId queueEntryId, String messagePayload, String messagePayloadType) {
        FailFast.requireNonNull(queueName, "No queueName provided");
        FailFast.requireNonNull(queueEntryId, "No queueEntryId provided");
        FailFast.requireNonNull(messagePayload, "No messagePayload provided");
        FailFast.requireNonNull(messagePayloadType, "No messagePayloadType provided");
        try {
            Object payload = this.jsonSerializer.deserialize(messagePayload, messagePayloadType);
            return payload;
        } catch (Throwable failure) {
            // Critical errors are propagated as-is; everything else is wrapped with queue and entry details
            Exceptions.rethrowIfCriticalError(failure);
            String description = MessageFormatter.msg("Failed to deserialize message payload of type {}", messagePayloadType);
            throw new DurableQueueDeserializationException(description, failure, queueName, queueEntryId);
        }
    }

    /**
     * Deserializes stored JSON meta-data into a {@link MessageMetaData} instance using the JSON serializer.
     *
     * @param queueName    the queue the message belongs to (used for error reporting); must not be null
     * @param queueEntryId the id of the message (used for error reporting); must not be null
     * @param metaData     the serialized meta-data; must not be null
     * @return the deserialized meta-data
     * @throws DurableQueueDeserializationException if deserialization fails with anything that
     *                                              {@code Exceptions.rethrowIfCriticalError} does not rethrow
     */
    private MessageMetaData deserializeMessageMetadata(QueueName queueName, QueueEntryId queueEntryId, String metaData) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)queueEntryId, (String)"No queueEntryId provided");
        // The message previously said "No messagePayload provided" (copy/paste from the payload variant)
        FailFast.requireNonNull((Object)metaData, (String)"No metaData provided");
        try {
            return (MessageMetaData)this.jsonSerializer.deserialize(metaData, MessageMetaData.class);
        }
        catch (Throwable e) {
            // Critical errors are propagated as-is; everything else is wrapped with queue and entry details
            Exceptions.rethrowIfCriticalError((Throwable)e);
            throw new DurableQueueDeserializationException(MessageFormatter.msg((String)"Failed to deserialize message meta-data", (Object[])new Object[0]), e, queueName, queueEntryId);
        }
    }

    /**
     * Creates the default Jackson {@link ObjectMapper} configuration.
     * <p>
     * Getter, is-getter and setter auto-detection and default view inclusion are disabled, while creator and
     * field auto-detection and transient-marker propagation are enabled. Dates are not written as timestamps,
     * unknown properties don't fail deserialization and empty beans don't fail serialization. The JDK8,
     * JavaTime, EssentialTypes and EssentialsImmutable modules are registered. Finally the visibility checker
     * is set to: no getters, no setters, any field, any creator.
     *
     * @return the configured mapper
     */
    public static ObjectMapper createDefaultObjectMapper() {
        JsonMapper.Builder builder = JsonMapper.builder();
        builder.disable(MapperFeature.AUTO_DETECT_GETTERS,
                        MapperFeature.AUTO_DETECT_IS_GETTERS,
                        MapperFeature.AUTO_DETECT_SETTERS,
                        MapperFeature.DEFAULT_VIEW_INCLUSION);
        builder.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,
                        SerializationFeature.FAIL_ON_EMPTY_BEANS);
        builder.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        builder.enable(MapperFeature.AUTO_DETECT_CREATORS,
                       MapperFeature.AUTO_DETECT_FIELDS,
                       MapperFeature.PROPAGATE_TRANSIENT_MARKER);
        builder.addModule(new Jdk8Module());
        builder.addModule(new JavaTimeModule());
        builder.addModule(new EssentialTypesJacksonModule());
        builder.addModule(new EssentialsImmutableJacksonModule());

        JsonMapper mapper = builder.build();
        mapper.setVisibility(mapper.getSerializationConfig()
                                   .getDefaultVisibilityChecker()
                                   .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                                   .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                                   .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                                   .withCreatorVisibility(JsonAutoDetect.Visibility.ANY));
        return mapper;
    }

    private class QueuedMessageRowMapper
    implements RowMapper<QueuedMessage> {
        public QueuedMessage map(ResultSet rs, StatementContext ctx) throws SQLException {
            QueueName queueName = QueueName.of((CharSequence)rs.getString("queue_name"));
            QueueEntryId queueEntryId = QueueEntryId.of((CharSequence)rs.getString("id"));
            Object messagePayload = PostgresqlDurableQueues.this.deserializeMessagePayload(queueName, queueEntryId, rs.getString("message_payload"), rs.getString("message_payload_type"));
            MessageMetaData messageMetaData = null;
            String metaDataColumnValue = rs.getString("meta_data");
            messageMetaData = metaDataColumnValue != null ? PostgresqlDurableQueues.this.deserializeMessageMetadata(queueName, queueEntryId, metaDataColumnValue) : new MessageMetaData();
            QueuedMessage.DeliveryMode deliveryMode = QueuedMessage.DeliveryMode.valueOf((String)rs.getString("delivery_mode"));
            Message message = null;
            switch (deliveryMode) {
                case NORMAL: {
                    message = new Message(messagePayload, messageMetaData);
                    break;
                }
                case IN_ORDER: {
                    message = new OrderedMessage(messagePayload, rs.getString("key"), rs.getLong("key_order"), messageMetaData);
                    break;
                }
                default: {
                    throw new IllegalStateException(MessageFormatter.msg((String)"Unsupported deliveryMode '{}'", (Object[])new Object[]{deliveryMode}));
                }
            }
            return new DefaultQueuedMessage(QueueEntryId.of((CharSequence)rs.getString("id")), queueName, message, rs.getObject("added_ts", OffsetDateTime.class), rs.getObject("next_delivery_ts", OffsetDateTime.class), rs.getObject("delivery_ts", OffsetDateTime.class), rs.getString("last_delivery_error"), rs.getInt("total_attempts"), rs.getInt("redelivery_attempts"), rs.getBoolean("is_dead_letter_message"), rs.getBoolean("is_being_delivered"));
        }
    }

    /**
     * {@link DurableQueuesInterceptor} that runs every intercepted {@link DurableQueues} operation inside
     * {@code unitOfWorkFactory.withUnitOfWork(...)}: each {@code intercept} method simply proceeds with the
     * rest of the interceptor chain from within that call and returns the chain's result.
     */
    private static class SingleOperationTransactionDurableQueuesInterceptor
    implements DurableQueuesInterceptor {
        private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
        private DurableQueues durableQueues;

        /**
         * @param unitOfWorkFactory the factory used to wrap every intercepted operation; must not be null
         */
        public SingleOperationTransactionDurableQueuesInterceptor(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory) {
            // Fail at construction time (like setDurableQueues does) instead of with an NPE on the first intercepted operation
            this.unitOfWorkFactory = FailFast.requireNonNull(unitOfWorkFactory, "No unitOfWorkFactory provided");
        }

        public void setDurableQueues(DurableQueues durableQueues) {
            this.durableQueues = FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        }

        /**
         * Proceeds with the interceptor chain from within {@code unitOfWorkFactory.withUnitOfWork(...)}
         * and returns the chain's result.
         */
        private <OPERATION, RESULT> RESULT proceedInUnitOfWork(InterceptorChain<OPERATION, RESULT, DurableQueuesInterceptor> interceptorChain) {
            return this.unitOfWorkFactory.withUnitOfWork(() -> interceptorChain.proceed());
        }

        public Optional<QueuedMessage> intercept(GetDeadLetterMessage operation, InterceptorChain<GetDeadLetterMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public Optional<QueuedMessage> intercept(GetQueuedMessage operation, InterceptorChain<GetQueuedMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public QueueEntryId intercept(QueueMessage operation, InterceptorChain<QueueMessage, QueueEntryId, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public QueueEntryId intercept(QueueMessageAsDeadLetterMessage operation, InterceptorChain<QueueMessageAsDeadLetterMessage, QueueEntryId, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public List<QueueEntryId> intercept(QueueMessages operation, InterceptorChain<QueueMessages, List<QueueEntryId>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public Optional<QueuedMessage> intercept(RetryMessage operation, InterceptorChain<RetryMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public Optional<QueuedMessage> intercept(MarkAsDeadLetterMessage operation, InterceptorChain<MarkAsDeadLetterMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public Optional<QueuedMessage> intercept(ResurrectDeadLetterMessage operation, InterceptorChain<ResurrectDeadLetterMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        // For the primitive-returning variants a null chain result fails on unboxing, exactly as before
        public boolean intercept(AcknowledgeMessageAsHandled operation, InterceptorChain<AcknowledgeMessageAsHandled, Boolean, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public boolean intercept(DeleteMessage operation, InterceptorChain<DeleteMessage, Boolean, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public Optional<QueuedMessage> intercept(GetNextMessageReadyForDelivery operation, InterceptorChain<GetNextMessageReadyForDelivery, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public long intercept(GetTotalMessagesQueuedFor operation, InterceptorChain<GetTotalMessagesQueuedFor, Long, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public List<QueuedMessage> intercept(GetQueuedMessages operation, InterceptorChain<GetQueuedMessages, List<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public List<QueuedMessage> intercept(GetDeadLetterMessages operation, InterceptorChain<GetDeadLetterMessages, List<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }

        public int intercept(PurgeQueue operation, InterceptorChain<PurgeQueue, Integer, DurableQueuesInterceptor> interceptorChain) {
            return proceedInUnitOfWork(interceptorChain);
        }
    }

    protected static enum IncludeMessages {
        ALL,
        DEAD_LETTER_MESSAGES,
        QUEUED_MESSAGES;

    }

    /**
     * {@link TableChangeNotification} specialization carrying a subset of a queue table row.
     * Each field is bound, via {@link JsonProperty}, to the JSON property named after the corresponding
     * column ({@code id}, {@code queue_name}, {@code added_ts}, {@code next_delivery_ts}, {@code delivery_ts},
     * {@code is_dead_letter_message}, {@code is_being_delivered}).
     * <p>
     * The class declares no accessors of its own; the fields are populated through JSON field binding.
     */
    public static class QueueTableNotification
    extends TableChangeNotification {
        /** Value of the {@code id} property. */
        @JsonProperty("id")
        private String id;

        /** Value of the {@code queue_name} property. */
        @JsonProperty("queue_name")
        private String queueName;

        /** Value of the {@code added_ts} property. */
        @JsonProperty("added_ts")
        private OffsetDateTime addedTimestamp;

        /** Value of the {@code next_delivery_ts} property. */
        @JsonProperty("next_delivery_ts")
        private OffsetDateTime nextDeliveryTimestamp;

        /** Value of the {@code delivery_ts} property. */
        @JsonProperty("delivery_ts")
        private OffsetDateTime deliveryTimestamp;

        /** Value of the {@code is_dead_letter_message} property. */
        @JsonProperty("is_dead_letter_message")
        private boolean isDeadLetterMessage;

        /** Value of the {@code is_being_delivered} property. */
        @JsonProperty("is_being_delivered")
        private boolean isBeingDelivered;
    }
}

