/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.web.service;

import com.github.sonus21.rqueue.common.RqueueLockManager;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.dao.RqueueQStatsDao;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.aggregator.QueueEvents;
import com.github.sonus21.rqueue.models.aggregator.TasksStat;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.QueueStatistics;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.DateTimeUtils;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Service
public class RqueueTaskAggregatorService
implements ApplicationListener<RqueueExecutionEvent>,
DisposableBean,
SmartLifecycle {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueTaskAggregatorService.class);
    private final RqueueConfig rqueueConfig;
    private final RqueueWebConfig rqueueWebConfig;
    private final RqueueLockManager rqueueLockManager;
    private final RqueueQStatsDao rqueueQStatsDao;
    private final Object lifecycleMgr = new Object();
    private final Object aggregatorLock = new Object();
    private volatile boolean running = false;
    private ThreadPoolTaskScheduler taskExecutor;
    private Map<String, QueueEvents> queueNameToEvents;
    private BlockingQueue<QueueEvents> queue;
    private List<Future<?>> eventAggregatorTasks;

    @Autowired
    public RqueueTaskAggregatorService(RqueueConfig rqueueConfig, RqueueWebConfig rqueueWebConfig, RqueueLockManager rqueueLockManager, RqueueQStatsDao rqueueQStatsDao) {
        this.rqueueConfig = rqueueConfig;
        this.rqueueWebConfig = rqueueWebConfig;
        this.rqueueLockManager = rqueueLockManager;
        this.rqueueQStatsDao = rqueueQStatsDao;
    }

    public void destroy() throws Exception {
        log.info("Destroying task aggregator");
        this.stop();
        if (this.taskExecutor != null) {
            this.taskExecutor.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        log.info("Starting task aggregation");
        Object object = this.lifecycleMgr;
        synchronized (object) {
            this.running = true;
            if (!this.rqueueWebConfig.isCollectListenerStats()) {
                return;
            }
            this.eventAggregatorTasks = new ArrayList();
            this.queueNameToEvents = new ConcurrentHashMap<String, QueueEvents>();
            this.queue = new LinkedBlockingQueue<QueueEvents>();
            int threadCount = this.rqueueWebConfig.getStatsAggregatorThreadCount();
            this.taskExecutor = ThreadUtils.createTaskScheduler(threadCount, "RqueueTaskAggregator-", 30);
            for (int i = 0; i < threadCount; ++i) {
                EventAggregator eventAggregator = new EventAggregator();
                this.eventAggregatorTasks.add(this.taskExecutor.submit((Runnable)eventAggregator));
            }
            this.taskExecutor.scheduleAtFixedRate((Runnable)new SweepJob(), Duration.ofSeconds(this.rqueueWebConfig.getAggregateEventWaitTime()));
            this.lifecycleMgr.notifyAll();
        }
    }

    private boolean processingRequired(QueueEvents queueEvents) {
        return queueEvents.processingRequired(this.rqueueWebConfig.getAggregateEventWaitTime(), this.rqueueWebConfig.getAggregateEventCount());
    }

    private void waitForRunningTaskToStop() {
        if (!CollectionUtils.isEmpty(this.eventAggregatorTasks)) {
            for (Future<?> future : this.eventAggregatorTasks) {
                ThreadUtils.waitForTermination(log, future, this.rqueueWebConfig.getAggregateShutdownWaitTime(), "Aggregator task termination", new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        log.info("Stopping task aggregation");
        Object object = this.lifecycleMgr;
        synchronized (object) {
            Object object2 = this.aggregatorLock;
            synchronized (object2) {
                if (!CollectionUtils.isEmpty(this.queueNameToEvents)) {
                    Collection<QueueEvents> queueEvents = this.queueNameToEvents.values();
                    this.queue.addAll(queueEvents);
                    queueEvents.clear();
                }
                this.aggregatorLock.notifyAll();
            }
            this.running = false;
            this.waitForRunningTaskToStop();
            this.lifecycleMgr.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        Object object = this.lifecycleMgr;
        synchronized (object) {
            return this.running;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onApplicationEvent(RqueueExecutionEvent event) {
        Object object = this.aggregatorLock;
        synchronized (object) {
            QueueDetail queueDetail;
            String queueName;
            QueueEvents queueEvents;
            if (log.isTraceEnabled()) {
                log.trace("Event {}", (Object)event);
            }
            if ((queueEvents = this.queueNameToEvents.get(queueName = (queueDetail = (QueueDetail)event.getSource()).getName())) == null) {
                queueEvents = new QueueEvents(event);
            } else {
                queueEvents.addEvent(event);
            }
            if (this.processingRequired(queueEvents)) {
                if (log.isTraceEnabled()) {
                    log.trace("Adding events to the queue");
                }
                this.queue.add(queueEvents);
                this.queueNameToEvents.remove(queueName);
            } else {
                this.queueNameToEvents.put(queueName, queueEvents);
            }
            this.aggregatorLock.notifyAll();
        }
    }

    private class EventAggregator
    implements Runnable {
        private EventAggregator() {
        }

        private void aggregate(RqueueExecutionEvent event, TasksStat stat) {
            MessageMetadata messageMetadata = event.getJob().getMessageMetadata();
            RqueueMessage rqueueMessage = event.getJob().getRqueueMessage();
            MessageStatus messageStatus = messageMetadata.getStatus();
            if (MessageStatus.DISCARDED.equals((Object)messageStatus)) {
                ++stat.discarded;
            } else if (MessageStatus.SUCCESSFUL.equals((Object)messageStatus)) {
                ++stat.success;
            } else if (MessageStatus.MOVED_TO_DLQ.equals((Object)messageStatus)) {
                ++stat.movedToDlq;
            }
            if (rqueueMessage.getFailureCount() > 0) {
                ++stat.retried;
            }
            stat.minExecution = Math.min(stat.minExecution, messageMetadata.getTotalExecutionTime());
            stat.maxExecution = Math.max(stat.maxExecution, messageMetadata.getTotalExecutionTime());
            ++stat.jobCount;
            stat.totalExecutionTime += messageMetadata.getTotalExecutionTime();
        }

        private void aggregate(QueueEvents events) {
            List<RqueueExecutionEvent> queueRqueueExecutionEvents = events.rqueueExecutionEvents;
            RqueueExecutionEvent queueRqueueExecutionEvent = queueRqueueExecutionEvents.get(0);
            HashMap<LocalDate, TasksStat> localDateTasksStatMap = new HashMap<LocalDate, TasksStat>();
            for (RqueueExecutionEvent event : queueRqueueExecutionEvents) {
                LocalDate date = DateTimeUtils.localDateFromMilli(queueRqueueExecutionEvent.getTimestamp());
                TasksStat stat = localDateTasksStatMap.getOrDefault(date, new TasksStat());
                this.aggregate(event, stat);
                localDateTasksStatMap.put(date, stat);
            }
            QueueDetail queueDetail = (QueueDetail)queueRqueueExecutionEvent.getSource();
            String queueStatKey = RqueueTaskAggregatorService.this.rqueueConfig.getQueueStatisticsKey(queueDetail.getName());
            QueueStatistics queueStatistics = RqueueTaskAggregatorService.this.rqueueQStatsDao.findById(queueStatKey);
            if (queueStatistics == null) {
                queueStatistics = new QueueStatistics(queueStatKey);
            }
            LocalDate today = DateTimeUtils.today();
            queueStatistics.updateTime();
            for (Map.Entry entry : localDateTasksStatMap.entrySet()) {
                queueStatistics.update((TasksStat)entry.getValue(), ((LocalDate)entry.getKey()).toString());
            }
            queueStatistics.pruneStats(today, RqueueTaskAggregatorService.this.rqueueWebConfig.getHistoryDay());
            RqueueTaskAggregatorService.this.rqueueQStatsDao.save(queueStatistics);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processEvents(QueueEvents events) {
            List<RqueueExecutionEvent> queueRqueueExecutionEvents = events.rqueueExecutionEvents;
            if (!CollectionUtils.isEmpty(queueRqueueExecutionEvents)) {
                RqueueExecutionEvent queueRqueueExecutionEvent = queueRqueueExecutionEvents.get(0);
                QueueDetail queueDetail = (QueueDetail)queueRqueueExecutionEvent.getSource();
                String queueStatKey = RqueueTaskAggregatorService.this.rqueueConfig.getQueueStatisticsKey(queueDetail.getName());
                String lockKey = RqueueTaskAggregatorService.this.rqueueConfig.getLockKey(queueStatKey);
                boolean locked = false;
                try {
                    RqueueLockManager rqueueLockManager = RqueueTaskAggregatorService.this.rqueueLockManager;
                    RqueueTaskAggregatorService.this.rqueueConfig;
                    if (rqueueLockManager.acquireLock(lockKey, RqueueConfig.getBrokerId(), Duration.ofSeconds(5L))) {
                        locked = true;
                        this.aggregate(events);
                    } else {
                        log.warn("Unable to acquire lock, will retry later");
                        TimeoutUtils.sleep(1000L);
                        RqueueTaskAggregatorService.this.queue.add(events);
                    }
                }
                finally {
                    if (locked) {
                        RqueueLockManager rqueueLockManager = RqueueTaskAggregatorService.this.rqueueLockManager;
                        RqueueTaskAggregatorService.this.rqueueConfig;
                        rqueueLockManager.releaseLock(lockKey, RqueueConfig.getBrokerId());
                    }
                }
            }
        }

        @Override
        public void run() {
            while (RqueueTaskAggregatorService.this.running) {
                QueueEvents events = null;
                try {
                    if (log.isTraceEnabled()) {
                        log.trace("Aggregating queue stats");
                    }
                    if ((events = (QueueEvents)RqueueTaskAggregatorService.this.queue.poll(RqueueTaskAggregatorService.this.rqueueWebConfig.getAggregateShutdownWaitTime() / 2, TimeUnit.MILLISECONDS)) == null) continue;
                    this.processEvents(events);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    if (events != null) {
                        RqueueTaskAggregatorService.this.queue.add(events);
                    }
                    log.error("Error in aggregator job ", (Throwable)e);
                    TimeoutUtils.sleepLog(1000L, false);
                }
            }
        }
    }

    class SweepJob
    implements Runnable {
        SweepJob() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (log.isDebugEnabled()) {
                log.debug("Checking pending events.");
            }
            Object object = RqueueTaskAggregatorService.this.aggregatorLock;
            synchronized (object) {
                ArrayList<String> queuesToSweep = new ArrayList<String>();
                for (Map.Entry entry : RqueueTaskAggregatorService.this.queueNameToEvents.entrySet()) {
                    QueueEvents queueEvents = (QueueEvents)entry.getValue();
                    String queueName = (String)entry.getKey();
                    if (!RqueueTaskAggregatorService.this.processingRequired(queueEvents)) continue;
                    RqueueTaskAggregatorService.this.queue.add(queueEvents);
                    queuesToSweep.add(queueName);
                }
                for (String queueName : queuesToSweep) {
                    RqueueTaskAggregatorService.this.queueNameToEvents.remove(queueName);
                }
                RqueueTaskAggregatorService.this.aggregatorLock.notifyAll();
            }
        }
    }
}

