/*
 * Decompiled with CFR 0.152.
 */
package dk.trustworks.essentials.components.queue.postgresql;

import dk.trustworks.essentials.components.foundation.json.JSONSerializer;
import dk.trustworks.essentials.components.foundation.json.JacksonJSONSerializer;
import dk.trustworks.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.trustworks.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueueName;
import dk.trustworks.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.stats.DefaultQueuedStatisticsMessage;
import dk.trustworks.essentials.components.foundation.messaging.queue.stats.DurableQueuesStatistics;
import dk.trustworks.essentials.components.foundation.messaging.queue.stats.QueueStatistics;
import dk.trustworks.essentials.components.foundation.messaging.queue.stats.QueuedStatisticsMessage;
import dk.trustworks.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.trustworks.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWork;
import dk.trustworks.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.trustworks.essentials.components.foundation.ttl.TTLJob;
import dk.trustworks.essentials.components.queue.postgresql.DurableQueuesSerialization;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueEntryIdArgumentFactory;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueEntryIdColumnMapper;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueNameArgumentFactory;
import dk.trustworks.essentials.components.queue.postgresql.jdbi.QueueNameColumnMapper;
import dk.trustworks.essentials.shared.FailFast;
import dk.trustworks.essentials.shared.MessageFormatter;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.Locale;
import java.util.Optional;
import org.jdbi.v3.core.argument.ArgumentFactory;
import org.jdbi.v3.core.mapper.ColumnMapper;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.StatementContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TTLJob(name="durable_queues_statistics_ttl", enabledProperty="essentials.durable-queues.enable-queue-statistics-ttl", tableNameProperty="essentials.durable-queues.shared-queue-statistics-table-name", timestampColumn="deletion_ts", cronExpression="0 0 * * *", ttlDurationProperty="essentials.durable-queues.queue-statistics-ttl-duration")
public class PostgresqlDurableQueuesStatistics
implements DurableQueuesStatistics {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueuesStatistics.class);
    public static final String DEFAULT_DURABLE_QUEUES_TABLE_NAME = "durable_queues_statistics";
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    private final String statsQueueTableName;
    private final String durableQueueTableName;
    private final JSONSerializer jsonSerializer;
    private final QueueStatisticsMessageRowMapper queueMessageMapper;
    private final QueueStatisticsRowMapper queueStatisticsRowMapperMapper;

    public PostgresqlDurableQueuesStatistics(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, String durableQueueTableName) {
        this(unitOfWorkFactory, (JSONSerializer)new JacksonJSONSerializer(DurableQueuesSerialization.createDefaultObjectMapper()), durableQueueTableName, DEFAULT_DURABLE_QUEUES_TABLE_NAME);
    }

    public PostgresqlDurableQueuesStatistics(HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory, JSONSerializer jsonSerializer, String durableQueueTableName, String statsQueueTableName) {
        this.unitOfWorkFactory = (HandleAwareUnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"No unitOfWorkFactory instance provided");
        this.jsonSerializer = (JSONSerializer)FailFast.requireNonNull((Object)jsonSerializer, (String)"No jsonSerializer");
        this.durableQueueTableName = ((String)FailFast.requireNonNull((Object)durableQueueTableName, (String)"No durableQueueTableName provided")).toLowerCase(Locale.ROOT);
        PostgresqlUtil.checkIsValidTableOrColumnName((String)durableQueueTableName);
        this.statsQueueTableName = ((String)FailFast.requireNonNull((Object)statsQueueTableName, (String)"No statsQueueTableName provided")).toLowerCase(Locale.ROOT);
        PostgresqlUtil.checkIsValidTableOrColumnName((String)statsQueueTableName);
        this.queueMessageMapper = new QueueStatisticsMessageRowMapper();
        this.queueStatisticsRowMapperMapper = new QueueStatisticsRowMapper();
        this.initializeQueueTables();
    }

    private void initializeQueueTables() {
        PostgresqlUtil.checkIsValidTableOrColumnName((String)this.durableQueueTableName);
        PostgresqlUtil.checkIsValidTableOrColumnName((String)this.statsQueueTableName);
        this.unitOfWorkFactory.usingUnitOfWork(uow -> {
            uow.handle().getJdbi().registerArgument((ArgumentFactory)new QueueNameArgumentFactory());
            uow.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueNameColumnMapper());
            uow.handle().getJdbi().registerArgument((ArgumentFactory)new QueueEntryIdArgumentFactory());
            uow.handle().getJdbi().registerColumnMapper((ColumnMapper)new QueueEntryIdColumnMapper());
            uow.handle().execute(MessageFormatter.bind((String)"    CREATE TABLE IF NOT EXISTS {:tableName} (\n        id                     TEXT PRIMARY KEY,\n        queue_name             TEXT NOT NULL,\n        added_ts               TIMESTAMPTZ NOT NULL,\n        delivery_ts            TIMESTAMPTZ NOT NULL,\n        deletion_ts            TIMESTAMPTZ NOT NULL DEFAULT NOW(),\n        total_attempts         INTEGER NOT NULL,\n        redelivery_attempts    INTEGER NOT NULL,\n        delivery_mode          TEXT NOT NULL,\n        delivery_latency       INTERVAL NOT NULL,\n        delivery_error         BOOLEAN NOT NULL,\n        meta_data              JSONB DEFAULT NULL\n        );\n", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.statsQueueTableName)}), new Object[0]);
            log.info("Ensured Durable Queues table '{}' exists", (Object)this.statsQueueTableName);
            uow.handle().execute(MessageFormatter.bind((String)"CREATE INDEX IF NOT EXISTS idx_{:tableName}_queue_name ON {:tableName} (queue_name)", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.statsQueueTableName)}), new Object[0]);
            uow.handle().execute(MessageFormatter.bind((String)"CREATE INDEX IF NOT EXISTS idx_{:tableName}_stats ON {:tableName} (queue_name, added_ts)", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.statsQueueTableName)}), new Object[0]);
            uow.handle().execute(MessageFormatter.bind((String)"    CREATE OR REPLACE FUNCTION log_message_delivery_stats() RETURNS TRIGGER AS $$\n        BEGIN\n          BEGIN\n            INSERT INTO {:tableName} (\n                id,\n                queue_name,\n                added_ts,\n                delivery_ts,\n                deletion_ts,\n                total_attempts,\n                redelivery_attempts,\n                delivery_mode,\n                delivery_latency,\n                delivery_error,\n                meta_data\n            )\n            VALUES (\n                OLD.id,\n                OLD.queue_name,\n                OLD.added_ts,\n                OLD.delivery_ts,\n                NOW(),\n                OLD.total_attempts,\n                OLD.redelivery_attempts,\n                OLD.delivery_mode,\n                NOW() - OLD.added_ts,\n                OLD.last_delivery_error IS NOT NULL,\n                OLD.meta_data\n            );\n          EXCEPTION WHEN OTHERS THEN\n            RAISE NOTICE 'Trigger insert into queue message stats failed: %', SQLERRM;\n          END;\n          RETURN OLD;\n        END;\n        $$ LANGUAGE plpgsql;\n\n        DROP TRIGGER IF EXISTS trg_log_message_delivery_stats ON {:durableQueueTableName};\n\n        CREATE TRIGGER trg_log_message_delivery_stats\n        AFTER DELETE ON {:durableQueueTableName}\n        FOR EACH ROW\n        EXECUTE FUNCTION log_message_delivery_stats();\n", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.statsQueueTableName), MessageFormatter.NamedArgumentBinding.arg((String)"durableQueueTableName", (Object)this.durableQueueTableName)}), new Object[0]);
            log.info("Ensured Durable Queues Stats trigger exists");
        });
    }

    public Optional<QueueStatistics> getQueueStatistics(QueueName queueName) {
        FailFast.requireNonNull((Object)queueName, (String)"You must specify a QueueName");
        String sql = MessageFormatter.bind((String)"  WITH earliest AS (\n        SELECT MIN(added_ts) AS first_ts\n        FROM {:tableName}\n        WHERE queue_name = :queueName\n      )\n      SELECT\n        d.queue_name,\n        e.first_ts,\n        COUNT(*) AS total_messages_delivered,\n        AVG(\n          EXTRACT(EPOCH FROM (\n            deletion_ts -\n            CASE\n              WHEN redelivery_attempts > 0 OR (redelivery_attempts = 0 AND delivery_error = true) THEN delivery_ts\n              ELSE added_ts\n            END\n          )) * 1000\n        ) AS avg_delivery_latency_ms,\n        MIN(deletion_ts) AS first_delivery,\n        MAX(deletion_ts) AS last_delivery\n      FROM {:tableName} d\n      CROSS JOIN earliest e\n      WHERE d.queue_name = :queueName\n      GROUP BY d.queue_name, e.first_ts;\n", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.statsQueueTableName)});
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(uow -> {
            Query query = (Query)uow.handle().createQuery(sql).bind("queueName", (Object)queueName);
            return query.map((RowMapper)this.queueStatisticsRowMapperMapper).findOne();
        });
    }

    public Optional<QueuedStatisticsMessage> getQueueStatisticsMessage(QueueEntryId id) {
        FailFast.requireNonNull((Object)id, (String)"You must specify a QueueEntryId");
        String sql = MessageFormatter.bind((String)"SELECT *\n FROM {:tableName}\n WHERE id = :id\n", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)this.statsQueueTableName)});
        return (Optional)this.unitOfWorkFactory.withUnitOfWork(uow -> {
            Query query = (Query)uow.handle().createQuery(sql).bind("id", (Object)id);
            return query.map((RowMapper)this.queueMessageMapper).findOne();
        });
    }

    private MessageMetaData deserializeMessageMetadata(QueueName queueName, String metaData) {
        FailFast.requireNonNull((Object)queueName, (String)"No queueName provided");
        FailFast.requireNonNull((Object)metaData, (String)"No messagePayload provided");
        try {
            return (MessageMetaData)this.jsonSerializer.deserialize(metaData, MessageMetaData.class);
        }
        catch (Throwable e) {
            throw new DurableQueueException(MessageFormatter.msg((String)"Failed to deserialize message meta-data", (Object[])new Object[0]), e, queueName);
        }
    }

    private class QueueStatisticsMessageRowMapper
    implements RowMapper<QueuedStatisticsMessage> {
        public QueuedStatisticsMessage map(ResultSet rs, StatementContext ctx) throws SQLException {
            QueueName queueName = QueueName.of((CharSequence)rs.getString("queue_name"));
            MessageMetaData messageMetaData = null;
            String metaDataColumnValue = rs.getString("meta_data");
            messageMetaData = metaDataColumnValue != null ? PostgresqlDurableQueuesStatistics.this.deserializeMessageMetadata(queueName, metaDataColumnValue) : new MessageMetaData();
            QueuedMessage.DeliveryMode deliveryMode = QueuedMessage.DeliveryMode.valueOf((String)rs.getString("delivery_mode"));
            return new DefaultQueuedStatisticsMessage(QueueEntryId.of((CharSequence)rs.getString("id")), queueName, rs.getObject("added_ts", OffsetDateTime.class), rs.getObject("delivery_ts", OffsetDateTime.class), rs.getObject("deletion_ts", OffsetDateTime.class), deliveryMode, rs.getInt("total_attempts"), rs.getInt("redelivery_attempts"), rs.getInt("delivery_latency"), messageMetaData);
        }
    }

    private static class QueueStatisticsRowMapper
    implements RowMapper<QueueStatistics> {
        public QueueStatistics map(ResultSet rs, StatementContext ctx) throws SQLException {
            QueueName queueName = QueueName.of((CharSequence)rs.getString("queue_name"));
            return new QueueStatistics(queueName, rs.getObject("first_ts", OffsetDateTime.class), rs.getLong("total_messages_delivered"), rs.getInt("avg_delivery_latency_ms"), rs.getObject("first_delivery", OffsetDateTime.class), rs.getObject("last_delivery", OffsetDateTime.class));
        }
    }
}

