/*
 * Decompiled with CFR 0.152.
 */
package robaho.net.httpserver.websockets;

import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import robaho.net.httpserver.websockets.CloseCode;
import robaho.net.httpserver.websockets.WebSocket;
import robaho.net.httpserver.websockets.WebSocketFrame;

public abstract class BufferedWebSocket
extends WebSocket {
    private static ExecutorService executor;
    private final LinkedBlockingQueue<WebSocketFrame> queue = new LinkedBlockingQueue();
    private final AtomicInteger pending = new AtomicInteger();
    private final Future<?> future;

    private static synchronized ExecutorService getExecutor() {
        if (executor == null) {
            executor = Executors.newCachedThreadPool();
        }
        return executor;
    }

    public static synchronized void setExecutor(ExecutorService executor) {
        if (executor != null) {
            throw new IllegalStateException("executor is already set");
        }
        BufferedWebSocket.executor = executor;
    }

    public BufferedWebSocket(HttpExchange exchange) {
        this(exchange, 0);
    }

    public BufferedWebSocket(HttpExchange exchange, int kernelBufferSizeBytes) {
        super(exchange);
        if (kernelBufferSizeBytes != 0) {
            exchange.setAttribute("__SOCKET_WRITE_BUFFER", kernelBufferSizeBytes);
        }
        this.future = BufferedWebSocket.getExecutor().submit(() -> this.sendFrames());
    }

    @Override
    void doClose(CloseCode code, String reason, boolean initiatedByRemote) {
        this.future.cancel(true);
        super.doClose(code, reason, initiatedByRemote);
    }

    private void sendFrames() {
        try {
            while (true) {
                WebSocketFrame frame = this.queue.take();
                super.sendFrame(frame);
                this.pending.decrementAndGet();
            }
        }
        catch (IOException ex) {
            this.onException(ex);
            this.doClose(CloseCode.InvalidFramePayloadData, ex.toString(), false);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public void sendFrame(WebSocketFrame frame) throws IOException {
        this.pending.incrementAndGet();
        this.queue.add(frame);
    }

    public int pending() {
        return this.pending.get();
    }
}

