/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.listener.MessageContainerBase;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.TaskStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.MessageUtils;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.context.ApplicationEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Runs one {@link RqueueMessage} through the {@link RqueueMessageHandler} and then performs the
 * post-processing that matches the outcome (success, manual deletion, ignore, retry, dead letter
 * queue or discard).
 *
 * <p>The semaphore handed to the constructor is released exactly once when {@link #start()}
 * finishes, whether it completes normally or throws.
 */
class RqueueExecutor extends MessageContainerBase {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueExecutor.class);
    /** Retention passed when saving message metadata: 604800 seconds, i.e. seven days. */
    private static final Duration METADATA_RETENTION = Duration.ofSeconds(604800L);
    private final QueueDetail queueDetail;
    private final Message<String> message;
    private final RqueueMessage rqueueMessage;
    private final RqueueMessageHandler rqueueMessageHandler;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final String messageMetadataId;
    private final Semaphore semaphore;
    private final int retryPerPoll;
    private final TaskExecutionBackOff taskExecutionBackoff;
    /** Lazily loaded metadata of the message; refreshed by {@link #isMessageDeleted()}. */
    private MessageMetadata messageMetadata;
    /** Converted payload; stays {@code null} when the conversion in the constructor fails. */
    private Object userMessage;

    /**
     * Creates an executor for a single message.
     *
     * @param rqueueMessage the raw message to execute
     * @param queueDetail the queue the message belongs to
     * @param semaphore released once when {@link #start()} finishes
     * @param container weak reference to the owning listener container; must still be reachable
     * @param rqueueMessageHandler handler that invokes the listener and supplies the converter
     * @param retryPerPoll maximum attempts within one {@link #start()} call, {@code -1} meaning
     *     "use the message/queue retry limit"
     * @param taskExecutionBackoff computes the delay before a failed message is retried
     * @throws NullPointerException if the container reference has already been cleared
     */
    RqueueExecutor(RqueueMessage rqueueMessage, QueueDetail queueDetail, Semaphore semaphore, WeakReference<RqueueMessageListenerContainer> container, RqueueMessageHandler rqueueMessageHandler, int retryPerPoll, TaskExecutionBackOff taskExecutionBackoff) {
        super(log, queueDetail.getName(), container);
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
        this.semaphore = semaphore;
        this.rqueueMessageHandler = rqueueMessageHandler;
        this.messageMetadataId = MessageUtils.getMessageMetaId(rqueueMessage.getId());
        this.retryPerPoll = retryPerPoll;
        this.taskExecutionBackoff = taskExecutionBackoff;
        // The payload is passed without an Object cast so the generic type is inferred to match
        // the Message<String> field.
        this.message = MessageBuilder.createMessage(rqueueMessage.getMessage(), (MessageHeaders)RqueueMessageHeaders.buildMessageHeaders(queueDetail.getName(), rqueueMessage));
        try {
            this.userMessage = MessageUtils.convertMessageToObject(this.message, rqueueMessageHandler.getMessageConverter());
        } catch (Exception e) {
            // Best effort: a conversion failure is logged and userMessage is left null.
            this.log(Level.ERROR, "Unable to convert message {}", e, rqueueMessage.getMessage());
        }
        this.rqueueMessageMetadataService = ((RqueueMessageListenerContainer)Objects.requireNonNull(container.get())).getRqueueMessageMetadataService();
    }

    /**
     * Returns the owning listener container.
     *
     * @throws NullPointerException if the weak reference has been cleared
     */
    private RqueueMessageListenerContainer requireListenerContainer() {
        return (RqueueMessageListenerContainer)Objects.requireNonNull(this.container.get());
    }

    /** Returns the message-level retry limit when set, otherwise the queue-level limit. */
    private int getMaxRetryCount() {
        if (this.rqueueMessage.getRetryCount() == null) {
            return this.queueDetail.getNumRetry();
        }
        return this.rqueueMessage.getRetryCount().intValue();
    }

    /**
     * Invokes the message processor registered for {@code status}, if any. Exceptions thrown by
     * the processor are logged and not propagated.
     *
     * @param status outcome that selects the processor
     * @param rqueueMessage message handed to the processor
     */
    private void callMessageProcessor(TaskStatus status, RqueueMessage rqueueMessage) {
        MessageProcessor messageProcessor = null;
        switch (status) {
            case DELETED: {
                messageProcessor = this.requireListenerContainer().getManualDeletionMessageProcessor();
                break;
            }
            case MOVED_TO_DLQ: {
                messageProcessor = this.requireListenerContainer().getDeadLetterQueueMessageProcessor();
                break;
            }
            case DISCARDED: {
                messageProcessor = this.requireListenerContainer().getDiscardMessageProcessor();
                break;
            }
            case SUCCESSFUL: {
                messageProcessor = this.requireListenerContainer().getPostExecutionMessageProcessor();
                break;
            }
            default: {
                // No processor is looked up for any other status.
                break;
            }
        }
        if (messageProcessor == null) {
            return;
        }
        try {
            this.log(Level.DEBUG, "Calling {} processor for {}", null, new Object[]{status, rqueueMessage});
            messageProcessor.process(this.userMessage, rqueueMessage);
        } catch (Exception e) {
            this.log(Level.ERROR, "Message processor {} call failed", e, new Object[]{status});
        }
    }

    /**
     * Updates the queue's metrics counter when one is configured.
     *
     * @param fail {@code true} to bump the failure count, {@code false} for the execution count
     */
    private void updateCounter(boolean fail) {
        RqueueMetricsCounter counter = this.requireListenerContainer().getRqueueMetricsCounter();
        if (counter == null) {
            return;
        }
        if (fail) {
            counter.updateFailureCount(this.queueDetail.getName());
        } else {
            counter.updateExecutionCount(this.queueDetail.getName());
        }
    }

    /**
     * Publishes an {@link RqueueExecutionEvent} when listener statistics collection is enabled.
     * The metadata is updated (and removed from the store) before the event is built.
     */
    private void publishEvent(TaskStatus status, long jobExecutionStartTime) {
        if (!this.requireListenerContainer().getRqueueWebConfig().isCollectListenerStats()) {
            return;
        }
        this.addOrDeleteMetadata(jobExecutionStartTime, false);
        RqueueExecutionEvent event = new RqueueExecutionEvent(this.queueDetail, this.rqueueMessage, status, this.messageMetadata);
        this.requireListenerContainer().getApplicationEventPublisher().publishEvent((ApplicationEvent)event);
    }

    /**
     * Records the execution start time on the message metadata and then either saves it or
     * deletes it from the metadata store.
     *
     * @param jobExecutionStartTime start time passed to {@code addExecutionTime}
     * @param saveOrDelete {@code true} to save the metadata, {@code false} to delete it
     */
    private void addOrDeleteMetadata(long jobExecutionStartTime, boolean saveOrDelete) {
        if (this.messageMetadata == null) {
            this.messageMetadata = this.rqueueMessageMetadataService.get(this.messageMetadataId);
        }
        if (this.messageMetadata == null) {
            this.messageMetadata = new MessageMetadata(this.messageMetadataId, this.rqueueMessage.getId());
            if (!saveOrDelete) {
                // Nothing was found in the store, so there is nothing to delete.
                this.messageMetadata.addExecutionTime(jobExecutionStartTime);
                return;
            }
        }
        this.messageMetadata.addExecutionTime(jobExecutionStartTime);
        if (saveOrDelete) {
            this.requireListenerContainer().getRqueueMessageMetadataService().save(this.messageMetadata, METADATA_RETENTION);
        } else {
            this.rqueueMessageMetadataService.delete(this.messageMetadataId);
        }
    }

    /**
     * Removes the message from the processing queue, stores the failure count on it, then calls
     * the processor for {@code status} and publishes the execution event.
     */
    private void deleteMessage(TaskStatus status, int failureCount, long jobExecutionStartTime) {
        this.getRqueueMessageTemplate().removeElementFromZset(this.queueDetail.getProcessingQueueName(), this.rqueueMessage);
        this.rqueueMessage.setFailureCount(failureCount);
        this.callMessageProcessor(status, this.rqueueMessage);
        this.publishEvent(status, jobExecutionStartTime);
    }

    /**
     * Pushes an updated clone of the message to the dead letter queue and removes the original
     * from the processing queue in one pipeline, after calling the dead-letter processor.
     *
     * @throws CloneNotSupportedException if the message cannot be cloned
     */
    private void moveMessageToDlq(int failureCount, long jobExecutionStartTime) throws CloneNotSupportedException {
        if (this.isWarningEnabled()) {
            this.log(Level.WARN, "Message {} Moved to dead letter queue: {}", null, this.userMessage, this.queueDetail.getDeadLetterQueueName());
        }
        RqueueMessage newMessage = this.rqueueMessage.clone();
        newMessage.setFailureCount(failureCount);
        newMessage.updateReEnqueuedAt();
        this.callMessageProcessor(TaskStatus.MOVED_TO_DLQ, newMessage);
        RedisUtils.executePipeLine(this.getRqueueMessageTemplate().getTemplate(), (connection, keySerializer, valueSerializer) -> {
            byte[] newMessageBytes = valueSerializer.serialize(newMessage);
            byte[] oldMessageBytes = valueSerializer.serialize(this.rqueueMessage);
            byte[] processingQueueNameBytes = keySerializer.serialize(this.queueDetail.getProcessingQueueName());
            byte[] dlqNameBytes = keySerializer.serialize(this.queueDetail.getDeadLetterQueueName());
            connection.rPush(dlqNameBytes, new byte[][]{newMessageBytes});
            connection.zRem(processingQueueNameBytes, new byte[][]{oldMessageBytes});
        });
        this.publishEvent(TaskStatus.MOVED_TO_DLQ, jobExecutionStartTime);
    }

    /**
     * Moves an updated clone of the message from the processing queue to the delayed queue so it
     * is retried after {@code delay}, and saves the message metadata.
     *
     * @throws CloneNotSupportedException if the message cannot be cloned
     */
    private void parkMessageForRetry(int failureCount, long jobExecutionStartTime, long delay) throws CloneNotSupportedException {
        if (this.isDebugEnabled()) {
            this.log(Level.DEBUG, "Message {} will be retried in {}Ms", null, this.userMessage, delay);
        }
        RqueueMessage newMessage = this.rqueueMessage.clone();
        newMessage.setFailureCount(failureCount);
        newMessage.updateReEnqueuedAt();
        this.getRqueueMessageTemplate().moveMessage(this.queueDetail.getProcessingQueueName(), this.queueDetail.getDelayedQueueName(), this.rqueueMessage, newMessage, delay);
        this.addOrDeleteMetadata(jobExecutionStartTime, true);
    }

    /** Deletes a message whose retries are exhausted and that has no dead letter queue. */
    private void discardMessage(int failureCount, long jobExecutionStartTime) {
        if (this.isDebugEnabled()) {
            this.log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, this.userMessage);
        }
        this.deleteMessage(TaskStatus.DISCARDED, failureCount, jobExecutionStartTime);
    }

    /** Deletes a message whose metadata marks it as deleted. */
    private void handleManualDeletion(int failureCount, long jobExecutionStartTime) {
        if (this.isDebugEnabled()) {
            this.log(Level.DEBUG, "Message Deleted {} successfully", null, this.rqueueMessage);
        }
        this.deleteMessage(TaskStatus.DELETED, failureCount, jobExecutionStartTime);
    }

    /** Deletes a message that the handler consumed without throwing. */
    private void handleSuccessFullExecution(int failureCount, long jobExecutionStartTime) {
        if (this.isDebugEnabled()) {
            this.log(Level.DEBUG, "Message consumed {} successfully", null, this.rqueueMessage);
        }
        this.deleteMessage(TaskStatus.SUCCESSFUL, failureCount, jobExecutionStartTime);
    }

    /**
     * Routes a message that will not be retried: to the dead letter queue when one is configured,
     * otherwise it is discarded.
     */
    private void handleRetryExceededMessage(int failureCount, long jobExecutionStartTime) throws CloneNotSupportedException {
        if (this.queueDetail.isDlqSet()) {
            this.moveMessageToDlq(failureCount, jobExecutionStartTime);
        } else {
            this.discardMessage(failureCount, jobExecutionStartTime);
        }
    }

    /**
     * Handles a failed execution: parks the message for retry while the failure count is below
     * the retry limit and the back-off yields a delay other than {@code -1}; otherwise treats the
     * retries as exhausted.
     */
    private void handleFailure(int failureCount, long jobExecutionStartTime) throws CloneNotSupportedException {
        int maxRetryCount = this.getMaxRetryCount();
        if (failureCount >= maxRetryCount) {
            this.handleRetryExceededMessage(failureCount, jobExecutionStartTime);
            return;
        }
        long delay = this.taskExecutionBackoff.nextBackOff(this.userMessage, this.rqueueMessage, failureCount);
        if (delay == -1L) {
            // -1 from the back-off is handled the same way as an exhausted retry limit.
            this.handleRetryExceededMessage(failureCount, jobExecutionStartTime);
        } else {
            this.parkMessageForRetry(failureCount, jobExecutionStartTime, delay);
        }
    }

    /**
     * Dispatches to the handler matching {@code status}. {@code QUEUE_INACTIVE} is a no-op. Any
     * exception raised while post-processing (including the one for an unexpected status) is
     * logged and not propagated.
     */
    private void handlePostProcessing(TaskStatus status, int failureCount, long jobExecutionStartTime) {
        if (status == TaskStatus.QUEUE_INACTIVE) {
            return;
        }
        try {
            switch (status) {
                case SUCCESSFUL: {
                    this.handleSuccessFullExecution(failureCount, jobExecutionStartTime);
                    break;
                }
                case DELETED: {
                    this.handleManualDeletion(failureCount, jobExecutionStartTime);
                    break;
                }
                case IGNORED: {
                    this.handleIgnoredMessage(failureCount, jobExecutionStartTime);
                    break;
                }
                case FAILED: {
                    this.handleFailure(failureCount, jobExecutionStartTime);
                    break;
                }
                default: {
                    throw new UnknownSwitchCase(String.valueOf(status));
                }
            }
        } catch (Exception e) {
            this.log(Level.ERROR, "Error occurred in post processing", e, new Object[0]);
        }
    }

    /** Deletes a message that the pre-execution processor asked to skip. */
    private void handleIgnoredMessage(int failureCount, long jobExecutionStartTime) {
        if (this.isDebugEnabled()) {
            this.log(Level.DEBUG, "Message {} ignored, Queue: {}", null, this.rqueueMessage, this.queueDetail.getName());
        }
        this.deleteMessage(TaskStatus.IGNORED, failureCount, jobExecutionStartTime);
    }

    /**
     * Returns the wall-clock deadline (epoch milliseconds) after which no further attempt is
     * started: now plus the queue's visibility timeout, minus a 1000 ms margin.
     */
    private long getMaxProcessingTime() {
        return System.currentTimeMillis() + this.queueDetail.getVisibilityTimeout() - 1000L;
    }

    /** Reloads the message metadata and reports whether it is flagged as deleted. */
    private boolean isMessageDeleted() {
        this.messageMetadata = this.rqueueMessageMetadataService.get(this.messageMetadataId);
        return this.messageMetadata != null && this.messageMetadata.isDeleted();
    }

    /** Returns {@code true} when the pre-execution processor rejects the message. */
    private boolean shouldIgnore() {
        return !this.requireListenerContainer().getPreExecutionMessageProcessor().process(this.userMessage, this.rqueueMessage);
    }

    /**
     * Returns how many attempts {@link #start()} may make: the retry limit when
     * {@code retryPerPoll} is {@code -1}, otherwise the smaller of the two.
     */
    private int getRetryCount() {
        int maxRetry = this.getMaxRetryCount();
        if (this.retryPerPoll == -1) {
            return maxRetry;
        }
        return Math.min(this.retryPerPoll, maxRetry);
    }

    /** Returns whether this executor's queue is currently active. */
    private boolean queueActive() {
        return this.isQueueActive(this.queueDetail.getName());
    }

    /**
     * Checks the conditions that prevent execution, in order: inactive queue, message rejected by
     * the pre-execution processor, message flagged as deleted.
     *
     * @return the blocking status, or {@code null} when the message should be executed
     */
    private TaskStatus getStatus() {
        if (!this.queueActive()) {
            return TaskStatus.QUEUE_INACTIVE;
        }
        if (this.shouldIgnore()) {
            return TaskStatus.IGNORED;
        }
        if (this.isMessageDeleted()) {
            return TaskStatus.DELETED;
        }
        return null;
    }

    /**
     * Executes the message, retrying in place while attempts remain, the handler keeps failing
     * and the processing deadline has not passed, then runs post-processing with the final
     * status ({@code FAILED} when no attempt succeeded and nothing blocked execution). The
     * semaphore is always released at the end.
     */
    @Override
    void start() {
        int failureCount = this.rqueueMessage.getFailureCount();
        long maxProcessingTime = this.getMaxProcessingTime();
        long startTime = System.currentTimeMillis();
        int retryCount = this.getRetryCount();
        try {
            TaskStatus status;
            do {
                // Re-check before every attempt: the queue may have been paused or the message
                // deleted since the previous attempt.
                status = this.getStatus();
                if (status != null) {
                    break;
                }
                try {
                    this.updateCounter(false);
                    this.rqueueMessageHandler.handleMessage(this.message);
                    status = TaskStatus.SUCCESSFUL;
                } catch (MessagingException e) {
                    // Counted as a failure but intentionally not logged here.
                    this.updateCounter(true);
                    ++failureCount;
                } catch (Exception e) {
                    this.updateCounter(true);
                    ++failureCount;
                    this.log(Level.ERROR, "Message execution failed", e, new Object[0]);
                }
                --retryCount;
            } while (retryCount > 0 && status == null && System.currentTimeMillis() < maxProcessingTime);
            this.handlePostProcessing(status == null ? TaskStatus.FAILED : status, failureCount, startTime);
        } finally {
            this.semaphore.release();
        }
    }
}

