/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.listener.JobImpl;
import com.github.sonus21.rqueue.listener.MessageProcessorHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.PrefixLogger;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import java.io.Serializable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;

class PostProcessingHandler
extends PrefixLogger {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostProcessingHandler.class);
    private final ApplicationEventPublisher applicationEventPublisher;
    private final RqueueWebConfig rqueueWebConfig;
    private final RqueueMessageTemplate rqueueMessageTemplate;
    private final TaskExecutionBackOff taskExecutionBackoff;
    private final MessageProcessorHandler messageProcessorHandler;
    private final RqueueSystemConfigDao rqueueSystemConfigDao;

    PostProcessingHandler(RqueueWebConfig rqueueWebConfig, ApplicationEventPublisher applicationEventPublisher, RqueueMessageTemplate rqueueMessageTemplate, TaskExecutionBackOff taskExecutionBackoff, MessageProcessorHandler messageProcessorHandler, RqueueSystemConfigDao rqueueSystemConfigDao) {
        super(log, null);
        this.applicationEventPublisher = applicationEventPublisher;
        this.rqueueWebConfig = rqueueWebConfig;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.taskExecutionBackoff = taskExecutionBackoff;
        this.messageProcessorHandler = messageProcessorHandler;
        this.rqueueSystemConfigDao = rqueueSystemConfigDao;
    }

    void handle(JobImpl job, ExecutionStatus status, int failureCount) {
        try {
            switch (status) {
                case QUEUE_INACTIVE: {
                    return;
                }
                case DELETED: {
                    this.handleManualDeletion(job, failureCount);
                    break;
                }
                case IGNORED: {
                    this.handleIgnoredMessage(job, failureCount);
                    break;
                }
                case OLD_MESSAGE: {
                    this.handleOldMessage(job, job.getRqueueMessage());
                    break;
                }
                case SUCCESSFUL: {
                    this.handleSuccessFullExecution(job, failureCount);
                    break;
                }
                case FAILED: {
                    this.handleFailure(job, failureCount);
                    break;
                }
                default: {
                    throw new UnknownSwitchCase(String.valueOf((Object)status));
                }
            }
        }
        catch (Exception e) {
            this.log(Level.ERROR, "Error occurred in post processing, RqueueMessage: {}, Status: {}", e, new Object[]{job.getRqueueMessage(), status});
        }
    }

    private void handleOldMessage(JobImpl job, RqueueMessage rqueueMessage) {
        this.log(Level.TRACE, "Message {} ignored due to old message, Queue: {}", null, rqueueMessage, job.getQueueDetail().getName());
        this.rqueueMessageTemplate.removeElementFromZset(job.getQueueDetail().getProcessingQueueName(), rqueueMessage);
    }

    private void publishEvent(JobImpl job, RqueueMessage rqueueMessage, MessageStatus messageStatus) {
        this.updateMetadata(job, rqueueMessage, messageStatus);
        if (this.rqueueWebConfig.isCollectListenerStats()) {
            RqueueExecutionEvent event = new RqueueExecutionEvent(job);
            this.applicationEventPublisher.publishEvent((ApplicationEvent)event);
        }
    }

    private void updateMetadata(JobImpl job, RqueueMessage rqueueMessage, MessageStatus messageStatus) {
        job.updateExecutionTime(rqueueMessage, messageStatus);
    }

    private void deleteMessage(JobImpl job, MessageStatus status, int failureCount) {
        RqueueMessage rqueueMessage = job.getRqueueMessage();
        this.rqueueMessageTemplate.removeElementFromZset(job.getQueueDetail().getProcessingQueueName(), rqueueMessage);
        rqueueMessage.setFailureCount(failureCount);
        this.messageProcessorHandler.handleMessage(rqueueMessage, job.getMessage(), status);
        this.publishEvent(job, job.getRqueueMessage(), status);
    }

    private void moveMessageToQueue(QueueDetail queueDetail, String queueName, RqueueMessage oldMessage, RqueueMessage newMessage, long delay) {
        RedisUtils.executePipeLine(this.rqueueMessageTemplate.getTemplate(), (connection, keySerializer, valueSerializer) -> {
            byte[] newMessageBytes = valueSerializer.serialize((Object)newMessage);
            byte[] oldMessageBytes = valueSerializer.serialize((Object)oldMessage);
            byte[] processingQueueNameBytes = keySerializer.serialize((Object)queueDetail.getProcessingQueueName());
            byte[] queueNameBytes = keySerializer.serialize((Object)queueName);
            assert (queueNameBytes != null);
            assert (newMessageBytes != null);
            if (delay > 0L) {
                connection.zAdd(queueNameBytes, (double)delay, newMessageBytes);
            } else {
                connection.lPush(queueNameBytes, (byte[][])new byte[][]{newMessageBytes});
            }
            assert (processingQueueNameBytes != null);
            connection.zRem(processingQueueNameBytes, (byte[][])new byte[][]{oldMessageBytes});
        });
    }

    private void moveMessageToDlq(JobImpl job, int failureCount) {
        this.log(Level.DEBUG, "Message {} Moved to dead letter queue: {}", null, job.getRqueueMessage(), job.getQueueDetail().getDeadLetterQueueName());
        RqueueMessage rqueueMessage = job.getRqueueMessage();
        RqueueMessage newMessage = rqueueMessage.toBuilder().failureCount(failureCount).build();
        newMessage.updateReEnqueuedAt();
        QueueDetail queueDetail = job.getQueueDetail();
        Object userMessage = job.getMessage();
        this.messageProcessorHandler.handleMessage(newMessage, userMessage, MessageStatus.MOVED_TO_DLQ);
        if (queueDetail.isDeadLetterConsumerEnabled()) {
            QueueConfig queueConfig = this.rqueueSystemConfigDao.getConfigByName(queueDetail.getDeadLetterQueueName(), true);
            if (queueConfig == null) {
                this.log(Level.ERROR, "Queue Config not found for queue {}", null, queueDetail.getDeadLetterQueue());
                this.moveMessageToQueue(queueDetail, queueDetail.getDeadLetterQueueName(), rqueueMessage, newMessage, -1L);
            } else {
                newMessage.setQueueName(queueConfig.getName());
                newMessage.setFailureCount(0);
                newMessage.setSourceQueueName(rqueueMessage.getQueueName());
                newMessage.setSourceQueueFailureCount(failureCount);
                long backOff = this.taskExecutionBackoff.nextBackOff(userMessage, newMessage, failureCount);
                backOff = backOff == -1L ? 5000L : backOff;
                this.moveMessageToQueue(queueDetail, queueConfig.getDelayedQueueName(), rqueueMessage, newMessage, backOff);
            }
        } else {
            this.moveMessageToQueue(queueDetail, queueDetail.getDeadLetterQueueName(), rqueueMessage, newMessage, -1L);
        }
        this.publishEvent(job, newMessage, MessageStatus.MOVED_TO_DLQ);
    }

    RqueueMessage parkMessageForRetry(RqueueMessage rqueueMessage, int failureCount, long delay, QueueDetail queueDetail) {
        RqueueMessage newMessage = rqueueMessage.toBuilder().failureCount(failureCount).build().updateReEnqueuedAt();
        if (delay <= 0L) {
            this.rqueueMessageTemplate.moveMessage(queueDetail.getProcessingQueueName(), queueDetail.getQueueName(), rqueueMessage, newMessage);
        } else {
            this.rqueueMessageTemplate.moveMessage(queueDetail.getProcessingQueueName(), queueDetail.getDelayedQueueName(), rqueueMessage, newMessage, delay);
        }
        return newMessage;
    }

    void parkMessageForRetry(JobImpl job, Serializable why, int failureCount, long delay) {
        if (why == null) {
            this.log(Level.TRACE, "Message {} will be retried in {}Ms", null, job.getRqueueMessage(), delay);
        } else {
            this.log(Level.TRACE, "Message {} will be retried in {}Ms, Reason: {}", null, job.getRqueueMessage(), delay, why);
        }
        RqueueMessage newMessage = this.parkMessageForRetry(job.getRqueueMessage(), failureCount, delay, job.getQueueDetail());
        this.updateMetadata(job, newMessage, MessageStatus.FAILED);
    }

    private void discardMessage(JobImpl job, int failureCount) {
        this.log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, job.getRqueueMessage());
        this.deleteMessage(job, MessageStatus.DISCARDED, failureCount);
    }

    void handleManualDeletion(JobImpl job, int failureCount) {
        this.log(Level.DEBUG, "Message Deleted {} successfully", null, job.getRqueueMessage());
        this.deleteMessage(job, MessageStatus.DELETED, failureCount);
    }

    private void handleSuccessFullExecution(JobImpl job, int failureCount) {
        this.log(Level.DEBUG, "Message consumed {} successfully", null, job.getRqueueMessage());
        this.deleteMessage(job, MessageStatus.SUCCESSFUL, failureCount);
    }

    private void handleRetryExceededMessage(JobImpl job, int failureCount) {
        if (job.getQueueDetail().isDlqSet()) {
            this.moveMessageToDlq(job, failureCount);
        } else {
            this.discardMessage(job, failureCount);
        }
    }

    private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
        return rqueueMessage.getRetryCount() == null ? queueDetail.getNumRetry() : rqueueMessage.getRetryCount().intValue();
    }

    private void handleFailure(JobImpl job, int failureCount) {
        int maxRetryCount = this.getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail());
        if (failureCount < maxRetryCount) {
            long delay = this.taskExecutionBackoff.nextBackOff(job.getMessage(), job.getRqueueMessage(), failureCount);
            if (delay == -1L) {
                this.handleRetryExceededMessage(job, failureCount);
            } else {
                this.parkMessageForRetry(job, null, failureCount, delay);
            }
        } else {
            this.handleRetryExceededMessage(job, failureCount);
        }
    }

    private void handleIgnoredMessage(JobImpl job, int failureCount) {
        this.log(Level.DEBUG, "Message {} ignored, Queue: {}", null, job.getRqueueMessage(), job.getQueueDetail().getName());
        this.deleteMessage(job, MessageStatus.IGNORED, failureCount);
    }
}

