/*
 * Decompiled with CFR 0.152.
 */
package com.axibase.collector;

import com.axibase.collector.MessageWriter;
import com.axibase.collector.SendMessageTrigger;
import com.axibase.collector.config.SeriesSenderConfig;
import com.axibase.collector.logback.LogbackEventTrigger;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Aggregator<E> {
    private Collection<E> cache = new ConcurrentLinkedQueue();
    private AtomicInteger stepCounter = new AtomicInteger(0);
    private AtomicLong last = new AtomicLong(System.currentTimeMillis());
    private WritableByteChannel writer;
    private final MessageWriter<E> messageWriter;
    private ScheduledExecutorService scheduledExecutorService;
    private SendMessageTrigger<E>[] triggers = null;
    private SeriesSenderConfig seriesSenderConfig = SeriesSenderConfig.DEFAULT;

    public Aggregator(MessageWriter<E> messageWriter) {
        this.messageWriter = messageWriter;
    }

    public boolean register(E event) throws IOException {
        try {
            this.cache.add(event);
            this.checkThresholds(true);
            if (this.triggers != null) {
                int lines = 0;
                boolean fire = false;
                for (SendMessageTrigger<E> trigger : this.triggers) {
                    if (!trigger.onEvent(event)) continue;
                    fire = true;
                    int stackTraceLines = trigger.getStackTraceLines();
                    if (stackTraceLines < 0) {
                        lines = Integer.MAX_VALUE;
                        continue;
                    }
                    if (stackTraceLines <= lines) continue;
                    lines = stackTraceLines;
                }
                if (fire) {
                    this.sendSingle(event, lines);
                }
            }
        }
        catch (Throwable t) {
            t.printStackTrace();
            throw new IOException(t);
        }
        return true;
    }

    public void start() {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        long periodMs = this.seriesSenderConfig.getPeriodMs();
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    Aggregator.this.checkThresholds(false);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) {
            this.scheduledExecutorService.shutdown();
        }
    }

    private void checkThresholds(boolean increment) throws IOException {
        int sendThreshold;
        long lastTime;
        int cnt = increment ? this.stepCounter.incrementAndGet() : this.stepCounter.get();
        long currentTime = System.currentTimeMillis();
        long dt = currentTime - (lastTime = this.last.get());
        if (dt > this.seriesSenderConfig.getPeriodMs() && this.last.compareAndSet(lastTime, currentTime)) {
            this.flush(lastTime, currentTime);
            cnt = 0;
        }
        if ((sendThreshold = this.seriesSenderConfig.getSendThreshold()) > 0 && dt > this.seriesSenderConfig.getMinPeriodMs() && cnt > sendThreshold && this.stepCounter.compareAndSet(cnt, 0)) {
            this.flush(lastTime, currentTime);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void flush(long lastTime, long currentTime) throws IOException {
        this.last.set(currentTime);
        this.stepCounter.set(0);
        Collection<E> lastCache = this.cache;
        this.cache = new ConcurrentLinkedQueue();
        Aggregator aggregator = this;
        synchronized (aggregator) {
            this.messageWriter.writeStatMessages(this.writer, lastCache, 1L + currentTime - lastTime);
        }
        lastCache.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendSingle(E event, int lines) throws IOException {
        Aggregator aggregator = this;
        synchronized (aggregator) {
            this.messageWriter.writeSingleMessage(this.writer, event, lines);
        }
    }

    public void setWriter(WritableByteChannel writer) {
        this.writer = writer;
    }

    public void addSendMessageTrigger(LogbackEventTrigger messageTrigger) {
        if (this.triggers == null) {
            this.triggers = new SendMessageTrigger[]{messageTrigger};
        } else {
            int l = this.triggers.length;
            this.triggers = Arrays.copyOf(this.triggers, l + 1);
            this.triggers[l] = messageTrigger;
        }
    }

    public void setSeriesSenderConfig(SeriesSenderConfig seriesSenderConfig) {
        this.seriesSenderConfig = seriesSenderConfig;
    }
}

