/*
 * Decompiled with CFR 0.152.
 */
package com.axibase.collector.writer;

import com.axibase.collector.CountedQueue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;

public class HttpStreamingAtsdWriter
implements WritableByteChannel {
    public static final int DEFAULT_SKIP_DATA_THRESHOLD = 100000;
    public static final int MIN_RECONNECTION_TIME = 30000;
    private int skipDataThreshold = 100000;
    private String url;
    private String username;
    private String password;
    private CountedQueue<ByteBuffer> data = new CountedQueue();
    private StreamingWorker streamingWorker;
    private ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    private long lastConnectionTryTime = 0L;

    public void setUrl(String url) {
        this.url = url;
    }

    public void setSkipDataThreshold(int skipDataThreshold) {
        this.skipDataThreshold = skipDataThreshold;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        if (!this.isConnected()) {
            this.connect();
        }
        if (this.streamingWorker != null) {
            this.data.add(src);
            if (this.data.getCount() > this.skipDataThreshold) {
                this.data.poll();
            }
            return src.remaining();
        }
        return 0;
    }

    private void connect() {
        if (System.currentTimeMillis() - this.lastConnectionTryTime < 30000L) {
            return;
        }
        this.lastConnectionTryTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(1);
        this.streamingWorker = new StreamingWorker(this.data, latch, this.url);
        this.streamingWorker.setCredentials(this.username, this.password);
        this.singleThreadExecutor.execute(this.streamingWorker);
        try {
            if (!latch.await(5000L, TimeUnit.MILLISECONDS)) {
                this.streamingWorker.stop();
                this.streamingWorker = null;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public boolean isOpen() {
        return this.streamingWorker != null && !this.streamingWorker.isStopped();
    }

    @Override
    public void close() throws IOException {
        if (this.streamingWorker != null) {
            this.streamingWorker.stop();
        }
        if (this.singleThreadExecutor != null) {
            this.singleThreadExecutor.shutdown();
        }
    }

    public boolean isConnected() {
        return this.streamingWorker != null && !this.streamingWorker.isStopped();
    }

    private static class StreamingWorker
    implements HttpEntity,
    Runnable {
        public static final int PING_TIMEOUT_MS = 5000;
        public static final int BUFFER_SIZE = 64;
        private static final int TIMEOUT_MS = 5000;
        private volatile boolean stopped = false;
        private CountedQueue<ByteBuffer> data;
        private CountDownLatch latch;
        private String url;
        private BasicHttpClientConnectionManager connectionManager;
        private CloseableHttpClient httpClient;
        private String username;
        private String password;
        private long lastCommandTime = System.currentTimeMillis();

        public StreamingWorker(CountedQueue<ByteBuffer> data, CountDownLatch latch, String url) {
            this.data = data;
            this.latch = latch;
            this.url = url;
        }

        public boolean isRepeatable() {
            return false;
        }

        public boolean isChunked() {
            return false;
        }

        public long getContentLength() {
            return -1L;
        }

        public Header getContentType() {
            return null;
        }

        public Header getContentEncoding() {
            return null;
        }

        public InputStream getContent() throws IOException, IllegalStateException {
            return null;
        }

        public void writeTo(OutputStream outputStream) throws IOException {
            while (!this.stopped) {
                ByteBuffer buffer;
                if (this.latch.getCount() > 0L) {
                    this.latch.countDown();
                }
                int cnt = 0;
                while ((buffer = this.data.poll()) != null) {
                    ++cnt;
                    byte[] data = new byte[buffer.remaining()];
                    buffer.get(data);
                    outputStream.write(data);
                }
                if (cnt > 0) {
                    this.data.clearCount();
                    outputStream.flush();
                    this.lastCommandTime = System.currentTimeMillis();
                } else if (System.currentTimeMillis() - this.lastCommandTime > 5000L) {
                    outputStream.write("\nping\n".getBytes());
                    outputStream.flush();
                    this.lastCommandTime = System.currentTimeMillis();
                }
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public boolean isStreaming() {
            return true;
        }

        public void consumeContent() throws IOException {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            System.out.println("Creating http client to send commands to URL: " + this.url);
            this.connectionManager = new BasicHttpClientConnectionManager();
            ConnectionConfig connectionConfig = ConnectionConfig.custom().setBufferSize(64).build();
            this.connectionManager.setConnectionConfig(connectionConfig);
            this.httpClient = HttpClients.custom().setConnectionManager((HttpClientConnectionManager)this.connectionManager).build();
            try {
                this.checkConfiguration();
                HttpPost httpPost = this.createRequest();
                httpPost.setEntity((HttpEntity)new BufferedHttpEntity((HttpEntity)this));
                CloseableHttpResponse response = this.httpClient.execute((HttpUriRequest)httpPost);
                StatusLine statusLine = response.getStatusLine();
                if (statusLine.getStatusCode() != 200) {
                    System.err.println("HTTP: " + statusLine.getStatusCode() + ", " + statusLine.getReasonPhrase());
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                this.stop();
            }
        }

        private void checkConfiguration() throws IOException {
            HttpPost httpPost = this.createRequest();
            httpPost.setEntity((HttpEntity)new StringEntity("ping\n"));
            CloseableHttpResponse response = this.httpClient.execute((HttpUriRequest)httpPost);
            StatusLine statusLine = response.getStatusLine();
            if (statusLine.getStatusCode() != 200) {
                System.err.println("HTTP: " + statusLine.getStatusCode() + ", " + statusLine.getReasonPhrase());
                throw new IllegalStateException("Could not connect to URL: " + this.url + ", reason: " + statusLine.getReasonPhrase());
            }
            InputStream content = response.getEntity().getContent();
            IOUtils.closeQuietly((InputStream)content);
        }

        private HttpPost createRequest() {
            HttpPost httpPost = new HttpPost(this.url);
            RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).setConnectionRequestTimeout(5000).build();
            httpPost.setConfig(requestConfig);
            if (StringUtils.isNotEmpty((CharSequence)this.username)) {
                httpPost.setHeader("Authorization", "Basic " + DatatypeConverter.printBase64Binary((byte[])(this.username + ":" + this.password).getBytes()));
            }
            return httpPost;
        }

        public void stop() {
            this.stopped = true;
            try {
                if (this.httpClient != null) {
                    this.httpClient.close();
                }
                this.connectionManager.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        public boolean isStopped() {
            return this.stopped;
        }

        public void setCredentials(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }
}

