/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.listener.MessageContainerBase;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.TaskStatus;
import com.github.sonus21.rqueue.utils.MessageUtils;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.lang.ref.WeakReference;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

class RqueueExecutor
extends MessageContainerBase {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueExecutor.class);
    private final QueueDetail queueDetail;
    private final Message<String> message;
    private final RqueueMessage rqueueMessage;
    private final RqueueMessageHandler rqueueMessageHandler;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final PostProcessingHandler postProcessingHandler;
    private final String messageMetadataId;
    private final Semaphore semaphore;
    private final int retryPerPoll;
    private MessageMetadata messageMetadata;
    private Object userMessage;

    RqueueExecutor(RqueueMessage rqueueMessage, QueueDetail queueDetail, Semaphore semaphore, WeakReference<RqueueMessageListenerContainer> container, int retryPerPoll, PostProcessingHandler postProcessingHandler) {
        super(log, queueDetail.getName(), container);
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
        this.semaphore = semaphore;
        this.rqueueMessageHandler = ((RqueueMessageListenerContainer)Objects.requireNonNull(container.get())).getRqueueMessageHandler();
        this.messageMetadataId = MessageUtils.getMessageMetaId(rqueueMessage.getId());
        this.retryPerPoll = retryPerPoll;
        this.postProcessingHandler = postProcessingHandler;
        this.message = MessageBuilder.createMessage((Object)rqueueMessage.getMessage(), (MessageHeaders)RqueueMessageHeaders.buildMessageHeaders(queueDetail.getName(), rqueueMessage));
        try {
            this.userMessage = MessageUtils.convertMessageToObject(this.message, this.rqueueMessageHandler.getMessageConverter());
        }
        catch (Exception e) {
            this.log(Level.ERROR, "Unable to convert message {}", e, rqueueMessage.getMessage());
        }
        this.rqueueMessageMetadataService = ((RqueueMessageListenerContainer)Objects.requireNonNull(container.get())).getRqueueMessageMetadataService();
    }

    private int getMaxRetryCount() {
        return this.rqueueMessage.getRetryCount() == null ? this.queueDetail.getNumRetry() : this.rqueueMessage.getRetryCount().intValue();
    }

    private void updateCounter(boolean fail) {
        RqueueMetricsCounter counter = ((RqueueMessageListenerContainer)Objects.requireNonNull(this.container.get())).getRqueueMetricsCounter();
        if (counter == null) {
            return;
        }
        if (fail) {
            counter.updateFailureCount(this.queueDetail.getName());
        } else {
            counter.updateExecutionCount(this.queueDetail.getName());
        }
    }

    private long getMaxProcessingTime() {
        return System.currentTimeMillis() + this.queueDetail.getVisibilityTimeout() - 1000L;
    }

    private boolean isMessageDeleted() {
        this.messageMetadata = this.rqueueMessageMetadataService.get(this.messageMetadataId);
        if (this.messageMetadata == null) {
            return false;
        }
        return this.messageMetadata.isDeleted();
    }

    private boolean shouldIgnore() {
        return !((RqueueMessageListenerContainer)Objects.requireNonNull(this.container.get())).getPreExecutionMessageProcessor().process(this.userMessage, this.rqueueMessage);
    }

    private int getRetryCount() {
        int maxRetry = this.getMaxRetryCount();
        if (this.retryPerPoll == -1) {
            return maxRetry;
        }
        return Math.min(this.retryPerPoll, maxRetry);
    }

    private boolean queueActive() {
        return this.isQueueActive(this.queueDetail.getName());
    }

    private TaskStatus getStatus() {
        if (!this.queueActive()) {
            return TaskStatus.QUEUE_INACTIVE;
        }
        if (this.shouldIgnore()) {
            return TaskStatus.IGNORED;
        }
        if (this.isMessageDeleted()) {
            return TaskStatus.DELETED;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    void start() {
        int failureCount = this.rqueueMessage.getFailureCount();
        long maxProcessingTime = this.getMaxProcessingTime();
        long startTime = System.currentTimeMillis();
        int retryCount = this.getRetryCount();
        try {
            TaskStatus status;
            while ((status = this.getStatus()) == null) {
                try {
                    this.updateCounter(false);
                    this.rqueueMessageHandler.handleMessage(this.message);
                    status = TaskStatus.SUCCESSFUL;
                }
                catch (MessagingException e) {
                    this.updateCounter(true);
                    ++failureCount;
                }
                catch (Exception e) {
                    this.updateCounter(true);
                    ++failureCount;
                    this.log(Level.ERROR, "Message execution failed", e, new Object[0]);
                }
                if (--retryCount > 0 && status == null && System.currentTimeMillis() < maxProcessingTime) continue;
            }
            this.postProcessingHandler.handlePostProcessing(this.queueDetail, this.rqueueMessage, this.userMessage, this.messageMetadata, status == null ? TaskStatus.FAILED : status, failureCount, startTime);
        }
        finally {
            this.semaphore.release();
        }
    }
}

