/*
 * Decompiled with CFR 0.152.
 */
package com.github.ddth.queue.impl;

import com.github.ddth.commons.utils.IdGenerator;
import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.UniversalQueueMessage;
import com.github.ddth.queue.impl.JdbcQueue;
import com.github.ddth.queue.utils.QueueException;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * PostgreSQL-backed queue of {@link UniversalQueueMessage}s that keeps queued and
 * in-flight ("ephemeral") messages in a single table.
 *
 * <p>A row whose {@code ephemeral_id} column is {@code 0} is waiting in the queue; a row
 * with a non-zero {@code ephemeral_id} has been taken and not yet finished. Because both
 * states live in the same table ({@link #getTableNameEphemeral()} returns
 * {@link #getTableName()}), taking a message is a single {@code UPDATE} that stamps a
 * freshly generated ephemeral id, and finishing a message is a single {@code DELETE}.
 *
 * <p>All SQL statements are built once in {@link #init()}; the {@code fifo} flag is only
 * consulted there, so it must be set before {@link #init()} is called.
 */
public class LessLockingUniversalPgSQLQueue extends JdbcQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(LessLockingUniversalPgSQLQueue.class);

    /** Source of the ephemeral ids stamped on rows by {@link #_takeWithRetries}. */
    private final IdGenerator IDGEN = IdGenerator.getInstance((long)IdGenerator.getMacAddr());

    public static final String COL_QUEUE_ID = "queue_id";
    public static final String COL_EPHEMERAL_ID = "ephemeral_id";
    public static final String COL_ORG_TIMESTAMP = "msg_org_timestamp";
    public static final String COL_TIMESTAMP = "msg_timestamp";
    public static final String COL_NUM_REQUEUES = "msg_num_requeues";
    public static final String COL_CONTENT = "msg_content";

    /** Whether the "take" statement orders candidate rows by queue id; read only in {@link #init()}. */
    private boolean fifo = true;

    // SQL statements, populated by init().
    private String SQL_GET_ORPHAN_MSGS;
    private String SQL_PUT_NEW_TO_QUEUE;
    private String SQL_REPUT_TO_QUEUE;
    private String SQL_REMOVE_FROM_EPHEMERAL;
    private String SQL_REQUEUE;
    private String SQL_REQUEUE_SILENT;
    private String SQL_UPDATE_EPHEMERAL_ID_TAKE;
    private String SQL_CLEAR_EPHEMERAL_ID;
    private String SQL_READ_BY_EPHEMERAL_ID;

    /**
     * Sets the ordering flag used when the "take" statement is built.
     *
     * <p>Has no effect on an already initialised queue: the flag is baked into the SQL
     * text by {@link #init()}.
     *
     * @param fifo {@code true} to add an {@code ORDER BY} on the queue id when taking
     * @return this queue, for chaining
     */
    public LessLockingUniversalPgSQLQueue setFifo(boolean fifo) {
        this.fifo = fifo;
        return this;
    }

    /**
     * Alias of {@link #setFifo(boolean)}.
     *
     * @param fifo {@code true} to add an {@code ORDER BY} on the queue id when taking
     * @return this queue, for chaining
     */
    public LessLockingUniversalPgSQLQueue markFifo(boolean fifo) {
        this.fifo = fifo;
        return this;
    }

    /**
     * @return the current value of the ordering flag
     */
    public boolean isFifo() {
        return this.fifo;
    }

    /**
     * Alias of {@link #isFifo()}.
     *
     * @return the current value of the ordering flag
     */
    public boolean getFifo() {
        return this.fifo;
    }

    /**
     * Queue and ephemeral storage share one table in this implementation.
     *
     * @return the same name as {@link #getTableName()}
     */
    @Override
    public String getTableNameEphemeral() {
        return this.getTableName();
    }

    /**
     * Initialises the parent queue and then builds every SQL statement used by this
     * class from the configured table name and the {@code COL_*} column names.
     *
     * @return this queue, for chaining
     */
    @Override
    public LessLockingUniversalPgSQLQueue init() {
        super.init();

        final String tableName = this.getTableName();
        final String tableNameEphemeral = this.getTableNameEphemeral();

        // Put a taken message back: clear ephemeral id, bump requeue counter, refresh timestamp.
        this.SQL_REQUEUE = MessageFormat.format("UPDATE {0} SET {1}=0, {2}={2}+1, {3}=? WHERE {4}=?",
                tableName, COL_EPHEMERAL_ID, COL_NUM_REQUEUES, COL_TIMESTAMP, COL_QUEUE_ID);

        // Put a taken message back without touching counter or timestamp.
        this.SQL_REQUEUE_SILENT = MessageFormat.format("UPDATE {0} SET {1}=0 WHERE {2}=?",
                tableName, COL_EPHEMERAL_ID, COL_QUEUE_ID);

        // Claim one waiting row (ephemeral_id=0) by stamping it with a new ephemeral id.
        // NOTE(review): with fifo enabled the candidate rows are ordered by queue_id DESC,
        // which looks like it selects the highest id first — confirm against the table
        // schema that this is the intended "first in, first out" order.
        this.SQL_UPDATE_EPHEMERAL_ID_TAKE = MessageFormat.format(
                "UPDATE {0} M SET {1}=? FROM (SELECT {2} FROM {0} WHERE {1}=0" + (this.fifo ? " ORDER BY {2} DESC" : "") + " LIMIT 1 FOR UPDATE) S WHERE M.{2}=S.{2}",
                tableName, COL_EPHEMERAL_ID, COL_QUEUE_ID);

        this.SQL_CLEAR_EPHEMERAL_ID = MessageFormat.format("UPDATE {0} SET {1}=0 WHERE {2}=?",
                tableName, COL_EPHEMERAL_ID, COL_QUEUE_ID);

        // Column aliases in the two SELECTs below determine the map keys handed to
        // UniversalQueueMessage.fromMap(...).
        this.SQL_READ_BY_EPHEMERAL_ID = MessageFormat.format("SELECT {1}, {2}, {3}, {4}, {5} FROM {0} WHERE {6}=?",
                tableName, "queue_id AS queue_id", "msg_org_timestamp AS org_timestamp", "msg_timestamp AS timestamp", "msg_num_requeues AS num_requeues", "msg_content AS content", COL_EPHEMERAL_ID);

        this.SQL_GET_ORPHAN_MSGS = MessageFormat.format("SELECT {1}, {2}, {3}, {4}, {5} FROM {0} WHERE ephemeral_id!=0 AND msg_timestamp<?",
                tableNameEphemeral, "queue_id AS queue_id", "msg_org_timestamp AS org_timestamp", "msg_timestamp AS timestamp", "msg_num_requeues AS num_requeues", "msg_content AS content");

        // Insert without a queue id (new message) ...
        this.SQL_PUT_NEW_TO_QUEUE = MessageFormat.format("INSERT INTO {0} ({1}, {2}, {3}, {4}) VALUES (?, ?, ?, ?)",
                tableName, COL_ORG_TIMESTAMP, COL_TIMESTAMP, COL_NUM_REQUEUES, COL_CONTENT);

        // ... or with an explicit queue id (message that already carries one).
        this.SQL_REPUT_TO_QUEUE = MessageFormat.format("INSERT INTO {0} ({1}, {2}, {3}, {4}, {5}) VALUES (?, ?, ?, ?, ?)",
                tableName, COL_QUEUE_ID, COL_ORG_TIMESTAMP, COL_TIMESTAMP, COL_NUM_REQUEUES, COL_CONTENT);

        this.SQL_REMOVE_FROM_EPHEMERAL = MessageFormat.format("DELETE FROM {0} WHERE queue_id=?",
                tableNameEphemeral);

        return this;
    }

    /**
     * Validates that {@code msg} is a {@link UniversalQueueMessage} and casts it.
     *
     * @param msg the message handed in by the caller
     * @return {@code msg} cast to {@link UniversalQueueMessage}
     * @throws IllegalArgumentException if {@code msg} is {@code null} or of another type
     */
    private static UniversalQueueMessage requireUniversalMessage(IQueueMessage msg) {
        if (!(msg instanceof UniversalQueueMessage)) {
            throw new IllegalArgumentException("This method requires an argument of type [" + UniversalQueueMessage.class.getName() + "]!");
        }
        return (UniversalQueueMessage)msg;
    }

    /**
     * Not used by this implementation; taking is done in {@link #_takeWithRetries}.
     *
     * @return always {@code null}
     */
    @Override
    protected UniversalQueueMessage readFromQueueStorage(JdbcTemplate jdbcTemplate) {
        return null;
    }

    /**
     * Not used by this implementation.
     *
     * @return always {@code null}
     */
    @Override
    protected UniversalQueueMessage readFromEphemeralStorage(JdbcTemplate jdbcTemplate, IQueueMessage msg) {
        return null;
    }

    /**
     * Loads every taken-but-unfinished message (non-zero {@code ephemeral_id}) whose
     * {@code msg_timestamp} is earlier than the given threshold.
     *
     * @param jdbcTemplate template bound to the connection to query with
     * @param thresholdTimestampMs threshold as epoch milliseconds
     * @return the matching messages, or {@code null} when there are none
     */
    @Override
    protected Collection<IQueueMessage> getOrphanFromEphemeralStorage(JdbcTemplate jdbcTemplate, long thresholdTimestampMs) {
        Date threshold = new Date(thresholdTimestampMs);
        List<Map<String, Object>> dbRows = jdbcTemplate.queryForList(this.SQL_GET_ORPHAN_MSGS, new Object[]{threshold});
        if (dbRows == null || dbRows.isEmpty()) {
            // Kept as null (not an empty collection) to preserve the existing contract.
            return null;
        }
        List<IQueueMessage> result = new ArrayList<IQueueMessage>(dbRows.size());
        for (Map<String, Object> dbRow : dbRows) {
            UniversalQueueMessage msg = new UniversalQueueMessage();
            msg.fromMap(dbRow);
            result.add(msg);
        }
        return result;
    }

    /**
     * Inserts a message row. A message without a queue id ({@code null} or {@code 0}) is
     * inserted without the {@code queue_id} column; otherwise its id is inserted
     * explicitly.
     *
     * @param jdbcTemplate template bound to the connection to write with
     * @param _msg the message to store; must be a {@link UniversalQueueMessage}
     * @return {@code true} if at least one row was inserted
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     */
    @Override
    protected boolean putToQueueStorage(JdbcTemplate jdbcTemplate, IQueueMessage _msg) {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        Long qid = msg.qId();
        int numRows;
        if (qid == null || qid == 0L) {
            numRows = jdbcTemplate.update(this.SQL_PUT_NEW_TO_QUEUE, new Object[]{msg.qOriginalTimestamp(), msg.qTimestamp(), msg.qNumRequeues(), msg.content()});
        } else {
            numRows = jdbcTemplate.update(this.SQL_REPUT_TO_QUEUE, new Object[]{qid, msg.qOriginalTimestamp(), msg.qTimestamp(), msg.qNumRequeues(), msg.content()});
        }
        return numRows > 0;
    }

    /**
     * No-op: there is no separate ephemeral table to write to.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean putToEphemeralStorage(JdbcTemplate jdbcTemplate, IQueueMessage _msg) {
        return true;
    }

    /**
     * No-op: taking a message does not remove its row.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean removeFromQueueStorage(JdbcTemplate jdbcTemplate, IQueueMessage msg) {
        return true;
    }

    /**
     * Deletes the row identified by the message's queue id.
     *
     * @param jdbcTemplate template bound to the connection to write with
     * @param _msg the message to delete; must be a {@link UniversalQueueMessage}
     * @return {@code true} if at least one row was deleted
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     */
    @Override
    protected boolean removeFromEphemeralStorage(JdbcTemplate jdbcTemplate, IQueueMessage _msg) {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        int numRows = jdbcTemplate.update(this.SQL_REMOVE_FROM_EPHEMERAL, new Object[]{msg.qId()});
        return numRows > 0;
    }

    /**
     * Queues a message: resets its requeue counter to {@code 0}, sets both timestamps to
     * "now" and inserts it.
     *
     * <p>A {@link DuplicateKeyException} is logged and reported as success. A
     * {@link PessimisticLockingFailureException} triggers a retry until
     * {@code numRetries} exceeds {@code maxRetries}.
     *
     * @param conn connection to operate on
     * @param _msg the message to queue; must be a {@link UniversalQueueMessage}
     * @param numRetries number of retries performed so far
     * @param maxRetries retry bound
     * @return {@code true} if the row was inserted or already existed
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     * @throws QueueException if retries are exhausted or any other error occurs
     */
    @Override
    protected boolean _queueWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        try {
            JdbcTemplate jdbcTemplate = this.jdbcTemplate(conn);
            Date now = new Date();
            msg.qNumRequeues(0).qOriginalTimestamp(now).qTimestamp(now);
            return this.putToQueueStorage(jdbcTemplate, msg);
        }
        catch (DuplicateKeyException dke) {
            LOGGER.warn(dke.getMessage(), (Throwable)dke);
            return true;
        }
        catch (PessimisticLockingFailureException ex) {
            if (numRetries > maxRetries) {
                throw new QueueException(ex);
            }
            return this._queueWithRetries(conn, msg, numRetries + 1, maxRetries);
        }
        catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Returns a taken message to the queue: clears its ephemeral id, increments its
     * requeue counter and sets its timestamp to "now".
     *
     * <p>A {@link PessimisticLockingFailureException} triggers a retry of this same
     * operation until {@code numRetries} exceeds {@code maxRetries}.
     *
     * @param conn connection to operate on
     * @param _msg the message to requeue; must be a {@link UniversalQueueMessage}
     * @param numRetries number of retries performed so far
     * @param maxRetries retry bound
     * @return {@code true} if a row was updated
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     * @throws QueueException if retries are exhausted or any other error occurs
     */
    @Override
    protected boolean _requeueWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        try {
            JdbcTemplate jdbcTemplate = this.jdbcTemplate(conn);
            int numRows = jdbcTemplate.update(this.SQL_REQUEUE, new Object[]{new Date(), msg.qId()});
            return numRows > 0;
        }
        catch (DuplicateKeyException dke) {
            LOGGER.warn(dke.getMessage(), (Throwable)dke);
            return true;
        }
        catch (PessimisticLockingFailureException ex) {
            if (numRetries > maxRetries) {
                throw new QueueException(ex);
            }
            // Retry the non-silent requeue; falling through to the silent variant would
            // skip the requeue-counter increment and the timestamp refresh.
            return this._requeueWithRetries(conn, msg, numRetries + 1, maxRetries);
        }
        catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Returns a taken message to the queue by clearing its ephemeral id only; requeue
     * counter and timestamp are left untouched.
     *
     * @param conn connection to operate on
     * @param _msg the message to requeue; must be a {@link UniversalQueueMessage}
     * @param numRetries number of retries performed so far
     * @param maxRetries retry bound
     * @return {@code true} if a row was updated
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     * @throws QueueException if retries are exhausted or any other error occurs
     */
    @Override
    protected boolean _requeueSilentWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        try {
            JdbcTemplate jdbcTemplate = this.jdbcTemplate(conn);
            int numRows = jdbcTemplate.update(this.SQL_REQUEUE_SILENT, new Object[]{msg.qId()});
            return numRows > 0;
        }
        catch (DuplicateKeyException dke) {
            LOGGER.warn(dke.getMessage(), (Throwable)dke);
            return true;
        }
        catch (PessimisticLockingFailureException ex) {
            if (numRetries > maxRetries) {
                throw new QueueException(ex);
            }
            return this._requeueSilentWithRetries(conn, msg, numRetries + 1, maxRetries);
        }
        catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Finishes a message by deleting its row.
     *
     * @param conn connection to operate on
     * @param _msg the message to finish; must be a {@link UniversalQueueMessage}
     * @param numRetries number of retries performed so far
     * @param maxRetries retry bound
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     * @throws QueueException if retries are exhausted or any other error occurs
     */
    @Override
    protected void _finishWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        try {
            JdbcTemplate jdbcTemplate = this.jdbcTemplate(conn);
            this.removeFromEphemeralStorage(jdbcTemplate, msg);
        }
        catch (PessimisticLockingFailureException ex) {
            if (numRetries > maxRetries) {
                throw new QueueException(ex);
            }
            this._finishWithRetries(conn, msg, numRetries + 1, maxRetries);
        }
        catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Takes one message: stamps a waiting row with a newly generated ephemeral id, then
     * reads that row back by the same id.
     *
     * @param conn connection to operate on
     * @param numRetries number of retries performed so far
     * @param maxRetries retry bound
     * @return the taken message, or {@code null} if no row was claimed or read back
     * @throws QueueException if retries are exhausted or any other error occurs
     */
    @Override
    protected UniversalQueueMessage _takeWithRetries(Connection conn, int numRetries, int maxRetries) throws SQLException {
        try {
            JdbcTemplate jdbcTemplate = this.jdbcTemplate(conn);
            long ephemeralId = this.IDGEN.generateId64();

            int numRows = jdbcTemplate.update(this.SQL_UPDATE_EPHEMERAL_ID_TAKE, new Object[]{ephemeralId});
            if (numRows <= 0) {
                // Nothing was waiting in the queue.
                return null;
            }

            List<Map<String, Object>> dbRows = jdbcTemplate.queryForList(this.SQL_READ_BY_EPHEMERAL_ID, new Object[]{ephemeralId});
            if (dbRows == null || dbRows.isEmpty()) {
                return null;
            }

            UniversalQueueMessage msg = new UniversalQueueMessage();
            msg.fromMap(dbRows.get(0));
            return msg;
        }
        catch (PessimisticLockingFailureException ex) {
            if (numRetries > maxRetries) {
                throw new QueueException(ex);
            }
            return this._takeWithRetries(conn, numRetries + 1, maxRetries);
        }
        catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Moves a message from "ephemeral" back to "queue" state by clearing its ephemeral id.
     *
     * @param _msg the message to move; must be a {@link UniversalQueueMessage}
     * @param conn connection to operate on
     * @param numRetries number of retries performed so far
     * @param maxRetries retry bound
     * @return {@code true} if a row was updated
     * @throws IllegalArgumentException if {@code _msg} is not a {@link UniversalQueueMessage}
     * @throws QueueException if retries are exhausted or any other error occurs
     */
    @Override
    protected boolean _moveFromEphemeralToQueueStorageWithRetries(IQueueMessage _msg, Connection conn, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(_msg);
        try {
            JdbcTemplate jdbcTemplate = this.jdbcTemplate(conn);
            int numRows = jdbcTemplate.update(this.SQL_CLEAR_EPHEMERAL_ID, new Object[]{msg.qId()});
            return numRows > 0;
        }
        catch (PessimisticLockingFailureException ex) {
            if (numRetries > maxRetries) {
                throw new QueueException(ex);
            }
            return this._moveFromEphemeralToQueueStorageWithRetries(msg, conn, numRetries + 1, maxRetries);
        }
        catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Takes a message via the parent implementation, narrowed to
     * {@link UniversalQueueMessage}.
     *
     * @return whatever {@code super.take()} returns, cast to {@link UniversalQueueMessage}
     */
    @Override
    public UniversalQueueMessage take() {
        return (UniversalQueueMessage)super.take();
    }
}

