/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.listener.MessageContainerBase;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.utils.MessageUtils;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Runs the listener for a single {@link RqueueMessage} taken from one queue.
 *
 * <p>The constructor converts the raw payload and loads the message metadata; {@link #start()}
 * then invokes the message handler (retrying within the configured limits), hands the outcome to
 * the {@link PostProcessingHandler} and finally releases the semaphore permit it was given.
 */
class RqueueExecutor extends MessageContainerBase {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueExecutor.class);
    private final QueueDetail queueDetail;
    private Message<String> message;
    private final RqueueMessage rqueueMessage;
    private final RqueueMessageHandler rqueueMessageHandler;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final PostProcessingHandler postProcessingHandler;
    private final Semaphore semaphore;
    private final RqueueConfig rqueueConfig;
    private MessageMetadata messageMetadata;
    private Object userMessage;
    // Guards updateToProcessing() so the PROCESSING status is stored at most once per executor.
    private boolean updatedToProcessing;

    /**
     * Creates an executor for one message.
     *
     * @param rqueueMessage the message to execute
     * @param queueDetail the queue the message belongs to
     * @param semaphore semaphore that {@link #start()} releases exactly once when it finishes
     * @param container weak reference to the owning listener container; must still be reachable
     * @param rqueueConfig configuration used for retry-per-poll and metadata durability
     * @param postProcessingHandler receives the final execution status of the message
     * @throws NullPointerException if the listener container has already been collected
     */
    RqueueExecutor(RqueueMessage rqueueMessage, QueueDetail queueDetail, Semaphore semaphore, WeakReference<RqueueMessageListenerContainer> container, RqueueConfig rqueueConfig, PostProcessingHandler postProcessingHandler) {
        super(log, queueDetail.getName(), container);
        // Dereference the weak reference once so both collaborators come from the same
        // strongly-held container instance.
        RqueueMessageListenerContainer listenerContainer = Objects.requireNonNull(container.get());
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
        this.semaphore = semaphore;
        this.rqueueMessageHandler = listenerContainer.getRqueueMessageHandler();
        this.rqueueConfig = rqueueConfig;
        this.postProcessingHandler = postProcessingHandler;
        this.rqueueMessageMetadataService = listenerContainer.getRqueueMessageMetadataService();
        this.init();
    }

    /**
     * Builds the Spring {@link Message}, converts it to the user payload and loads (or creates)
     * the message metadata. A conversion failure is logged and rethrown to the caller.
     */
    private void init() {
        this.message = MessageBuilder.createMessage(this.rqueueMessage.getMessage(), (MessageHeaders)RqueueMessageHeaders.buildMessageHeaders(this.queueDetail.getName(), this.rqueueMessage));
        try {
            this.userMessage = MessageUtils.convertMessageToObject(this.message, this.rqueueMessageHandler.getMessageConverter());
        }
        catch (Exception e) {
            this.log(Level.ERROR, "Unable to convert message {}", e, this.rqueueMessage.getMessage());
            throw e;
        }
        this.messageMetadata = this.rqueueMessageMetadataService.getOrCreateMessageMetadata(this.rqueueMessage);
    }

    /**
     * Returns the owning listener container.
     *
     * @throws NullPointerException if the weakly referenced container is no longer available
     */
    private RqueueMessageListenerContainer requireContainer() {
        return (RqueueMessageListenerContainer)Objects.requireNonNull(this.container.get());
    }

    /** Returns the message-level retry count when set, otherwise the queue-level one. */
    private int getMaxRetryCount() {
        return this.rqueueMessage.getRetryCount() == null ? this.queueDetail.getNumRetry() : this.rqueueMessage.getRetryCount().intValue();
    }

    /**
     * Updates the queue metrics, if a metrics counter is configured.
     *
     * @param fail {@code true} to record a failure, {@code false} to record an execution
     */
    private void updateCounter(boolean fail) {
        RqueueMetricsCounter counter = this.requireContainer().getRqueueMetricsCounter();
        if (counter == null) {
            return;
        }
        if (fail) {
            counter.updateFailureCount(this.queueDetail.getName());
        } else {
            counter.updateExecutionCount(this.queueDetail.getName());
        }
    }

    /**
     * Returns the wall-clock deadline (epoch millis) after which no further attempt is started:
     * now plus the queue's visibility timeout, minus one second.
     */
    private long getMaxProcessingTime() {
        return System.currentTimeMillis() + this.queueDetail.getVisibilityTimeout() - 1000L;
    }

    /**
     * Tells whether the message has been marked as deleted, reloading the metadata when the
     * in-memory copy says it has not.
     */
    private boolean isMessageDeleted() {
        if (this.messageMetadata.isDeleted()) {
            return true;
        }
        MessageMetadata latest = this.rqueueMessageMetadataService.get(this.messageMetadata.getId());
        if (latest == null) {
            // The lookup found nothing; keep the in-memory copy (already known not to be
            // deleted) instead of dereferencing null.
            return false;
        }
        this.messageMetadata = latest;
        return latest.isDeleted();
    }

    /** Returns {@code true} when the pre-execution processor rejects the message. */
    private boolean shouldIgnore() {
        return !this.requireContainer().getPreExecutionMessageProcessor().process(this.userMessage, this.rqueueMessage);
    }

    /**
     * Returns {@code true} when the metadata holds a message whose queued time differs from the
     * queued time of the message being executed.
     */
    private boolean isOldMessage() {
        return this.messageMetadata.getRqueueMessage() != null && this.messageMetadata.getRqueueMessage().getQueuedTime() != this.rqueueMessage.getQueuedTime();
    }

    /**
     * Returns the retry budget for this run: the maximum retry count, capped by the configured
     * retry-per-poll unless that setting is {@code -1}.
     */
    private int getRetryCount() {
        int maxRetry = this.getMaxRetryCount();
        if (this.rqueueConfig.getRetryPerPoll() == -1) {
            return maxRetry;
        }
        return Math.min(this.rqueueConfig.getRetryPerPoll(), maxRetry);
    }

    /** Returns {@code true} when the queue is not active. */
    private boolean queueInActive() {
        return !this.isQueueActive(this.queueDetail.getName());
    }

    /**
     * Checks the conditions that stop execution before the handler is invoked.
     *
     * @return the terminal status to report, or {@code null} when the handler should run
     */
    private ExecutionStatus getStatus() {
        if (this.queueInActive()) {
            return ExecutionStatus.QUEUE_INACTIVE;
        }
        if (this.shouldIgnore()) {
            return ExecutionStatus.IGNORED;
        }
        if (this.isOldMessage()) {
            return ExecutionStatus.OLD_MESSAGE;
        }
        if (this.isMessageDeleted()) {
            return ExecutionStatus.DELETED;
        }
        return null;
    }

    /** Stores the {@code PROCESSING} status in the metadata; only the first call has an effect. */
    private void updateToProcessing() {
        if (this.updatedToProcessing) {
            return;
        }
        this.updatedToProcessing = true;
        this.messageMetadata.setStatus(MessageStatus.PROCESSING);
        this.rqueueMessageMetadataService.save(this.messageMetadata, Duration.ofMinutes(this.rqueueConfig.getMessageDurabilityInMinute()));
    }

    /**
     * Executes the message.
     *
     * <p>Each attempt first checks {@link #getStatus()}; a non-null status ends the loop without
     * calling the handler. Otherwise the handler is invoked and any exception is counted as a
     * failure. Attempts continue while the retry budget lasts, no status has been reached and the
     * processing deadline has not passed. The outcome ({@code FAILED} when no status was reached)
     * is passed to the post-processing handler, and the semaphore is released in all cases.
     */
    @Override
    void start() {
        int failureCount = this.rqueueMessage.getFailureCount();
        long maxProcessingTime = this.getMaxProcessingTime();
        long startTime = System.currentTimeMillis();
        int retryCount = this.getRetryCount();
        int attempt = 1;
        try {
            ExecutionStatus status;
            do {
                this.log(Level.DEBUG, "Attempt {} message: {}", null, attempt, this.userMessage);
                status = this.getStatus();
                if (status != null) break;
                try {
                    this.updateToProcessing();
                    this.updateCounter(false);
                    this.rqueueMessageHandler.handleMessage(this.message);
                    status = ExecutionStatus.SUCCESSFUL;
                }
                catch (MessagingException e) {
                    // Counted as a failure; no log entry is written here for this exception type.
                    this.updateCounter(true);
                    ++failureCount;
                }
                catch (Exception e) {
                    this.updateCounter(true);
                    ++failureCount;
                    this.log(Level.ERROR, "Message execution failed, RqueueMessage: {}", e, this.rqueueMessage);
                }
                ++attempt;
            } while (--retryCount > 0 && status == null && System.currentTimeMillis() < maxProcessingTime);
            this.postProcessingHandler.handle(this.queueDetail, this.rqueueMessage, this.userMessage, this.messageMetadata, status == null ? ExecutionStatus.FAILED : status, failureCount, startTime);
        }
        finally {
            // Always give the permit back, even when a status check or post-processing throws.
            this.semaphore.release();
        }
    }
}

