/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.Job;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.HandlerMiddleware;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.JobImpl;
import com.github.sonus21.rqueue.listener.MessageContainerBase;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

class RqueueExecutor
extends MessageContainerBase {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueExecutor.class);
    private final RqueueMessageHandler rqueueMessageHandler;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final PostProcessingHandler postProcessingHandler;
    private final Semaphore semaphore;
    private final RqueueConfig rqueueConfig;
    private boolean updatedToProcessing;
    private JobImpl job;
    private ExecutionStatus status;
    private Throwable error;
    private int failureCount;

    RqueueExecutor(WeakReference<RqueueMessageListenerContainer> container, RqueueConfig rqueueConfig, PostProcessingHandler postProcessingHandler, RqueueMessage rqueueMessage, QueueDetail queueDetail, Semaphore semaphore) {
        super(log, queueDetail.getName(), container);
        this.rqueueConfig = rqueueConfig;
        this.postProcessingHandler = postProcessingHandler;
        this.rqueueMessageMetadataService = Objects.requireNonNull((RqueueMessageListenerContainer)container.get()).rqueueMessageMetadataService();
        this.rqueueMessageHandler = Objects.requireNonNull((RqueueMessageListenerContainer)container.get()).getRqueueMessageHandler();
        this.semaphore = semaphore;
        this.init(rqueueMessage, queueDetail);
    }

    private void init(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
        Message tmpMessage = MessageBuilder.createMessage((Object)rqueueMessage.getMessage(), (MessageHeaders)RqueueMessageHeaders.buildMessageHeaders(queueDetail.getName(), rqueueMessage, null, null));
        MessageMetadata messageMetadata = this.rqueueMessageMetadataService.getOrCreateMessageMetadata(rqueueMessage);
        Exception t = null;
        Object userMessage = null;
        try {
            userMessage = RqueueMessageUtils.convertMessageToObject((Message<String>)tmpMessage, this.rqueueMessageHandler.getMessageConverter());
        }
        catch (Exception e) {
            this.log(Level.ERROR, "Unable to convert message {}", e, rqueueMessage.getMessage());
            t = e;
            throw e;
        }
        finally {
            this.job = new JobImpl(this.rqueueConfig, Objects.requireNonNull((RqueueMessageListenerContainer)this.container.get()).rqueueMessageMetadataService(), Objects.requireNonNull((RqueueMessageListenerContainer)this.container.get()).rqueueJobDao(), queueDetail, messageMetadata, rqueueMessage, userMessage, t, this.postProcessingHandler);
        }
        this.failureCount = this.job.getRqueueMessage().getFailureCount();
    }

    private int getMaxRetryCount() {
        return this.job.getRqueueMessage().getRetryCount() == null ? this.job.getQueueDetail().getNumRetry() : this.job.getRqueueMessage().getRetryCount().intValue();
    }

    private void updateCounter(boolean fail) {
        RqueueMetricsCounter counter = Objects.requireNonNull((RqueueMessageListenerContainer)this.container.get()).getRqueueMetricsCounter();
        if (counter == null) {
            return;
        }
        if (fail) {
            counter.updateFailureCount(this.job.getQueueDetail().getName());
        } else {
            counter.updateExecutionCount(this.job.getQueueDetail().getName());
        }
    }

    private long maxExecutionTime() {
        return this.job.getQueueDetail().getVisibilityTimeout() - 1000L;
    }

    private long getMaxProcessingTime() {
        return System.currentTimeMillis() + this.maxExecutionTime();
    }

    private boolean isMessageDeleted() {
        if (this.job.getMessageMetadata().isDeleted()) {
            return true;
        }
        MessageMetadata newMessageMetadata = this.rqueueMessageMetadataService.getOrCreateMessageMetadata(this.job.getRqueueMessage());
        if (!newMessageMetadata.equals(this.job.getMessageMetadata())) {
            this.job.setMessageMetadata(newMessageMetadata);
        }
        return this.job.getMessageMetadata().isDeleted();
    }

    private boolean shouldIgnore() {
        return !Objects.requireNonNull((RqueueMessageListenerContainer)this.container.get()).getPreExecutionMessageProcessor().process(this.job);
    }

    private boolean isOldMessage() {
        return this.job.getMessageMetadata().getRqueueMessage() != null && this.job.getMessageMetadata().getRqueueMessage().getQueuedTime() != this.job.getRqueueMessage().getQueuedTime();
    }

    private int getRetryCount() {
        int maxRetry = this.getMaxRetryCount();
        if (this.rqueueConfig.getRetryPerPoll() == -1) {
            return maxRetry;
        }
        return Math.min(this.rqueueConfig.getRetryPerPoll(), maxRetry);
    }

    private boolean queueInActive() {
        return !this.isQueueActive(this.job.getQueueDetail().getName());
    }

    private ExecutionStatus getStatus() {
        if (this.queueInActive()) {
            return ExecutionStatus.QUEUE_INACTIVE;
        }
        if (this.shouldIgnore()) {
            return ExecutionStatus.IGNORED;
        }
        if (this.isMessageDeleted()) {
            return ExecutionStatus.DELETED;
        }
        if (this.isOldMessage()) {
            return ExecutionStatus.OLD_MESSAGE;
        }
        return null;
    }

    private void updateToProcessing() {
        if (this.updatedToProcessing) {
            return;
        }
        this.updatedToProcessing = true;
        this.job.updateMessageStatus(MessageStatus.PROCESSING);
    }

    private void logExecutionTimeWarning(long maxProcessingTime, long startTime, ExecutionStatus status) {
        if (System.currentTimeMillis() > maxProcessingTime) {
            long maxAllowedTime = this.maxExecutionTime();
            long executionTime = System.currentTimeMillis() - startTime;
            this.log(Level.WARN, "Message listener is taking longer time [Queue: {}, TaskStatus: {}] MaxAllowedTime: {}, ExecutionTime: {}", null, new Object[]{this.job.getQueueDetail().getName(), status, maxAllowedTime, executionTime});
        }
    }

    private void begin() {
        this.job.execute();
        this.error = null;
        this.status = this.getStatus();
    }

    private void end() {
        if (this.status == null) {
            this.job.updateExecutionStatus(ExecutionStatus.FAILED, this.error);
        } else {
            this.job.updateExecutionStatus(this.status, this.error);
        }
    }

    private void callMiddlewares(int currentIndex, List<Middleware> middlewares, Job job) throws Exception {
        if (currentIndex == middlewares.size()) {
            new HandlerMiddleware(this.rqueueMessageHandler).handle(job, null);
        } else {
            middlewares.get(currentIndex).handle(job, () -> {
                this.callMiddlewares(currentIndex + 1, middlewares, job);
                return null;
            });
        }
    }

    private void processMessage() throws Exception {
        List<Middleware> middlewareList = Objects.requireNonNull((RqueueMessageListenerContainer)this.container.get()).getMiddleWares();
        if (middlewareList == null) {
            this.callMiddlewares(0, Collections.emptyList(), this.job);
        } else {
            this.callMiddlewares(0, middlewareList, this.job);
        }
        this.status = ExecutionStatus.SUCCESSFUL;
    }

    private void execute() {
        try {
            this.updateToProcessing();
            this.updateCounter(false);
            this.processMessage();
        }
        catch (MessagingException e) {
            this.updateCounter(true);
            ++this.failureCount;
            this.error = e;
        }
        catch (Exception e) {
            this.updateCounter(true);
            ++this.failureCount;
            this.error = e;
            this.log(Level.ERROR, "Message execution failed, RqueueMessage: {}", e, this.job.getRqueueMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleMessage() {
        long maxProcessingTime = this.getMaxProcessingTime();
        long startTime = System.currentTimeMillis();
        int retryCount = this.getRetryCount();
        int attempt = 1;
        try {
            do {
                this.log(Level.DEBUG, "Attempt {} message: {}", null, attempt, this.job.getMessage());
                this.begin();
                if (this.status == null) {
                    this.execute();
                }
                ++attempt;
                this.end();
            } while (--retryCount > 0 && this.status == null && System.currentTimeMillis() < maxProcessingTime);
            this.postProcessingHandler.handle(this.job, this.status == null ? ExecutionStatus.FAILED : this.status, this.failureCount);
            this.logExecutionTimeWarning(maxProcessingTime, startTime, this.status);
        }
        finally {
            this.semaphore.release();
        }
    }

    private long getTtlForScheduledMessageKey(RqueueMessage message) {
        long expiryInSeconds = 2L * this.job.getQueueDetail().getVisibilityTimeout() / 1000L;
        long remainingTime = (message.getProcessAt() - System.currentTimeMillis()) / 1000L;
        if (remainingTime > 0L) {
            expiryInSeconds += remainingTime;
        }
        return expiryInSeconds;
    }

    private String getScheduledMessageKey(RqueueMessage message) {
        return String.format("%s%s%s%ssch%s%d", this.job.getQueueDetail().getQueueName(), "::", this.job.getRqueueMessage().getId(), "::", "::", message.getProcessAt());
    }

    private void processPeriodicMessage() {
        RqueueMessage newMessage = this.job.getRqueueMessage().toBuilder().processAt(this.job.getRqueueMessage().nextProcessAt()).build();
        String messageKey = this.getScheduledMessageKey(newMessage);
        long expiryInSeconds = this.getTtlForScheduledMessageKey(newMessage);
        log.debug("Schedule periodic message: {} Status: {}", (Object)this.job.getRqueueMessage(), (Object)this.getRqueueMessageTemplate().scheduleMessage(this.job.getQueueDetail().getDelayedQueueName(), messageKey, newMessage, expiryInSeconds));
        this.handleMessage();
    }

    @Override
    public void start() {
        if (this.job.getRqueueMessage().isPeriodicTask()) {
            this.processPeriodicMessage();
        } else {
            this.handleMessage();
        }
    }
}

