/*
 * Decompiled with CFR 0.152.
 */
package com.axibase.collector;

import com.axibase.collector.CountedQueue;
import com.axibase.collector.EventWrapper;
import com.axibase.collector.MessageWriter;
import com.axibase.collector.SendMessageTrigger;
import com.axibase.collector.config.SeriesSenderConfig;
import com.axibase.collector.logback.LogbackEventTrigger;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Collects events of type {@code E} in an in-memory cache and periodically hands the
 * accumulated cache to a {@link MessageWriter}, which writes it to the configured
 * {@link WritableByteChannel}.
 *
 * <p>Events enter through {@link #register(Object)}. A background {@link Worker}, started by
 * {@link #start()}, wakes up at least every {@link #DEFAULT_CHECK_PERIOD_MS} milliseconds
 * (or earlier when {@code register} counts down the wake-up latch) and decides whether the
 * cache has to be flushed. Events matched by a registered {@link SendMessageTrigger} are
 * additionally queued as individual messages.
 *
 * <p>NOTE(review): {@code skippedCount}, {@code triggers}, {@code writer} and
 * {@code seriesSenderConfig} are plain fields shared between the registering thread(s) and
 * the worker; whether {@code CountedQueue} is safe for concurrent use is not visible here —
 * confirm the intended threading model.
 *
 * @param <E> the event type being aggregated
 */
public class Aggregator<E> {
    /** Maximum time, in milliseconds, the worker waits between two threshold checks. */
    public static final int DEFAULT_CHECK_PERIOD_MS = 333;

    private final Worker worker = new Worker();
    /** Events registered since the last flush; replaced by a fresh queue on every flush. */
    private volatile CountedQueue<E> cache = new CountedQueue<E>();
    /** Events selected by a trigger, waiting to be written as individual messages. */
    private CountedQueue<EventWrapper<E>> singles = new CountedQueue<EventWrapper<E>>();
    /** Number of events registered since the counter was last reset by a flush. */
    private AtomicInteger stepCounter = new AtomicInteger(0);
    /** Timestamp (epoch millis) of the last flush, initialised to construction time. */
    private AtomicLong last = new AtomicLong(System.currentTimeMillis());
    private WritableByteChannel writer;
    private final MessageWriter<E> messageWriter;
    private ExecutorService senderExecutor;
    /** Wake-up signal for the worker; counted down by {@code register}, re-armed by the worker. */
    private volatile CountDownLatch latch = new CountDownLatch(1);
    private SendMessageTrigger<E>[] triggers = null;
    private SeriesSenderConfig seriesSenderConfig = SeriesSenderConfig.DEFAULT;
    /** Number of events dropped from the cache because it exceeded the skip threshold. */
    private int skippedCount = 0;

    /**
     * Creates an aggregator that delegates all output to the given writer strategy.
     *
     * @param messageWriter used to serialise cached statistics and single messages
     */
    public Aggregator(MessageWriter<E> messageWriter) {
        this.messageWriter = messageWriter;
    }

    /**
     * Adds an event to the cache and evaluates the registered triggers against it.
     *
     * <p>When the cache size reaches a positive multiple of the configured cache flush
     * threshold the worker is woken up. When the cache size exceeds the cache skip threshold
     * one element is polled from the cache (presumably the oldest — depends on
     * {@code CountedQueue.poll}) and the skip counter is incremented; a notice is printed to
     * {@code System.err} every time the skip counter reaches a multiple of that threshold.
     *
     * @param event the event to register
     * @return always {@code true}
     * @throws IOException wrapping any {@link Throwable} raised while processing the event
     */
    public boolean register(E event) throws IOException {
        try {
            this.cache.add(event);
            this.stepCounter.incrementAndGet();
            int count = this.cache.getCount();
            if (count > 0 && count % this.seriesSenderConfig.getCacheFlushThreshold() == 0) {
                // Enough events accumulated: wake the worker without waiting for its timeout.
                this.latch.countDown();
            }
            if (count > this.seriesSenderConfig.getCacheSkipThreshold()) {
                if (++this.skippedCount % this.seriesSenderConfig.getCacheSkipThreshold() == 0) {
                    System.err.println("skipped: " + this.skippedCount + " events");
                }
                this.cache.poll();
            }
            SendMessageTrigger<E>[] activeTriggers = this.triggers;
            if (activeTriggers != null) {
                int lines = 0;
                boolean fire = false;
                for (SendMessageTrigger<E> trigger : activeTriggers) {
                    if (!trigger.onEvent(event)) {
                        continue;
                    }
                    fire = true;
                    int stackTraceLines = trigger.getStackTraceLines();
                    if (stackTraceLines < 0) {
                        // A negative value is treated as "no limit".
                        lines = Integer.MAX_VALUE;
                    } else if (stackTraceLines > lines) {
                        // Otherwise the largest limit among the firing triggers wins.
                        lines = stackTraceLines;
                    }
                }
                if (fire) {
                    this.sendSingle(event, lines);
                }
            }
        }
        catch (Throwable t) {
            t.printStackTrace();
            throw new IOException(t);
        }
        return true;
    }

    /** Creates a single-threaded executor and starts the background worker on it. */
    public void start() {
        this.senderExecutor = Executors.newSingleThreadExecutor();
        this.senderExecutor.execute(this.worker);
    }

    /**
     * Asks the worker to stop, shuts the executor down and closes the writer if it is open.
     *
     * <p>NOTE(review): {@code shutdown()} does not wait for the worker to finish, so the
     * writer may be closed while a last threshold check is still in progress.
     */
    public void stop() {
        this.worker.stop();
        if (this.senderExecutor != null && !this.senderExecutor.isShutdown()) {
            this.senderExecutor.shutdown();
        }
        if (this.writer != null && this.writer.isOpen()) {
            try {
                this.writer.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Decides whether the cache must be flushed and writes pending single messages.
     *
     * <p>A flush happens when the configured period has elapsed since the last flush, or when
     * the number of events registered since the last flush exceeds the send threshold (and
     * the minimum period has elapsed) or the cache flush threshold.
     *
     * @throws IOException if the message writer fails
     */
    private void checkThresholds() throws IOException {
        int cnt = this.stepCounter.get();
        long currentTime = System.currentTimeMillis();
        long lastTime = this.last.get();
        long dt = currentTime - lastTime;
        long periodMs = this.seriesSenderConfig.getPeriodMs();
        // The CAS ensures only the caller that observed lastTime performs the periodic flush.
        if (dt > periodMs && this.last.compareAndSet(lastTime, currentTime)) {
            this.flush(lastTime, currentTime);
            cnt = 0;
        }
        int sendThreshold = this.seriesSenderConfig.getSendThreshold();
        boolean sendDue = sendThreshold > 0
                && dt > this.seriesSenderConfig.getMinPeriodMs()
                && cnt > sendThreshold;
        if (sendDue || cnt > this.seriesSenderConfig.getCacheFlushThreshold()) {
            int pending;
            // Claim the pending events by resetting the counter; retry if register() raced us.
            while ((pending = this.stepCounter.get()) != 0) {
                if (!this.stepCounter.compareAndSet(pending, 0)) {
                    continue;
                }
                this.flush(lastTime, currentTime);
                break;
            }
        }
        if (!this.singles.isEmpty()) {
            this.messageWriter.writeSingles(this.writer, this.singles);
        }
    }

    /**
     * Swaps the current cache for an empty one and writes the old cache as statistics.
     *
     * @param lastTime    timestamp of the previous flush, in epoch milliseconds
     * @param currentTime timestamp of this flush, in epoch milliseconds
     * @throws IOException if the message writer fails
     */
    protected void flush(long lastTime, long currentTime) throws IOException {
        this.last.set(currentTime);
        this.stepCounter.set(0);
        CountedQueue<E> lastCache = this.cache;
        this.cache = new CountedQueue<E>();
        // The third argument is the elapsed interval plus one millisecond.
        this.messageWriter.writeStatMessages(this.writer, lastCache, 1L + currentTime - lastTime);
    }

    /**
     * Queues an event to be written as an individual message, polling one element from the
     * queue when its size exceeds the configured message skip threshold.
     *
     * @param event the event to send
     * @param lines stack trace line limit passed to the {@link EventWrapper}
     */
    private void sendSingle(E event, int lines) {
        this.singles.add(new EventWrapper<E>(event, lines));
        if (this.singles.getCount() > this.seriesSenderConfig.getMessageSkipThreshold()) {
            this.singles.poll();
        }
    }

    /**
     * Sets the channel that flushed messages are written to.
     *
     * @param writer the destination channel
     */
    public void setWriter(WritableByteChannel writer) {
        this.writer = writer;
    }

    /**
     * Appends a trigger to the list evaluated for every registered event.
     *
     * @param messageTrigger the trigger to add
     */
    public void addSendMessageTrigger(LogbackEventTrigger messageTrigger) {
        if (this.triggers == null) {
            this.triggers = new SendMessageTrigger[]{messageTrigger};
        } else {
            int l = this.triggers.length;
            this.triggers = Arrays.copyOf(this.triggers, l + 1);
            this.triggers[l] = messageTrigger;
        }
    }

    /**
     * Replaces the configuration that supplies the flush, send and skip thresholds.
     *
     * @param seriesSenderConfig the new configuration
     */
    public void setSeriesSenderConfig(SeriesSenderConfig seriesSenderConfig) {
        this.seriesSenderConfig = seriesSenderConfig;
    }

    /**
     * Background task that repeatedly waits on the wake-up latch (with a timeout) and then
     * runs the threshold check, until it is stopped or its thread is interrupted.
     */
    private class Worker
    implements Runnable {
        private volatile boolean stopped = false;

        private Worker() {
        }

        @Override
        public void run() {
            while (!this.stopped) {
                // Re-arm the latch once it has been counted down by register().
                if (Aggregator.this.latch.getCount() == 0L) {
                    Aggregator.this.latch = new CountDownLatch(1);
                }
                try {
                    Aggregator.this.latch.await(DEFAULT_CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
                    Aggregator.this.checkThresholds();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                catch (InterruptedException e) {
                    // Restore the flag and exit: with the flag set, await() would throw again
                    // immediately on every iteration and the loop would spin without ever
                    // reaching checkThresholds().
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Requests the run loop to terminate after its current iteration. */
        public void stop() {
            this.stopped = true;
        }
    }
}

