/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.queue.postgresql;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DefaultQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.AcknowledgeMessageAsHandled;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.DeleteMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetQueuedMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetTotalMessagesQueuedFor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.MarkAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.PurgeQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessageAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ResurrectDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.RetryMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.StopConsumingFromQueue;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueueConsumer;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueEntryIdArgumentFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueEntryIdColumnMapper;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueNameArgumentFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueNameColumnMapper;
import dk.cloudcreate.essentials.jackson.immutable.EssentialsImmutableJacksonModule;
import dk.cloudcreate.essentials.jackson.types.EssentialTypesJacksonModule;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.interceptor.InterceptorChain;
import dk.cloudcreate.essentials.shared.reflection.Classes;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jdbi.v3.core.argument.ArgumentFactory;
import org.jdbi.v3.core.mapper.ColumnMapper;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresqlDurableQueues
implements DurableQueues {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueues.class);
    public static final String DEFAULT_DURABLE_QUEUES_TABLE_NAME = "durable_queues";
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    private final ObjectMapper messagePayloadObjectMapper;
    private final String sharedQueueTableName;
    private final ConcurrentMap<QueueName, PostgresqlDurableQueueConsumer> durableQueueConsumers = new ConcurrentHashMap<QueueName, PostgresqlDurableQueueConsumer>();
    private final QueuedMessageRowMapper queuedMessageMapper;
    private final List<DurableQueuesInterceptor> interceptors = new ArrayList<DurableQueuesInterceptor>();
    private volatile boolean started;

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory) {
        this(unitOfWorkFactory, PostgresqlDurableQueues.createObjectMapper(), DEFAULT_DURABLE_QUEUES_TABLE_NAME);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, ObjectMapper messagePayloadObjectMapper) {
        this(unitOfWorkFactory, messagePayloadObjectMapper, DEFAULT_DURABLE_QUEUES_TABLE_NAME);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, ObjectMapper messagePayloadObjectMapper, String sharedQueueTableName) {
        this.unitOfWorkFactory = (HandleAwareUnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"No unitOfWorkFactory instance provided");
        this.messagePayloadObjectMapper = (ObjectMapper)FailFast.requireNonNull((Object)messagePayloadObjectMapper, (String)"No messagePayloadObjectMapper");
        this.sharedQueueTableName = ((String)FailFast.requireNonNull((Object)sharedQueueTableName, (String)"No sharedQueueTableName provided")).toLowerCase(Locale.ROOT);
        this.queuedMessageMapper = new QueuedMessageRowMapper();
        this.initializeQueueTables();
    }

    private void initializeQueueTables() {
        this.unitOfWorkFactory.usingUnitOfWork(handleAwareUnitOfWork -> {
            handleAwareUnitOfWork.handle().getJdbi().registerArgument((ArgumentFactory)new QueueNameArgumentFactory());
            handleAwareUnitOfWork.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueNameColumnMapper());
            handleAwareUnitOfWork.handle().getJdbi().registerArgument((ArgumentFactory)new QueueEntryIdArgumentFactory());
            handleAwareUnitOfWork.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueEntryIdColumnMapper());
            handleAwareUnitOfWork.handle().execute(MessageFormatter.bind((String)"CREATE TABLE IF NOT EXISTS {:tableName} (\n  id                     TEXT PRIMARY KEY,\n  queue_name             TEXT NOT NULL,\n  message_payload        JSONB NOT NULL,\n  message_payload_type   TEXT NOT NULL,\n  added_ts               TIMESTAMPTZ NOT NULL,\n  next_delivery_ts       TIMESTAMPTZ,\n  total_attempts         INTEGER DEFAULT 0,\n  redelivery_attempts    INTEGER DEFAULT 0,\n  last_delivery_error    TEXT DEFAULT NULL,\n  is_dead_letter_message BOOLEAN NOT NULL DEFAULT FALSE\n,  meta_data JSONB DEFAULT NULL\n)", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
            log.info("Ensured Durable Queues table '{}' exists", (Object)this.sharedQueueTableName);
            handleAwareUnitOfWork.handle().execute(MessageFormatter.bind((String)"ALTER TABLE {:tableName} ADD COLUMN IF NOT EXISTS meta_data JSONB DEFAULT NULL", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
            log.info("Ensured 'meta_data' column exists in Durable Queues table '{}'", (Object)this.sharedQueueTableName);
            String indexName = this.sharedQueueTableName + "queue_name__next_delivery__id__index";
            handleAwareUnitOfWork.handle().execute(MessageFormatter.bind((String)"CREATE INDEX IF NOT EXISTS {:indexName} ON {:tableName} (\n    queue_name, next_delivery_ts, id DESC\n)", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"indexName", (Object)indexName), MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
            log.info("Ensured Durable Queues index '{}' exists", (Object)indexName);
        });
    }

    public List<DurableQueuesInterceptor> getInterceptors() {
        return Collections.unmodifiableList(this.interceptors);
    }

    public void start() {
        if (!this.started) {
            this.started = true;
            log.info("Starting");
            this.durableQueueConsumers.values().forEach(DurableQueueConsumer.DefaultDurableQueueConsumer::start);
            log.info("Started");
        }
    }

    public void stop() {
        if (this.started) {
            log.info("Stopping");
            this.durableQueueConsumers.values().forEach(DurableQueueConsumer.DefaultDurableQueueConsumer::stop);
            this.started = false;
            log.info("Stopped");
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    public TransactionalMode getTransactionalMode() {
        return TransactionalMode.FullyTransactional;
    }

    public Optional<UnitOfWorkFactory<? extends UnitOfWork>> getUnitOfWorkFactory() {
        return Optional.ofNullable(this.unitOfWorkFactory);
    }

    public DurableQueueConsumer consumeFromQueue(ConsumeFromQueue operation) {
        FailFast.requireNonNull((Object)operation, (String)"No operation provided");
        if (this.durableQueueConsumers.containsKey(operation.queueName)) {
            throw new DurableQueueException("There is already an DurableConsumer for this queue", operation.queueName);
        }
        operation.validate();
        return (DurableQueueConsumer)this.durableQueueConsumers.computeIfAbsent(operation.queueName, _queueName -> {
            PostgresqlDurableQueueConsumer consumer = (PostgresqlDurableQueueConsumer)((Object)((Object)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> new PostgresqlDurableQueueConsumer(operation, this.unitOfWorkFactory, this, this::removeQueueConsumer)).proceed()));
            if (this.started) {
                consumer.start();
            }
            return consumer;
        });
    }

    void removeQueueConsumer(DurableQueueConsumer durableQueueConsumer) {
        FailFast.requireNonNull((Object)durableQueueConsumer, (String)"You must provide a durableQueueConsumer");
        FailFast.requireFalse((boolean)durableQueueConsumer.isStarted(), (String)MessageFormatter.msg((String)"Cannot remove DurableQueueConsumer '{}' since it's started!", (Object[])new Object[]{durableQueueConsumer.queueName()}));
        StopConsumingFromQueue operation = new StopConsumingFromQueue(durableQueueConsumer);
        try {
            InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (DurableQueueConsumer)this.durableQueueConsumers.remove(durableQueueConsumer.queueName())).proceed();
        }
        catch (Exception e) {
            log.error(MessageFormatter.msg((String)"Failed to perform {}", (Object[])new Object[]{operation}), (Throwable)e);
        }
    }

    public QueueEntryId queueMessage(QueueMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessage instance");
        return (QueueEntryId)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queueMessage(operation.queueName, operation.getMessage(), false, operation.getCauseOfEnqueuing(), operation.getDeliveryDelay())).proceed();
    }

    public QueueEntryId queueMessageAsDeadLetterMessage(QueueMessageAsDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessageAsDeadLetterMessage instance");
        return (QueueEntryId)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queueMessage(operation.queueName, operation.getMessage(), true, Optional.of(operation.getCauseOfError()), Optional.empty())).proceed();
    }

    protected QueueEntryId queueMessage(QueueName queueName, Message message, boolean isDeadLetterMessage, Optional<Exception> causeOfEnqueuing, Optional<Duration> deliveryDelay) {
        String jsonPayload;
        FailFast.requireNonNull((Object)queueName, (String)"You must provide a queueName");
        FailFast.requireNonNull((Object)message, (String)"You must provide a message");
        FailFast.requireNonNull(causeOfEnqueuing, (String)"You must provide a causeOfEnqueuing option");
        FailFast.requireNonNull(deliveryDelay, (String)"You must provide a deliveryDelay option");
        QueueEntryId queueEntryId = QueueEntryId.random();
        Instant addedTimestamp = Instant.now();
        Instant nextDeliveryTimestamp = isDeadLetterMessage ? null : addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
        log.debug("[{}] Queuing {}Message with entry-id {} and nextDeliveryTimestamp {}", new Object[]{queueName, isDeadLetterMessage ? "Dead Letter " : "", queueEntryId, nextDeliveryTimestamp});
        try {
            jsonPayload = this.messagePayloadObjectMapper.writeValueAsString(message.getPayload());
        }
        catch (JsonProcessingException e) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Failed to serialize message payload of type", (Object[])new Object[]{message.getPayload().getClass().getName()}), (Throwable)e, queueName);
        }
        Update update = (Update)((Update)((Update)((Update)((Update)((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"INSERT INTO {:tableName} (\n       id,\n       queue_name,\n       message_payload,\n       message_payload_type,\n       added_ts,\n       next_delivery_ts,\n       last_delivery_error,\n       is_dead_letter_message,\n       meta_data\n   ) VALUES (\n       :id,\n       :queueName,\n       :message_payload::jsonb,\n       :message_payload_type,\n       :addedTimestamp,\n       :nextDeliveryTimestamp,\n       :lastDeliveryError,\n       :isDeadLetterMessage,\n       :metaData::jsonb\n   )", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", message.getPayload().getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", isDeadLetterMessage);
        try {
            String jsonMetaData = this.messagePayloadObjectMapper.writeValueAsString((Object)message.getMetaData());
            update.bind("metaData", jsonMetaData);
        }
        catch (JsonProcessingException e) {
            throw new DurableQueueException("Failed to serialize message meta-data", (Throwable)e, queueName);
        }
        if (causeOfEnqueuing.isPresent()) {
            update.bind("lastDeliveryError", causeOfEnqueuing.map(Exceptions::getStackTrace).get());
        } else {
            update.bindNull("lastDeliveryError", 12);
        }
        int numberOfRowsUpdated = update.execute();
        if (numberOfRowsUpdated == 0) {
            throw new DurableQueueException("Failed to insert message", queueName);
        }
        log.debug("[{}] Queued {}Message with entry-id {} and nextDeliveryTimestamp {}", new Object[]{queueName, isDeadLetterMessage ? "Dead Letter " : "", queueEntryId, nextDeliveryTimestamp});
        return queueEntryId;
    }

    public List<QueueEntryId> queueMessages(QueueMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a QueueMessages instance");
        operation.validate();
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            QueueName queueName = operation.getQueueName();
            Optional deliveryDelay = operation.getDeliveryDelay();
            List messages = operation.getMessages();
            OffsetDateTime addedTimestamp = OffsetDateTime.now(Clock.systemUTC());
            OffsetDateTime nextDeliveryTimestamp = addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
            PreparedBatch batch = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().prepareBatch(MessageFormatter.bind((String)"INSERT INTO {:tableName} (\n       id,\n       queue_name,\n       message_payload,\n       message_payload_type,\n       added_ts,\n       next_delivery_ts,\n       last_delivery_error,\n       is_dead_letter_message,\n       meta_data\n   ) VALUES (\n       :id,\n       :queueName,\n       :message_payload::jsonb,\n       :message_payload_type,\n       :addedTimestamp,\n       :nextDeliveryTimestamp,\n       :lastDeliveryError,\n       :isDeadLetterMessage,\n       :metaData::jsonb\n   )", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}));
            List queueEntryIds = messages.stream().map(message -> {
                String jsonPayload;
                try {
                    jsonPayload = this.messagePayloadObjectMapper.writeValueAsString(message.getPayload());
                }
                catch (JsonProcessingException e) {
                    throw new DurableQueueException(MessageFormatter.msg((String)"Failed to serialize message payload of type", (Object[])new Object[]{message.getPayload().getClass().getName()}), (Throwable)e, queueName);
                }
                QueueEntryId queueEntryId = QueueEntryId.random();
                ((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)batch.bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", message.getPayload().getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", false)).bindNull("lastDeliveryError", 12);
                try {
                    String jsonMetaData = this.messagePayloadObjectMapper.writeValueAsString((Object)message.getMetaData());
                    batch.bind("metaData", jsonMetaData);
                }
                catch (JsonProcessingException e) {
                    throw new DurableQueueException("Failed to serialize message meta-data", (Throwable)e, queueName);
                }
                batch.add();
                return queueEntryId;
            }).collect(Collectors.toList());
            int numberOfRowsUpdated = Arrays.stream(batch.execute()).reduce(Integer::sum).orElse(0);
            if (numberOfRowsUpdated != messages.size()) {
                throw new DurableQueueException(MessageFormatter.msg((String)"Attempted to queue {} messages but only inserted {} messages", (Object[])new Object[]{messages.size(), numberOfRowsUpdated}), queueName);
            }
            log.debug("[{}] Queued {} Messages with nextDeliveryTimestamp {} and entry-id's: {}", new Object[]{queueName, messages.size(), nextDeliveryTimestamp, queueEntryIds});
            return queueEntryIds;
        }).proceed();
    }

    public Optional<QueuedMessage> retryMessage(RetryMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a RetryMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            OffsetDateTime nextDeliveryTimestamp = OffsetDateTime.now(Clock.systemUTC()).plus(operation.getDeliveryDelay());
            Optional result = ((Query)((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = :nextDeliveryTimestamp,\n     last_delivery_error = :lastDeliveryError,\n     redelivery_attempts = redelivery_attempts + 1\n WHERE id = :id\n RETURNING *", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("lastDeliveryError", Exceptions.getStackTrace((Throwable)operation.getCauseForRetry()))).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                log.debug("Marked Message with id '{}' for Retry at {}. Message entry after update: {}", new Object[]{operation.queueEntryId, nextDeliveryTimestamp, result.get()});
                return result;
            }
            log.error("Failed to Mark Message with id '{}' for Retry", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    public Optional<QueuedMessage> markAsDeadLetterMessage(MarkAsDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a MarkAsDeadLetterMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            Optional result = ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = NULL,\n     last_delivery_error = :lastDeliveryError,\n     redelivery_attempts = redelivery_attempts + 1,\n     is_dead_letter_message = TRUE\n WHERE id = :id AND is_dead_letter_message = FALSE\n RETURNING *", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("lastDeliveryError", Exceptions.getStackTrace((Throwable)operation.getCauseForBeingMarkedAsDeadLetter()))).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                log.debug("Marked message with id '{}' as Dead Letter Message. Message entry after update: {}", (Object)operation.queueEntryId, result.get());
                return result;
            }
            log.error("Failed to Mark as Message message with id '{}' as Dead Letter Message", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    public Optional<QueuedMessage> resurrectDeadLetterMessage(ResurrectDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a ResurrectDeadLetterMessage instance");
        operation.validate();
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            OffsetDateTime nextDeliveryTimestamp = OffsetDateTime.now(Clock.systemUTC()).plus(operation.getDeliveryDelay());
            Optional result = ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = :nextDeliveryTimestamp,\n     is_dead_letter_message = FALSE\n WHERE id = :id AND is_dead_letter_message = TRUE\n RETURNING *", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("id", (Object)operation.queueEntryId)).map((RowMapper)this.queuedMessageMapper).findOne();
            if (result.isPresent()) {
                log.debug("Resurrected Dead Letter Message with id '{}'. Message entry after update: {}", (Object)operation.queueEntryId, result.get());
                return result;
            }
            log.error("Failed to resurrect Dead Letter Message with id '{}'", (Object)operation.queueEntryId);
            return Optional.empty();
        }).proceed();
    }

    public boolean acknowledgeMessageAsHandled(AcknowledgeMessageAsHandled operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a AcknowledgeMessageAsHandled instance");
        return (Boolean)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            log.debug("Acknowledging-Message-As-Handled regarding Message with id '{}'", (Object)operation.queueEntryId);
            return this.deleteMessage(new DeleteMessage(operation.queueEntryId));
        }).proceed();
    }

    public boolean deleteMessage(DeleteMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must provide a DeleteMessage instance");
        return (Boolean)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            int rowsUpdated = ((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"DELETE FROM {:tableName} WHERE id = :id", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", (Object)operation.queueEntryId)).execute();
            if (rowsUpdated == 1) {
                log.debug("Deleted Message with id '{}'", (Object)operation.queueEntryId);
                return true;
            }
            log.error("Failed to Delete Message with id '{}'", (Object)operation.queueEntryId);
            return false;
        }).proceed();
    }

    public Optional<QueuedMessage> getNextMessageReadyForDelivery(GetNextMessageReadyForDelivery operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetNextMessageReadyForDelivery instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> {
            OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
            return ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(MessageFormatter.bind((String)"WITH queued_message_ready_for_delivery AS (\n    SELECT id FROM {:tableName} \n    WHERE\n        queue_name = :queueName AND\n        next_delivery_ts <= :now AND\n        is_dead_letter_message = FALSE\n    ORDER BY next_delivery_ts ASC\n    LIMIT 1\n    FOR UPDATE SKIP LOCKED\n )\n UPDATE {:tableName} queued_message SET\n    total_attempts = total_attempts + 1,\n    next_delivery_ts = NULL\n FROM queued_message_ready_for_delivery\n WHERE queued_message.id = queued_message_ready_for_delivery.id\n RETURNING\n     queued_message.id,\n     queued_message.queue_name,\n     queued_message.message_payload,\n     queued_message.message_payload_type,\n     queued_message.added_ts,\n     queued_message.next_delivery_ts,\n     queued_message.last_delivery_error,\n     queued_message.total_attempts,\n     queued_message.redelivery_attempts,\n     queued_message.is_dead_letter_message,\n     queued_message.meta_data", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", (Object)operation.queueName)).bind("now", (Object)now)).map((RowMapper)this.queuedMessageMapper).findOne();
        }).proceed();
    }

    public boolean hasMessagesQueuedFor(QueueName queueName) {
        return this.getTotalMessagesQueuedFor(queueName) > 0L;
    }

    public long getTotalMessagesQueuedFor(GetTotalMessagesQueuedFor operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetTotalMessagesQueuedFor instance");
        return (Long)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (Long)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT count(*) FROM {:tableName} \n WHERE \n    queue_name = :queueName AND\n    is_dead_letter_message = FALSE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", (Object)operation.queueName)).mapTo(Long.class).one())).proceed();
    }

    public int purgeQueue(PurgeQueue operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a PurgeQueue instance");
        return (Integer)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> (Integer)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Update)handleAwareUnitOfWork.handle().createUpdate(MessageFormatter.bind((String)"DELETE FROM {:tableName} WHERE queue_name = :queueName", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", (Object)operation.queueName)).execute())).proceed();
    }

    public List<QueuedMessage> getQueuedMessages(GetQueuedMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetQueuedMessages instance");
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queryQueuedMessages(operation.queueName, operation.getQueueingSortOrder(), IncludeMessages.QUEUED_MESSAGES, operation.getStartIndex(), operation.getPageSize())).proceed();
    }

    public List<QueuedMessage> getDeadLetterMessages(GetDeadLetterMessages operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetDeadLetterMessages instance");
        return (List)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.queryQueuedMessages(operation.queueName, operation.getQueueingSortOrder(), IncludeMessages.DEAD_LETTER_MESSAGES, operation.getStartIndex(), operation.getPageSize())).proceed();
    }

    protected List<QueuedMessage> queryQueuedMessages(QueueName queueName, DurableQueues.QueueingSortOrder queueingSortOrder, IncludeMessages includeMessages, long startIndex, long pageSize) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)queueingSortOrder, (String)"No queueingOrder provided");
        FailFast.requireNonNull((Object)((Object)includeMessages), (String)"No includeMessages provided");
        Supplier<String> resolveIncludeMessagesSql = () -> {
            switch (includeMessages) {
                case ALL: {
                    return "";
                }
                case DEAD_LETTER_MESSAGES: {
                    return "AND is_dead_letter_message = TRUE\n";
                }
                case QUEUED_MESSAGES: {
                    return "AND is_dead_letter_message = FALSE\n";
                }
            }
            throw new IllegalArgumentException("Unsupported IncludeMessages value: " + includeMessages);
        };
        return (List)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT * FROM {:tableName} \n WHERE queue_name = :queueName\n{:includeMessages} LIMIT :pageSize \n OFFSET :offset", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName), MessageFormatter.NamedArgumentBinding.arg((String)"includeMessages", resolveIncludeMessagesSql.get())})).bind("queueName", FailFast.requireNonNull((Object)queueName, (String)"No QueueName provided"))).bind("offset", startIndex)).bind("pageSize", pageSize)).map((RowMapper)this.queuedMessageMapper).list());
    }

    public DurableQueues addInterceptor(DurableQueuesInterceptor interceptor) {
        FailFast.requireNonNull((Object)interceptor, (String)"No interceptor provided");
        log.debug("Adding interceptor: {}", (Object)interceptor);
        this.interceptors.add(interceptor);
        return this;
    }

    public DurableQueues removeInterceptor(DurableQueuesInterceptor interceptor) {
        FailFast.requireNonNull((Object)interceptor, (String)"No interceptor provided");
        log.debug("Removing interceptor: {}", (Object)interceptor);
        this.interceptors.remove(interceptor);
        return this;
    }

    public Optional<QueuedMessage> getDeadLetterMessage(GetDeadLetterMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetDeadLetterMessage instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.getQueuedMessage(operation.queueEntryId, true)).proceed();
    }

    public Optional<QueuedMessage> getQueuedMessage(GetQueuedMessage operation) {
        FailFast.requireNonNull((Object)operation, (String)"You must specify a GetQueuedMessage instance");
        return (Optional)InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, interceptorChain), () -> this.getQueuedMessage(operation.queueEntryId, false)).proceed();
    }

    private Optional<QueuedMessage> getQueuedMessage(QueueEntryId queueEntryId, boolean isDeadLetterMessage) {
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT * FROM {:tableName} WHERE \n id = :id AND\n is_dead_letter_message = :isDeadLetterMessage", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"No queueEntryId provided"))).bind("isDeadLetterMessage", isDeadLetterMessage)).map((RowMapper)this.queuedMessageMapper).findOne());
    }

    private Object deserializeMessagePayload(QueueName queueName, String messagePayload, String messagePayloadType) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)messagePayload, (String)"No messagePayload provided");
        FailFast.requireNonNull((Object)messagePayloadType, (String)"No messagePayloadType provided");
        try {
            return this.messagePayloadObjectMapper.readValue(messagePayload, Classes.forName((String)messagePayloadType));
        }
        catch (JsonProcessingException e) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Failed to deserialize message payload of type {}", (Object[])new Object[]{messagePayloadType}), (Throwable)e, queueName);
        }
    }

    private MessageMetaData deserializeMessageMetadata(QueueName queueName, String metaData) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)metaData, (String)"No messagePayload provided");
        try {
            return (MessageMetaData)this.messagePayloadObjectMapper.readValue(metaData, MessageMetaData.class);
        }
        catch (JsonProcessingException e) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Failed to deserialize message meta-data", (Object[])new Object[0]), (Throwable)e, queueName);
        }
    }

    private static ObjectMapper createObjectMapper() {
        JsonMapper objectMapper = (JsonMapper)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)JsonMapper.builder().disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_GETTERS})).disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_IS_GETTERS})).disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_SETTERS})).disable(new MapperFeature[]{MapperFeature.DEFAULT_VIEW_INCLUSION})).disable(new SerializationFeature[]{SerializationFeature.WRITE_DATES_AS_TIMESTAMPS})).disable(new DeserializationFeature[]{DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES})).disable(new SerializationFeature[]{SerializationFeature.FAIL_ON_EMPTY_BEANS})).enable(new MapperFeature[]{MapperFeature.AUTO_DETECT_CREATORS})).enable(new MapperFeature[]{MapperFeature.AUTO_DETECT_FIELDS})).enable(new MapperFeature[]{MapperFeature.PROPAGATE_TRANSIENT_MARKER})).addModule((Module)new Jdk8Module())).addModule((Module)new JavaTimeModule())).addModule((Module)new EssentialTypesJacksonModule())).addModule((Module)new EssentialsImmutableJacksonModule())).build();
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker().withGetterVisibility(JsonAutoDetect.Visibility.NONE).withSetterVisibility(JsonAutoDetect.Visibility.NONE).withFieldVisibility(JsonAutoDetect.Visibility.ANY).withCreatorVisibility(JsonAutoDetect.Visibility.ANY));
        return objectMapper;
    }

    private class QueuedMessageRowMapper
    implements RowMapper<QueuedMessage> {
        public QueuedMessage map(ResultSet rs, StatementContext ctx) throws SQLException {
            QueueName queueName = QueueName.of((CharSequence)rs.getString("queue_name"));
            Object messagePayload = PostgresqlDurableQueues.this.deserializeMessagePayload(queueName, rs.getString("message_payload"), rs.getString("message_payload_type"));
            MessageMetaData messageMetaData = null;
            String metaDataColumnValue = rs.getString("meta_data");
            messageMetaData = metaDataColumnValue != null ? PostgresqlDurableQueues.this.deserializeMessageMetadata(queueName, metaDataColumnValue) : new MessageMetaData();
            return new DefaultQueuedMessage(QueueEntryId.of((CharSequence)rs.getString("id")), queueName, new Message(messagePayload, messageMetaData), rs.getObject("added_ts", OffsetDateTime.class), rs.getObject("next_delivery_ts", OffsetDateTime.class), rs.getString("last_delivery_error"), rs.getInt("total_attempts"), rs.getInt("redelivery_attempts"), rs.getBoolean("is_dead_letter_message"));
        }
    }

    protected static enum IncludeMessages {
        ALL,
        DEAD_LETTER_MESSAGES,
        QUEUED_MESSAGES;

    }
}

