/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.queue.postgresql;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DefaultQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueueConsumer;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueEntryIdArgumentFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueEntryIdColumnMapper;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueNameArgumentFactory;
import dk.cloudcreate.essentials.components.queue.postgresql.jdbi.QueueNameColumnMapper;
import dk.cloudcreate.essentials.jackson.immutable.EssentialsImmutableJacksonModule;
import dk.cloudcreate.essentials.jackson.types.EssentialTypesJacksonModule;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.reflection.Classes;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jdbi.v3.core.argument.ArgumentFactory;
import org.jdbi.v3.core.mapper.ColumnMapper;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresqlDurableQueues
implements DurableQueues {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueues.class);
    public static final String DEFAULT_DURABLE_QUEUES_TABLE_NAME = "durable_queues";
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    private final ObjectMapper messagePayloadObjectMapper;
    private final String sharedQueueTableName;
    private final ConcurrentMap<QueueName, PostgresqlDurableQueueConsumer> durableQueueConsumers = new ConcurrentHashMap<QueueName, PostgresqlDurableQueueConsumer>();
    private final QueuedMessageRowMapper queuedMessageMapper;
    private volatile boolean started;

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory) {
        this(unitOfWorkFactory, PostgresqlDurableQueues.createObjectMapper(), DEFAULT_DURABLE_QUEUES_TABLE_NAME);
    }

    public PostgresqlDurableQueues(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, ObjectMapper messagePayloadObjectMapper, String sharedQueueTableName) {
        this.unitOfWorkFactory = (HandleAwareUnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"No unitOfWorkFactory instance provided");
        this.messagePayloadObjectMapper = (ObjectMapper)FailFast.requireNonNull((Object)messagePayloadObjectMapper, (String)"No messagePayloadObjectMapper");
        this.sharedQueueTableName = ((String)FailFast.requireNonNull((Object)sharedQueueTableName, (String)"No sharedQueueTableName provided")).toLowerCase(Locale.ROOT);
        this.queuedMessageMapper = new QueuedMessageRowMapper();
        this.initializeQueueTables();
    }

    private void initializeQueueTables() {
        this.unitOfWorkFactory.usingUnitOfWork(handleAwareUnitOfWork -> {
            handleAwareUnitOfWork.handle().getJdbi().registerArgument((ArgumentFactory)new QueueNameArgumentFactory());
            handleAwareUnitOfWork.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueNameColumnMapper());
            handleAwareUnitOfWork.handle().getJdbi().registerArgument((ArgumentFactory)new QueueEntryIdArgumentFactory());
            handleAwareUnitOfWork.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueEntryIdColumnMapper());
            int numberOfUpdates = handleAwareUnitOfWork.handle().execute(MessageFormatter.bind((String)"CREATE TABLE IF NOT EXISTS {:tableName} (\n  id                     TEXT PRIMARY KEY,\n  queue_name             TEXT NOT NULL,\n  message_payload        JSONB NOT NULL,\n  message_payload_type   TEXT NOT NULL,\n  added_ts               TIMESTAMPTZ NOT NULL,\n  next_delivery_ts       TIMESTAMPTZ,\n  total_attempts         INTEGER DEFAULT 0,\n  redelivery_attempts    INTEGER DEFAULT 0,\n  last_delivery_error    TEXT DEFAULT NULL,\n  is_dead_letter_message BOOLEAN NOT NULL DEFAULT FALSE\n)", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
            log.info("Durable Queues table '{}' {}", (Object)this.sharedQueueTableName, (Object)(numberOfUpdates == 1 ? "created" : "already existed"));
            String indexName = this.sharedQueueTableName + "queue_name__next_delivery__id__index";
            numberOfUpdates = handleAwareUnitOfWork.handle().execute(MessageFormatter.bind((String)"CREATE INDEX IF NOT EXISTS {:indexName} ON {:tableName} (\n    queue_name, next_delivery_ts, id DESC\n)", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"indexName", (Object)indexName), MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}), new Object[0]);
            log.info("Durable Queues index '{}' {}", (Object)indexName, (Object)(numberOfUpdates == 1 ? "created" : "already existed"));
        });
    }

    public void start() {
        if (!this.started) {
            this.started = true;
            this.durableQueueConsumers.values().forEach(DurableQueueConsumer.DefaultDurableQueueConsumer::start);
        }
    }

    public void stop() {
        if (this.started) {
            this.durableQueueConsumers.values().forEach(DurableQueueConsumer.DefaultDurableQueueConsumer::stop);
            this.started = false;
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    public DurableQueueConsumer consumeFromQueue(QueueName queueName, RedeliveryPolicy redeliveryPolicy, int parallelConsumers, QueuedMessageHandler queueMessageHandler) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        if (this.durableQueueConsumers.containsKey(queueName)) {
            throw new DurableQueueException("There is already an DurableConsumer for this queue", queueName);
        }
        return (DurableQueueConsumer)this.durableQueueConsumers.computeIfAbsent(queueName, _queueName -> {
            PostgresqlDurableQueueConsumer consumer = new PostgresqlDurableQueueConsumer(queueName, queueMessageHandler, redeliveryPolicy, parallelConsumers, this.unitOfWorkFactory, this, this::removeQueueConsumer);
            if (this.started) {
                consumer.start();
            }
            return consumer;
        });
    }

    void removeQueueConsumer(DurableQueueConsumer durableQueueConsumer) {
        FailFast.requireFalse((boolean)durableQueueConsumer.isStarted(), (String)MessageFormatter.msg((String)"Cannot remove DurableQueueConsumer '{}' since it's started!", (Object[])new Object[]{durableQueueConsumer.queueName()}));
        this.durableQueueConsumers.remove(durableQueueConsumer.queueName());
    }

    public QueueEntryId queueMessage(QueueName queueName, Object payload, Optional<Exception> causeOfEnqueuing, Optional<Duration> deliveryDelay) {
        return this.queueMessage(queueName, payload, false, causeOfEnqueuing, deliveryDelay);
    }

    public QueueEntryId queueMessageAsDeadLetterMessage(QueueName queueName, Object payload, Exception causeOfError) {
        return this.queueMessage(queueName, payload, true, Optional.of(causeOfError), Optional.empty());
    }

    protected QueueEntryId queueMessage(QueueName queueName, Object payload, boolean isDeadLetterMessage, Optional<Exception> causeOfEnqueuing, Optional<Duration> deliveryDelay) {
        String jsonPayload;
        FailFast.requireNonNull((Object)queueName, (String)"You must provide a queueName");
        FailFast.requireNonNull((Object)payload, (String)"You must provide a payload");
        FailFast.requireNonNull(causeOfEnqueuing, (String)"You must provide a causeOfEnqueuing option");
        FailFast.requireNonNull(deliveryDelay, (String)"You must provide a deliveryDelay option");
        try {
            jsonPayload = this.messagePayloadObjectMapper.writeValueAsString(payload);
        }
        catch (JsonProcessingException e) {
            throw new DurableQueueException((Throwable)e, queueName);
        }
        OffsetDateTime addedTimestamp = OffsetDateTime.now(Clock.systemUTC());
        OffsetDateTime nextDeliveryTimestamp = isDeadLetterMessage ? null : addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
        QueueEntryId queueEntryId = QueueEntryId.random();
        Update update = (Update)((Update)((Update)((Update)((Update)((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"INSERT INTO {:tableName} (\n       id,\n       queue_name,\n       message_payload,\n       message_payload_type,\n       added_ts,\n       next_delivery_ts,\n       last_delivery_error,\n       is_dead_letter_message\n   ) VALUES (\n       :id,\n       :queueName,\n       :message_payload::jsonb,\n       :message_payload_type,\n       :addedTimestamp,\n       :nextDeliveryTimestamp,\n       :lastDeliveryError,\n       :isDeadLetterMessage\n   )", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", payload.getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", isDeadLetterMessage);
        if (causeOfEnqueuing.isPresent()) {
            update.bind("lastDeliveryError", causeOfEnqueuing.map(Exceptions::getStackTrace).get());
        } else {
            update.bindNull("lastDeliveryError", 12);
        }
        int numberOfRowsUpdated = update.execute();
        if (numberOfRowsUpdated == 0) {
            throw new DurableQueueException("Failed to insert message", queueName);
        }
        log.debug("[{}] Queued {}Message with entry-id {} and nextDeliveryTimestamp {}", new Object[]{queueName, isDeadLetterMessage ? "Dead Letter " : "", queueEntryId, nextDeliveryTimestamp});
        return queueEntryId;
    }

    public List<QueueEntryId> queueMessages(QueueName queueName, List<?> payloads, Optional<Duration> deliveryDelay) {
        FailFast.requireNonNull((Object)queueName, (String)"You must provide a queueName");
        FailFast.requireNonNull(payloads, (String)"You must provide a payloads list");
        FailFast.requireNonNull(deliveryDelay, (String)"You must provide a deliveryDelay option");
        OffsetDateTime addedTimestamp = OffsetDateTime.now(Clock.systemUTC());
        OffsetDateTime nextDeliveryTimestamp = addedTimestamp.plus(deliveryDelay.orElse(Duration.ZERO));
        PreparedBatch batch = ((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().prepareBatch(MessageFormatter.bind((String)"INSERT INTO {:tableName} (\n       id,\n       queue_name,\n       message_payload,\n       message_payload_type,\n       added_ts,\n       next_delivery_ts,\n       last_delivery_error,\n       is_dead_letter_message\n   ) VALUES (\n       :id,\n       :queueName,\n       :message_payload::jsonb,\n       :message_payload_type,\n       :addedTimestamp,\n       :nextDeliveryTimestamp,\n       :lastDeliveryError,\n       :isDeadLetterMessage\n   )", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)}));
        List<QueueEntryId> queueEntryIds = payloads.stream().map(payload -> {
            String jsonPayload;
            try {
                jsonPayload = this.messagePayloadObjectMapper.writeValueAsString(payload);
            }
            catch (JsonProcessingException e) {
                throw new DurableQueueException((Throwable)e, queueName);
            }
            QueueEntryId queueEntryId = QueueEntryId.random();
            ((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)((PreparedBatch)batch.bind("id", (Object)queueEntryId)).bind("queueName", (Object)queueName)).bind("message_payload", jsonPayload)).bind("message_payload_type", payload.getClass().getName())).bind("addedTimestamp", (Object)addedTimestamp)).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("isDeadLetterMessage", false)).bindNull("lastDeliveryError", 12);
            batch.add();
            return queueEntryId;
        }).collect(Collectors.toList());
        int numberOfRowsUpdated = Arrays.stream(batch.execute()).reduce(Integer::sum).orElse(0);
        if (numberOfRowsUpdated != payloads.size()) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Attempted to queue {} messages but only inserted {} messages", (Object[])new Object[]{payloads.size(), numberOfRowsUpdated}), queueName);
        }
        log.debug("[{}] Queued {} Messages with nextDeliveryTimestamp {} and entry-id's: {}", new Object[]{queueName, payloads.size(), nextDeliveryTimestamp, queueEntryIds});
        return queueEntryIds;
    }

    public boolean retryMessage(QueueEntryId queueEntryId, Exception causeForRetry, Duration deliveryDelay) {
        FailFast.requireNonNull((Object)causeForRetry, (String)"You must provide a causeForRetry");
        FailFast.requireNonNull((Object)deliveryDelay, (String)"You must provide a deliveryDelay");
        OffsetDateTime nextDeliveryTimestamp = OffsetDateTime.now(Clock.systemUTC()).plus(deliveryDelay);
        int rowsUpdated = ((Update)((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = :nextDeliveryTimestamp,\n     last_delivery_error = :lastDeliveryError,\n     redelivery_attempts = redelivery_attempts + 1\n WHERE id = :id", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("lastDeliveryError", Exceptions.getStackTrace((Throwable)causeForRetry))).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"You must provide a queueEntryId"))).execute();
        if (rowsUpdated == 1) {
            log.debug("Marked Message with id '{}' for Retry at {}", (Object)queueEntryId, (Object)nextDeliveryTimestamp);
            return true;
        }
        log.error("Failed to Mark Message with id '{}' for Retry", (Object)queueEntryId);
        return false;
    }

    public boolean markAsDeadLetterMessage(QueueEntryId queueEntryId, Exception causeForBeingMarkedAsDeadLetter) {
        FailFast.requireNonNull((Object)causeForBeingMarkedAsDeadLetter, (String)"You must provide a causeForBeingMarkedAsDeadLetter");
        int rowsUpdated = ((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = NULL,\n     last_delivery_error = :lastDeliveryError,\n     redelivery_attempts = redelivery_attempts + 1,\n     is_dead_letter_message = TRUE\n WHERE id = :id AND is_dead_letter_message = FALSE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("lastDeliveryError", Exceptions.getStackTrace((Throwable)causeForBeingMarkedAsDeadLetter))).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"You must provide a queueEntryId"))).execute();
        if (rowsUpdated == 1) {
            log.debug("Marked message with id '{}' as Dead Letter Message", (Object)queueEntryId);
            return true;
        }
        log.error("Failed to Mark as Message message with id '{}' as Dead Letter Message", (Object)queueEntryId);
        return false;
    }

    public boolean resurrectDeadLetterMessage(QueueEntryId queueEntryId, Duration deliveryDelay) {
        FailFast.requireNonNull((Object)deliveryDelay, (String)"You must provide a deliveryDelay");
        OffsetDateTime nextDeliveryTimestamp = OffsetDateTime.now(Clock.systemUTC()).plus(deliveryDelay);
        int rowsUpdated = ((Update)((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"UPDATE {:tableName} SET\n     next_delivery_ts = :nextDeliveryTimestamp,\n     is_dead_letter_message = FALSE\n WHERE id = :id AND is_dead_letter_message = TRUE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("nextDeliveryTimestamp", (Object)nextDeliveryTimestamp)).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"You must provide a queueEntryId"))).execute();
        if (rowsUpdated == 1) {
            log.debug("Resurrected Dead Letter Message with id '{}'", (Object)queueEntryId);
            return true;
        }
        log.error("Failed to resurrect Dead Letter Message with id '{}'", (Object)queueEntryId);
        return false;
    }

    public boolean deleteMessage(QueueEntryId queueEntryId) {
        int rowsUpdated = ((Update)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createUpdate(MessageFormatter.bind((String)"DELETE FROM {:tableName} WHERE id = :id", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"You must provide a queueEntryId"))).execute();
        if (rowsUpdated == 1) {
            log.debug("Deleted Message with id '{}'", (Object)queueEntryId);
            return true;
        }
        log.error("Failed to Delete Message with id '{}'", (Object)queueEntryId);
        return false;
    }

    public Optional<QueuedMessage> getNextMessageReadyForDelivery(QueueName queueName) {
        FailFast.requireNonNull((Object)queueName, (String)"You must specify a queueName");
        OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
        return ((Query)((Query)((HandleAwareUnitOfWork)this.unitOfWorkFactory.getRequiredUnitOfWork()).handle().createQuery(MessageFormatter.bind((String)"WITH queued_message_ready_for_delivery AS (\n    SELECT id FROM {:tableName} \n    WHERE\n        queue_name = :queueName AND\n        next_delivery_ts <= :now AND\n        is_dead_letter_message = FALSE\n    ORDER BY next_delivery_ts ASC\n    LIMIT 1\n    FOR UPDATE SKIP LOCKED\n )\n UPDATE {:tableName} queued_message SET\n    total_attempts = total_attempts + 1,\n    next_delivery_ts = NULL\n FROM queued_message_ready_for_delivery\n WHERE queued_message.id = queued_message_ready_for_delivery.id\n RETURNING\n     queued_message.id,\n     queued_message.queue_name,\n     queued_message.message_payload,\n     queued_message.message_payload_type,\n     queued_message.added_ts,\n     queued_message.next_delivery_ts,\n     queued_message.last_delivery_error,\n     queued_message.total_attempts,\n     queued_message.redelivery_attempts,\n     queued_message.is_dead_letter_message", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", (Object)queueName)).bind("now", (Object)now)).map((RowMapper)this.queuedMessageMapper).findOne();
    }

    public boolean hasMessagesQueuedFor(QueueName queueName) {
        return this.getTotalMessagesQueuedFor(queueName) > 0L;
    }

    public long getTotalMessagesQueuedFor(QueueName queueName) {
        return (Long)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> (Long)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT count(*) FROM {:tableName} \n WHERE \n    queue_name = :queueName AND\n    is_dead_letter_message = FALSE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", FailFast.requireNonNull((Object)queueName, (String)"No QueueName provided"))).mapTo(Long.class).one());
    }

    public int purgeQueue(QueueName queueName) {
        return (Integer)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Update)handleAwareUnitOfWork.handle().createUpdate(MessageFormatter.bind((String)"DELETE FROM {:tableName} WHERE queue_name = :queueName", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("queueName", FailFast.requireNonNull((Object)queueName, (String)"No QueueName provided"))).execute());
    }

    public List<QueuedMessage> getQueuedMessages(QueueName queueName, DurableQueues.QueueingSortOrder queueingSortOrder, long startIndex, long pageSize) {
        return this.queryQueuedMessages(queueName, queueingSortOrder, IncludeMessages.QUEUED_MESSAGES, startIndex, pageSize);
    }

    public List<QueuedMessage> getDeadLetterMessages(QueueName queueName, DurableQueues.QueueingSortOrder queueingSortOrder, long startIndex, long pageSize) {
        return this.queryQueuedMessages(queueName, queueingSortOrder, IncludeMessages.DEAD_LETTER_MESSAGES, startIndex, pageSize);
    }

    protected List<QueuedMessage> queryQueuedMessages(QueueName queueName, DurableQueues.QueueingSortOrder queueingSortOrder, IncludeMessages includeMessages, long startIndex, long pageSize) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)queueingSortOrder, (String)"No queueingOrder provided");
        FailFast.requireNonNull((Object)((Object)includeMessages), (String)"No includeMessages provided");
        Supplier<String> resolveIncludeMessagesSql = () -> {
            switch (includeMessages) {
                case ALL: {
                    return "";
                }
                case DEAD_LETTER_MESSAGES: {
                    return "AND is_dead_letter_message = TRUE\n";
                }
                case QUEUED_MESSAGES: {
                    return "AND is_dead_letter_message = FALSE\n";
                }
            }
            throw new IllegalArgumentException("Unsupported IncludeMessages value: " + includeMessages);
        };
        return (List)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT * FROM {:tableName} \n WHERE queue_name = :queueName\n{:includeMessages} LIMIT :pageSize \n OFFSET :offset", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName), MessageFormatter.NamedArgumentBinding.arg((String)"includeMessages", resolveIncludeMessagesSql.get())})).bind("queueName", FailFast.requireNonNull((Object)queueName, (String)"No QueueName provided"))).bind("offset", startIndex)).bind("pageSize", pageSize)).map((RowMapper)this.queuedMessageMapper).list());
    }

    public Optional<QueuedMessage> getDeadLetterMessage(QueueEntryId queueEntryId) {
        return this.getQueuedMessage(queueEntryId, true);
    }

    public Optional<QueuedMessage> getQueuedMessage(QueueEntryId queueEntryId) {
        return this.getQueuedMessage(queueEntryId, false);
    }

    public TransactionalMode getTransactionalMode() {
        return TransactionalMode.FullyTransactional;
    }

    protected Optional<QueuedMessage> getQueuedMessage(QueueEntryId queueEntryId, boolean isDeadLetterMessage) {
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> ((Query)((Query)handleAwareUnitOfWork.handle().createQuery(MessageFormatter.bind((String)"SELECT * FROM {:tableName} WHERE \n id = :id AND\n is_dead_letter_message = :isDeadLetterMessage", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.sharedQueueTableName)})).bind("id", FailFast.requireNonNull((Object)queueEntryId, (String)"No queueEntryId provided"))).bind("isDeadLetterMessage", isDeadLetterMessage)).map((RowMapper)this.queuedMessageMapper).findOne());
    }

    private Object deserializedMessagePayload(QueueName queueName, String messagePayload, String messagePayloadType) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)messagePayload, (String)"No messagePayload provided");
        FailFast.requireNonNull((Object)messagePayloadType, (String)"No messagePayloadType provided");
        try {
            return this.messagePayloadObjectMapper.readValue(messagePayload, Classes.forName((String)messagePayloadType));
        }
        catch (JsonProcessingException e) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Failed to deserialize message payload of type {}", (Object[])new Object[]{messagePayloadType}), (Throwable)e, queueName);
        }
    }

    private static ObjectMapper createObjectMapper() {
        JsonMapper objectMapper = (JsonMapper)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)((JsonMapper.Builder)JsonMapper.builder().disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_GETTERS})).disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_IS_GETTERS})).disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_SETTERS})).disable(new MapperFeature[]{MapperFeature.DEFAULT_VIEW_INCLUSION})).disable(new SerializationFeature[]{SerializationFeature.WRITE_DATES_AS_TIMESTAMPS})).disable(new DeserializationFeature[]{DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES})).disable(new SerializationFeature[]{SerializationFeature.FAIL_ON_EMPTY_BEANS})).enable(new MapperFeature[]{MapperFeature.AUTO_DETECT_CREATORS})).enable(new MapperFeature[]{MapperFeature.AUTO_DETECT_FIELDS})).enable(new MapperFeature[]{MapperFeature.PROPAGATE_TRANSIENT_MARKER})).addModule((Module)new Jdk8Module())).addModule((Module)new JavaTimeModule())).addModule((Module)new EssentialTypesJacksonModule())).addModule((Module)new EssentialsImmutableJacksonModule())).build();
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker().withGetterVisibility(JsonAutoDetect.Visibility.NONE).withSetterVisibility(JsonAutoDetect.Visibility.NONE).withFieldVisibility(JsonAutoDetect.Visibility.ANY).withCreatorVisibility(JsonAutoDetect.Visibility.ANY));
        return objectMapper;
    }

    private class QueuedMessageRowMapper
    implements RowMapper<QueuedMessage> {
        public QueuedMessage map(ResultSet rs, StatementContext ctx) throws SQLException {
            QueueName queueName = QueueName.of((CharSequence)rs.getString("queue_name"));
            return new DefaultQueuedMessage(QueueEntryId.of((CharSequence)rs.getString("id")), queueName, PostgresqlDurableQueues.this.deserializedMessagePayload(queueName, rs.getString("message_payload"), rs.getString("message_payload_type")), rs.getObject("added_ts", OffsetDateTime.class), rs.getObject("next_delivery_ts", OffsetDateTime.class), rs.getString("last_delivery_error"), rs.getInt("total_attempts"), rs.getInt("redelivery_attempts"), rs.getBoolean("is_dead_letter_message"));
        }
    }

    protected static enum IncludeMessages {
        ALL,
        DEAD_LETTER_MESSAGES,
        QUEUED_MESSAGES;

    }
}

