/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.sdk.direct.ingestion;

import com.wavefront.sdk.common.NamedThreadFactory;
import com.wavefront.sdk.common.Pair;
import com.wavefront.sdk.common.Utils;
import com.wavefront.sdk.common.WavefrontSender;
import com.wavefront.sdk.common.annotation.Nullable;
import com.wavefront.sdk.direct.ingestion.DataIngesterAPI;
import com.wavefront.sdk.direct.ingestion.DataIngesterService;
import com.wavefront.sdk.entities.histograms.HistogramGranularity;
import com.wavefront.sdk.entities.tracing.SpanLog;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WavefrontDirectIngestionClient
implements WavefrontSender,
Runnable {
    private static final String DEFAULT_SOURCE = "wavefrontDirectSender";
    private static final Logger logger = Logger.getLogger(WavefrontDirectIngestionClient.class.getCanonicalName());
    private final AtomicInteger failures = new AtomicInteger();
    private final int batchSize;
    private final LinkedBlockingQueue<String> metricsBuffer;
    private final LinkedBlockingQueue<String> histogramsBuffer;
    private final LinkedBlockingQueue<String> tracingSpansBuffer;
    private final DataIngesterAPI directService;
    private final ScheduledExecutorService scheduler;

    private WavefrontDirectIngestionClient(Builder builder) {
        this.batchSize = builder.batchSize;
        this.metricsBuffer = new LinkedBlockingQueue(builder.maxQueueSize);
        this.histogramsBuffer = new LinkedBlockingQueue(builder.maxQueueSize);
        this.tracingSpansBuffer = new LinkedBlockingQueue(builder.maxQueueSize);
        this.directService = new DataIngesterService(builder.server, builder.token);
        this.scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(DEFAULT_SOURCE));
        this.scheduler.scheduleAtFixedRate(this, 1L, builder.flushIntervalSeconds, TimeUnit.SECONDS);
    }

    @Override
    public void sendMetric(String name, double value, @Nullable Long timestamp, @Nullable String source, @Nullable Map<String, String> tags) throws IOException {
        String point = Utils.metricToLineData(name, value, timestamp, source, tags, DEFAULT_SOURCE);
        if (!this.metricsBuffer.offer(point)) {
            logger.log(Level.WARNING, "Buffer full, dropping metric point: " + point);
        }
    }

    @Override
    public void sendDistribution(String name, List<Pair<Double, Integer>> centroids, Set<HistogramGranularity> histogramGranularities, @Nullable Long timestamp, @Nullable String source, @Nullable Map<String, String> tags) throws IOException {
        String histograms = Utils.histogramToLineData(name, centroids, histogramGranularities, timestamp, source, tags, DEFAULT_SOURCE);
        if (!this.histogramsBuffer.offer(histograms)) {
            logger.log(Level.WARNING, "Buffer full, dropping histograms: " + histograms);
        }
    }

    @Override
    public void sendSpan(String name, long startMillis, long durationMillis, @Nullable String source, UUID traceId, UUID spanId, @Nullable List<UUID> parents, @Nullable List<UUID> followsFrom, @Nullable List<Pair<String, String>> tags, @Nullable List<SpanLog> spanLogs) throws IOException {
        String span = Utils.tracingSpanToLineData(name, startMillis, durationMillis, source, traceId, spanId, parents, followsFrom, tags, spanLogs, DEFAULT_SOURCE);
        if (!this.tracingSpansBuffer.offer(span)) {
            logger.log(Level.WARNING, "Buffer full, dropping span: " + span);
        }
    }

    @Override
    public void run() {
        try {
            this.flush();
        }
        catch (Throwable ex) {
            logger.log(Level.WARNING, "Unable to report to Wavefront cluster", ex);
        }
    }

    @Override
    public void flush() throws IOException {
        this.internalFlush(this.metricsBuffer, "wavefront");
        this.internalFlush(this.histogramsBuffer, "histogram");
        this.internalFlush(this.tracingSpansBuffer, "trace");
    }

    private void internalFlush(LinkedBlockingQueue<String> buffer, String format) throws IOException {
        List<String> batch = this.getBatch(buffer);
        if (batch.isEmpty()) {
            return;
        }
        try (InputStream is = this.batchToStream(batch);){
            int statusCode = this.directService.report(format, is);
            if (400 <= statusCode && statusCode <= 599) {
                logger.log(Level.WARNING, "Error reporting points, respStatus=" + statusCode);
                try {
                    buffer.addAll(batch);
                }
                catch (Exception ex) {
                    logger.log(Level.WARNING, "Buffer full, dropping attempted points");
                }
            }
        }
        catch (IOException ex) {
            this.failures.incrementAndGet();
            throw ex;
        }
    }

    private List<String> getBatch(LinkedBlockingQueue<String> buffer) {
        int blockSize = Math.min(buffer.size(), this.batchSize);
        ArrayList<String> points = new ArrayList<String>(blockSize);
        buffer.drainTo(points, blockSize);
        return points;
    }

    private InputStream batchToStream(List<String> batch) {
        StringBuilder sb = new StringBuilder();
        for (String item : batch) {
            sb.append(item);
        }
        return new ByteArrayInputStream(sb.toString().getBytes());
    }

    @Override
    public int getFailureCount() {
        return this.failures.get();
    }

    @Override
    public synchronized void close() {
        try {
            this.flush();
        }
        catch (IOException e) {
            logger.log(Level.WARNING, "error flushing buffer", e);
        }
        try {
            this.scheduler.shutdownNow();
        }
        catch (SecurityException ex) {
            logger.log(Level.WARNING, "shutdown error", ex);
        }
    }

    public static class Builder {
        private final String server;
        private final String token;
        private int maxQueueSize = 50000;
        private int batchSize = 10000;
        private int flushIntervalSeconds = 1;

        public Builder(String server, String token) {
            this.server = server;
            this.token = token;
        }

        public Builder maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        public Builder flushIntervalSeconds(int flushIntervalSeconds) {
            this.flushIntervalSeconds = flushIntervalSeconds;
            return this;
        }

        public WavefrontDirectIngestionClient build() {
            return new WavefrontDirectIngestionClient(this);
        }
    }
}

