/*
 * Decompiled with CFR 0.152.
 */
package com.axibase.collector;

import com.axibase.collector.MessageWriter;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Aggregator<E> {
    public static final long MILLIS = 1000L;
    public static final long DEFAULT_PERIOD_SEC = 60000L;
    private Collection<E> cache = new ConcurrentLinkedQueue();
    private AtomicInteger stepCounter = new AtomicInteger(0);
    private volatile long totalCounter;
    private AtomicLong last = new AtomicLong(System.currentTimeMillis());
    private int zeroRepeats;
    private WritableByteChannel writer;
    private final MessageWriter<E> messageWriter;
    private int sendEvery;
    private long periodMs = 60000L;
    private int sendThreshold;
    private ScheduledExecutorService scheduledExecutorService;

    public Aggregator(MessageWriter<E> messageWriter) {
        this.messageWriter = messageWriter;
    }

    public boolean register(E event) throws IOException {
        try {
            ++this.totalCounter;
            this.cache.add(event);
            this.checkThresholds(true);
            if (this.totalCounter > 0L && this.sendEvery > 0 && this.totalCounter % (long)this.sendEvery == 0L) {
                this.sendSingle(event);
            }
        }
        catch (Throwable t) {
            t.printStackTrace();
            throw new IOException(t);
        }
        return true;
    }

    public void start() {
        if (this.zeroRepeats > 0) {
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        Aggregator.this.checkThresholds(false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, this.periodMs, this.periodMs, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) {
            this.scheduledExecutorService.shutdown();
        }
    }

    private void checkThresholds(boolean increment) throws IOException {
        long lastTime;
        int cnt = increment ? this.stepCounter.incrementAndGet() : this.stepCounter.get();
        long current = System.currentTimeMillis();
        if (current - (lastTime = this.last.get()) > this.periodMs && this.last.compareAndSet(lastTime, current)) {
            this.flush(current);
            cnt = 0;
        }
        if (this.sendThreshold > 0 && cnt > this.sendThreshold && this.stepCounter.compareAndSet(cnt, 0)) {
            this.flush(current);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void flush(long current) throws IOException {
        this.last.set(current);
        this.stepCounter.set(0);
        Collection<E> lastCache = this.cache;
        this.cache = new ConcurrentLinkedQueue();
        Aggregator aggregator = this;
        synchronized (aggregator) {
            this.messageWriter.writeStatMessages(this.writer, lastCache);
        }
        lastCache.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendSingle(E event) throws IOException {
        Aggregator aggregator = this;
        synchronized (aggregator) {
            this.messageWriter.writeSingleMessage(this.writer, event);
        }
    }

    public void setZeroRepeats(int zeroRepeats) {
        this.zeroRepeats = zeroRepeats;
    }

    public void setWriter(WritableByteChannel writer) {
        this.writer = writer;
    }

    public void setSendThreshold(int sendThreshold) {
        this.sendThreshold = sendThreshold;
    }

    public void setPeriodSec(int periodSec) {
        if (periodSec < 1) {
            throw new IllegalArgumentException("Period value must by more than 0, currently " + periodSec);
        }
        this.periodMs = (long)periodSec * 1000L;
    }

    public void setSendEvery(int sendEvery) {
        this.sendEvery = sendEvery;
    }

    public String toString() {
        return "Aggregator{counter=" + this.stepCounter + ", zeroRepeats=" + this.zeroRepeats + ", writer=" + this.writer + ", sendEvery=" + this.sendEvery + ", sendThreshold=" + this.sendThreshold + ", periodMs=" + this.periodMs + '}';
    }
}

