/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.listener.MessageProcessorHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.BaseLogger;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.time.Duration;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;

/**
 * Finishes the life cycle of a message after its listener invocation has produced an
 * {@link ExecutionStatus}.
 *
 * <p>Depending on the status the message is removed from the processing queue, parked in the
 * delayed queue for another attempt, pushed to the dead letter queue, or discarded. Message
 * metadata is saved along the way and, when listener stats collection is enabled, an
 * {@link RqueueExecutionEvent} is published.
 */
class PostProcessingHandler extends BaseLogger {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostProcessingHandler.class);
    private final ApplicationEventPublisher applicationEventPublisher;
    private final RqueueWebConfig rqueueWebConfig;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final RqueueMessageTemplate rqueueMessageTemplate;
    private final TaskExecutionBackOff taskExecutionBackoff;
    private final MessageProcessorHandler messageProcessorHandler;
    private final RqueueSystemConfigDao rqueueSystemConfigDao;
    private final RqueueConfig rqueueConfig;

    /**
     * Stores the collaborators used during post processing; no other work is done here.
     */
    PostProcessingHandler(RqueueConfig rqueueConfig, RqueueWebConfig rqueueWebConfig, ApplicationEventPublisher applicationEventPublisher, RqueueMessageMetadataService rqueueMessageMetadataService, RqueueMessageTemplate rqueueMessageTemplate, TaskExecutionBackOff taskExecutionBackoff, MessageProcessorHandler messageProcessorHandler, RqueueSystemConfigDao rqueueSystemConfigDao) {
        super(log, null);
        this.rqueueConfig = rqueueConfig;
        this.rqueueWebConfig = rqueueWebConfig;
        this.applicationEventPublisher = applicationEventPublisher;
        this.rqueueMessageMetadataService = rqueueMessageMetadataService;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.taskExecutionBackoff = taskExecutionBackoff;
        this.messageProcessorHandler = messageProcessorHandler;
        this.rqueueSystemConfigDao = rqueueSystemConfigDao;
    }

    /**
     * Dispatches to the handler matching {@code status}.
     *
     * <p>{@link ExecutionStatus#QUEUE_INACTIVE} is a no-op. Any exception raised while handling
     * the other statuses (including an unrecognised status) is logged at ERROR level and not
     * propagated to the caller.
     *
     * @param queueDetail queue the message was consumed from
     * @param rqueueMessage message as it was read from the queue
     * @param userMessage payload handed to the listener
     * @param messageMetadata metadata record to update
     * @param status outcome of the listener invocation
     * @param failureCount failure count to record on the message
     * @param jobExecutionStartTime start time of the job, forwarded to the metadata record
     */
    void handle(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, ExecutionStatus status, int failureCount, long jobExecutionStartTime) {
        if (status == ExecutionStatus.QUEUE_INACTIVE) {
            // Nothing to clean up for an inactive queue.
            return;
        }
        try {
            switch (status) {
                case SUCCESSFUL:
                    handleSuccessFullExecution(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
                    break;
                case FAILED:
                    handleFailure(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
                    break;
                case DELETED:
                    handleManualDeletion(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
                    break;
                case IGNORED:
                    handleIgnoredMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
                    break;
                case OLD_MESSAGE:
                    handleOldMessage(queueDetail, rqueueMessage);
                    break;
                default:
                    throw new UnknownSwitchCase(String.valueOf(status));
            }
        } catch (Exception e) {
            log(Level.ERROR, "Error occurred in post processing, RqueueMessage: {}, Status: {}", e, new Object[] {rqueueMessage, status});
        }
    }

    /**
     * Drops a superseded message from the processing queue; metadata is left untouched.
     */
    private void handleOldMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} ignored due to new message, Queue: {}", null, rqueueMessage, queueDetail.getName());
        }
        String processingQueue = queueDetail.getProcessingQueueName();
        rqueueMessageTemplate.removeElementFromZset(processingQueue, rqueueMessage);
    }

    /**
     * Saves the metadata with the new status and, only when listener stats collection is
     * enabled, publishes an {@link RqueueExecutionEvent} for the message.
     */
    private void publishEvent(QueueDetail queueDetail, RqueueMessage rqueueMessage, MessageMetadata messageMetadata, MessageStatus status, long jobExecutionStartTime) {
        updateMetadata(messageMetadata, rqueueMessage, jobExecutionStartTime, status);
        if (!rqueueWebConfig.isCollectListenerStats()) {
            return;
        }
        RqueueExecutionEvent executionEvent = new RqueueExecutionEvent(queueDetail, rqueueMessage, status.getTaskStatus(), messageMetadata);
        applicationEventPublisher.publishEvent((ApplicationEvent) executionEvent);
    }

    /**
     * Updates the metadata record in place and saves it.
     *
     * <p>The retention passed to the metadata service depends on the status: terminal states use
     * {@code getMessageDurabilityInTerminalStateInSecond()} (seconds), all other states use
     * {@code getMessageDurabilityInMinute()} (minutes).
     */
    private void updateMetadata(MessageMetadata messageMetadata, RqueueMessage rqueueMessage, long jobExecutionStartTime, MessageStatus messageStatus) {
        messageMetadata.setStatus(messageStatus);
        messageMetadata.setRqueueMessage(rqueueMessage);
        messageMetadata.addExecutionTime(jobExecutionStartTime);
        Duration retention = messageStatus.isTerminalState()
                ? Duration.ofSeconds(rqueueConfig.getMessageDurabilityInTerminalStateInSecond())
                : Duration.ofMinutes(rqueueConfig.getMessageDurabilityInMinute());
        rqueueMessageMetadataService.save(messageMetadata, retention);
    }

    /**
     * Removes the message from the processing queue, notifies the message processor for
     * {@code status}, and records the outcome.
     */
    private void deleteMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, MessageStatus status, int failureCount, long jobExecutionStartTime) {
        // NOTE(review): the removal runs before the failure count is mutated; presumably the
        // zset member is matched on the unmodified message, so keep this ordering - confirm.
        rqueueMessageTemplate.removeElementFromZset(queueDetail.getProcessingQueueName(), rqueueMessage);
        rqueueMessage.setFailureCount(failureCount);
        messageProcessorHandler.handleMessage(rqueueMessage, userMessage, status);
        publishEvent(queueDetail, rqueueMessage, messageMetadata, status, jobExecutionStartTime);
    }

    /**
     * In a single pipeline, right-pushes {@code newMessage} onto {@code queueName} and removes
     * {@code oldMessage} from the processing queue of {@code queueDetail}.
     */
    private void moveMessageToQueue(QueueDetail queueDetail, String queueName, RqueueMessage oldMessage, RqueueMessage newMessage) {
        RedisUtils.executePipeLine(rqueueMessageTemplate.getTemplate(), (connection, keySerializer, valueSerializer) -> {
            byte[] pushedValue = valueSerializer.serialize(newMessage);
            byte[] removedValue = valueSerializer.serialize(oldMessage);
            byte[] processingKey = keySerializer.serialize(queueDetail.getProcessingQueueName());
            byte[] targetKey = keySerializer.serialize(queueName);
            connection.rPush(targetKey, new byte[][] {pushedValue});
            connection.zRem(processingKey, new byte[][] {removedValue});
        });
    }

    /**
     * Notifies the message processor that the message is moving to the dead letter queue and then
     * performs the move to the resolved target queue.
     */
    private void moveMessageForReprocessingOrDlq(QueueDetail queueDetail, RqueueMessage oldMessage, RqueueMessage newMessage, Object userMessage) {
        messageProcessorHandler.handleMessage(newMessage, userMessage, MessageStatus.MOVED_TO_DLQ);
        moveMessageToQueue(queueDetail, resolveDeadLetterTarget(queueDetail), oldMessage, newMessage);
    }

    /**
     * Picks the queue name that a dead-lettered message is pushed to.
     *
     * <p>Without a dead letter consumer the configured dead letter queue name is used directly.
     * With a consumer, the name comes from the stored {@link QueueConfig}; if that config cannot
     * be found an error is logged and the configured dead letter queue name is used instead.
     */
    private String resolveDeadLetterTarget(QueueDetail queueDetail) {
        if (!queueDetail.isDeadLetterConsumerEnabled()) {
            return queueDetail.getDeadLetterQueueName();
        }
        String configKey = rqueueConfig.getQueueConfigKey(queueDetail.getDeadLetterQueueName());
        QueueConfig dlqConfig = rqueueSystemConfigDao.getQConfig(configKey, true);
        if (dlqConfig != null) {
            return dlqConfig.getQueueName();
        }
        log(Level.ERROR, "Queue Config not found for queue {}", null, queueDetail.getDeadLetterQueue());
        return queueDetail.getDeadLetterQueueName();
    }

    /**
     * Clones the message, stamps the clone with {@code failureCount} and refreshes its
     * re-enqueue marker; the original message is not modified.
     */
    private RqueueMessage copyForReEnqueue(RqueueMessage rqueueMessage, int failureCount) throws CloneNotSupportedException {
        RqueueMessage copy = rqueueMessage.clone();
        copy.setFailureCount(failureCount);
        copy.updateReEnqueuedAt();
        return copy;
    }

    /**
     * Moves an updated copy of the message to the dead letter queue and records the
     * {@link MessageStatus#MOVED_TO_DLQ} outcome.
     */
    private void moveMessageToDlq(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) throws CloneNotSupportedException {
        if (isWarningEnabled()) {
            log(Level.WARN, "Message {} Moved to dead letter queue: {}", null, userMessage, queueDetail.getDeadLetterQueueName());
        }
        RqueueMessage deadLettered = copyForReEnqueue(rqueueMessage, failureCount);
        moveMessageForReprocessingOrDlq(queueDetail, rqueueMessage, deadLettered, userMessage);
        publishEvent(queueDetail, deadLettered, messageMetadata, MessageStatus.MOVED_TO_DLQ, jobExecutionStartTime);
    }

    /**
     * Moves an updated copy of the message from the processing queue to the delayed queue with
     * the given {@code delay} and saves the metadata as {@link MessageStatus#FAILED}. No
     * execution event is published on this path.
     */
    private void parkMessageForRetry(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime, long delay) throws CloneNotSupportedException {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} will be retried in {}Ms", null, userMessage, delay);
        }
        RqueueMessage retryCopy = copyForReEnqueue(rqueueMessage, failureCount);
        rqueueMessageTemplate.moveMessage(queueDetail.getProcessingQueueName(), queueDetail.getDelayedQueueName(), rqueueMessage, retryCopy, delay);
        updateMetadata(messageMetadata, retryCopy, jobExecutionStartTime, MessageStatus.FAILED);
    }

    /**
     * Deletes a message whose retries are exhausted, recording {@link MessageStatus#DISCARDED}.
     */
    private void discardMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, userMessage);
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, MessageStatus.DISCARDED, failureCount, jobExecutionStartTime);
    }

    /**
     * Deletes a message reported as {@code DELETED}, recording {@link MessageStatus#DELETED}.
     */
    private void handleManualDeletion(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message Deleted {} successfully", null, rqueueMessage);
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, MessageStatus.DELETED, failureCount, jobExecutionStartTime);
    }

    /**
     * Deletes a successfully consumed message, recording {@link MessageStatus#SUCCESSFUL}.
     */
    private void handleSuccessFullExecution(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message consumed {} successfully", null, rqueueMessage);
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, MessageStatus.SUCCESSFUL, failureCount, jobExecutionStartTime);
    }

    /**
     * Handles a message that will not be retried: dead-letters it when the queue has a dead
     * letter queue set, otherwise discards it.
     */
    private void handleRetryExceededMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) throws CloneNotSupportedException {
        if (!queueDetail.isDlqSet()) {
            discardMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
            return;
        }
        moveMessageToDlq(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
    }

    /**
     * Returns the retry limit for the message: its own retry count when set, otherwise the
     * queue-level number of retries.
     */
    private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
        if (rqueueMessage.getRetryCount() == null) {
            return queueDetail.getNumRetry();
        }
        return rqueueMessage.getRetryCount().intValue();
    }

    /**
     * Decides what happens to a failed message.
     *
     * <p>The message is parked for retry only when the failure count is still below the retry
     * limit and the back-off returns a delay other than {@code -1}. In every other case it is
     * treated as retry-exceeded.
     */
    private void handleFailure(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) throws CloneNotSupportedException {
        int retryLimit = getMaxRetryCount(rqueueMessage, queueDetail);
        if (failureCount < retryLimit) {
            long backOffDelay = taskExecutionBackoff.nextBackOff(userMessage, rqueueMessage, failureCount);
            // -1 from the back-off is handled as "do not retry".
            if (backOffDelay != -1L) {
                parkMessageForRetry(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime, backOffDelay);
                return;
            }
        }
        handleRetryExceededMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionStartTime);
    }

    /**
     * Deletes a message reported as {@code IGNORED}, recording {@link MessageStatus#IGNORED}.
     */
    private void handleIgnoredMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionStartTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} ignored, Queue: {}", null, rqueueMessage, queueDetail.getName());
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, MessageStatus.IGNORED, failureCount, jobExecutionStartTime);
    }
}

