/*
 * Decompiled with CFR 0.152.
 */
package robaho.net.httpserver.http2;

import com.sun.net.httpserver.Headers;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import robaho.net.httpserver.NoSyncBufferedOutputStream;
import robaho.net.httpserver.OptimizedHeaders;
import robaho.net.httpserver.http2.HTTP2Connection;
import robaho.net.httpserver.http2.HTTP2ErrorCode;
import robaho.net.httpserver.http2.HTTP2Exception;
import robaho.net.httpserver.http2.frame.BaseFrame;
import robaho.net.httpserver.http2.frame.DataFrame;
import robaho.net.httpserver.http2.frame.FrameFlag;
import robaho.net.httpserver.http2.frame.FrameHeader;
import robaho.net.httpserver.http2.frame.FrameType;
import robaho.net.httpserver.http2.frame.ResetStreamFrame;
import robaho.net.httpserver.http2.frame.SettingIdentifier;
import robaho.net.httpserver.http2.frame.SettingParameter;
import robaho.net.httpserver.http2.frame.WindowUpdateFrame;
import robaho.net.httpserver.http2.hpack.HPackContext;

public class HTTP2Stream {
    private final int streamId;
    final AtomicLong sendWindow = new AtomicLong(65535L);
    private final AtomicLong receiveWindow = new AtomicLong(65535L);
    private final int initialWindowSize;
    private final HTTP2Connection connection;
    private final System.Logger logger;
    private final OutputStream outputStream;
    private final DataIn dataIn;
    private final HTTP2Connection.StreamHandler handler;
    private final Headers requestHeaders;
    private final Headers responseHeaders = new OptimizedHeaders(16);
    private final AtomicBoolean headersSent = new AtomicBoolean(false);
    private volatile Thread thread;
    private volatile boolean streamOpen = true;
    private volatile boolean halfClosed = false;
    private volatile boolean streamOutputClosed = false;
    private volatile AtomicBoolean handlingRequest = new AtomicBoolean(false);
    private long dataInSize = 0L;

    public HTTP2Stream(int streamId, HTTP2Connection connection, Headers requestHeaders, HTTP2Connection.StreamHandler handler) throws IOException {
        this.streamId = streamId;
        this.connection = connection;
        this.logger = connection.logger;
        this.requestHeaders = requestHeaders;
        this.handler = handler;
        this.dataIn = new DataIn();
        this.outputStream = new NoSyncBufferedOutputStream(new Http2OutputStream(streamId));
        SettingParameter setting = connection.getRemoteSettings().get(SettingIdentifier.SETTINGS_INITIAL_WINDOW_SIZE);
        if (setting != null) {
            this.sendWindow.set((int)setting.value);
        }
        if ((setting = connection.getLocalSettings().get(SettingIdentifier.SETTINGS_INITIAL_WINDOW_SIZE)) != null) {
            this.receiveWindow.set((int)setting.value);
            this.initialWindowSize = (int)setting.value;
        } else {
            this.initialWindowSize = 65535;
        }
        if (this.logger.isLoggable(System.Logger.Level.TRACE)) {
            this.logger.log(System.Logger.Level.TRACE, () -> "new stream, send window size " + this.sendWindow.get() + ", receive window size " + this.receiveWindow.get() + " on stream " + streamId);
        }
    }

    public OutputStream getOutputStream() {
        return this.outputStream;
    }

    public Headers getRequestHeaders() {
        return this.requestHeaders;
    }

    public Headers getResponseHeaders() {
        return this.responseHeaders;
    }

    public String toString() {
        return this.connection.httpConnection.toString() + " stream " + this.streamId;
    }

    public void debug() {
        this.logger.log(System.Logger.Level.INFO, this.connection.toString() + ", stream " + this.streamId + " open " + this.streamOpen + " half closed " + this.halfClosed + ", streamOutputClosed " + this.streamOutputClosed + ", thread " + String.valueOf(this.thread));
        this.logger.log(System.Logger.Level.INFO, this.connection.toString() + ", stream " + this.streamId + " data in size " + this.dataInSize + " expected " + this.expectedSize());
        this.logger.log(System.Logger.Level.INFO, Arrays.toString(this.thread.getStackTrace()));
    }

    public boolean isOpen() {
        return this.streamOpen;
    }

    public boolean isHalfClosed() {
        return this.halfClosed;
    }

    private long expectedSize() {
        if (this.requestHeaders.containsKey("Content-length")) {
            return Long.parseLong(this.requestHeaders.getFirst("Content-length"));
        }
        return -1L;
    }

    public void close() {
        this.streamOpen = false;
        if (this.connection.http2Streams.put(this.streamId, null) == null) {
            return;
        }
        this.logger.log(System.Logger.Level.TRACE, () -> "closing stream " + this.streamId);
        try {
            this.dataIn.close();
            this.outputStream.close();
            if (this.thread != null) {
                this.thread.interrupt();
            }
        }
        catch (IOException e) {
            if (!this.connection.isClosed()) {
                this.connection.close();
                this.logger.log(this.connection.httpConnection.requestCount.get() > 0L ? System.Logger.Level.WARNING : System.Logger.Level.DEBUG, "IOException closing http2 stream", (Throwable)e);
            }
        }
    }

    public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
        switch (frame.getHeader().getType()) {
            case HEADERS: 
            case CONTINUATION: {
                if (this.halfClosed) {
                    throw new HTTP2Exception(HTTP2ErrorCode.STREAM_CLOSED);
                }
                this.halfClosed = frame.getHeader().getFlags().contains(FrameFlag.END_STREAM);
                this.performRequest();
                break;
            }
            case DATA: {
                DataFrame dataFrame = (DataFrame)frame;
                this.logger.log(System.Logger.Level.TRACE, () -> "received data frame, length " + dataFrame.body.length + " on stream " + this.streamId);
                if (this.halfClosed) {
                    throw new HTTP2Exception(HTTP2ErrorCode.STREAM_CLOSED);
                }
                if (!this.streamOpen) {
                    throw new HTTP2Exception(HTTP2ErrorCode.PROTOCOL_ERROR);
                }
                this.dataIn.enqueue(dataFrame.body);
                this.dataInSize += (long)dataFrame.body.length;
                if (!dataFrame.getHeader().getFlags().contains(FrameFlag.END_STREAM)) break;
                long expected = this.expectedSize();
                if (expected != -1L && this.dataInSize != expected) {
                    this.connection.sendResetStream(HTTP2ErrorCode.PROTOCOL_ERROR, this.streamId);
                    this.close();
                    break;
                }
                this.halfClosed = true;
                this.dataIn.wakeupReader();
                break;
            }
            case PRIORITY: {
                if (!this.streamOpen) break;
                throw new HTTP2Exception(HTTP2ErrorCode.PROTOCOL_ERROR);
            }
            case RST_STREAM: {
                ResetStreamFrame resetFrame = (ResetStreamFrame)frame;
                this.logger.log(System.Logger.Level.DEBUG, "received reset stream " + String.valueOf((Object)resetFrame.errorCode) + ", on stream " + this.streamId);
                this.halfClosed = true;
                this.close();
                break;
            }
            case WINDOW_UPDATE: {
                int windowSizeIncrement = ((WindowUpdateFrame)frame).getWindowSizeIncrement();
                if (this.sendWindow.addAndGet(windowSizeIncrement) > Integer.MAX_VALUE) {
                    this.connection.sendResetStream(HTTP2ErrorCode.FLOW_CONTROL_ERROR, this.streamId);
                    this.close();
                }
                this.logger.log(System.Logger.Level.DEBUG, "received window update " + windowSizeIncrement + ", new size " + this.sendWindow.get() + ", on stream " + this.streamId);
                break;
            }
        }
    }

    private void performRequest() throws IOException, HTTP2Exception {
        if (!this.handlingRequest.compareAndSet(false, true)) {
            throw new HTTP2Exception(HTTP2ErrorCode.PROTOCOL_ERROR, "already received headers for stream " + this.streamId);
        }
        this.connection.httpConnection.requestCount.incrementAndGet();
        this.connection.requestsInProgress.incrementAndGet();
        this.connection.stats.activeStreams.incrementAndGet();
        InputStream in = this.halfClosed ? InputStream.nullInputStream() : this.dataIn;
        this.handler.getExecutor().execute(() -> {
            this.thread = Thread.currentThread();
            try {
                this.handler.handleStream(this, in, this.outputStream);
            }
            catch (IOException ex) {
                this.logger.log(System.Logger.Level.DEBUG, "io exception on stream " + this.streamId, (Throwable)ex);
                this.close();
            }
        });
    }

    public void writeResponseHeaders(boolean closeStream) throws IOException {
        if (this.headersSent.compareAndSet(false, true)) {
            this.connection.lock();
            try {
                HPackContext.writeHeaderFrame(this.responseHeaders, this.connection.outputStream, this.streamId, closeStream);
                if (closeStream) {
                    this.streamOutputClosed = true;
                }
            }
            finally {
                this.connection.unlock();
            }
        }
    }

    public InetSocketAddress getLocalAddress() {
        return this.connection.getLocalAddress();
    }

    public InetSocketAddress getRemoteAddress() {
        return this.connection.getRemoteAddress();
    }

    private class DataIn
    extends InputStream {
        private final ConcurrentLinkedQueue<byte[]> queue = new ConcurrentLinkedQueue();
        private volatile Thread reader;
        private int offset = 0;
        private final byte[] single = new byte[1];

        void enqueue(byte[] data) {
            this.queue.add(data);
            LockSupport.unpark(this.reader);
        }

        void wakeupReader() {
            LockSupport.unpark(this.reader);
        }

        @Override
        public void close() throws IOException {
            if (Thread.currentThread() == this.reader || this.reader == null) {
                this.readAllBytes();
            } else {
                LockSupport.unpark(this.reader);
            }
        }

        @Override
        public int read() throws IOException {
            int n = this.read(this.single, 0, 1);
            return n == -1 ? -1 : this.single[0] & 0xFF;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int read = 0;
            try {
                this.reader = Thread.currentThread();
                while (len > 0) {
                    byte[] data;
                    while ((data = this.queue.peek()) == null) {
                        int n;
                        if (read > 0) {
                            n = read;
                            return n;
                        }
                        if (HTTP2Stream.this.halfClosed) {
                            n = -1;
                            return n;
                        }
                        LockSupport.park();
                        if (!Thread.interrupted()) continue;
                        throw new IOException("interrupted");
                    }
                    int available = data.length - this.offset;
                    int bytesToRead = Math.min(len, available);
                    System.arraycopy(data, this.offset, b, off, bytesToRead);
                    this.offset += bytesToRead;
                    off += bytesToRead;
                    len -= bytesToRead;
                    read += bytesToRead;
                    if ((available -= bytesToRead) != 0) continue;
                    this.queue.poll();
                    this.offset = 0;
                }
                int n = read;
                return n;
            }
            finally {
                if (HTTP2Stream.this.receiveWindow.addAndGet(-read) < (long)(HTTP2Stream.this.initialWindowSize / 2)) {
                    HTTP2Stream.this.receiveWindow.addAndGet(HTTP2Stream.this.initialWindowSize / 2);
                    HTTP2Stream.this.connection.lock();
                    try {
                        WindowUpdateFrame frame = new WindowUpdateFrame(HTTP2Stream.this.streamId, HTTP2Stream.this.initialWindowSize / 2);
                        frame.writeTo(HTTP2Stream.this.connection.outputStream);
                        HTTP2Stream.this.logger.log(System.Logger.Level.TRACE, () -> "sent stream window update, receive window " + HTTP2Stream.this.receiveWindow.get() + " on stream " + HTTP2Stream.this.streamId);
                    }
                    finally {
                        HTTP2Stream.this.connection.unlock();
                    }
                }
            }
        }
    }

    class Http2OutputStream
    extends OutputStream {
        private static final FrameFlag.FlagSet END_STREAM = FrameFlag.FlagSet.of(FrameFlag.END_STREAM);
        private final int streamId;
        private final int max_frame_size;
        private boolean closed;

        public Http2OutputStream(int streamId) {
            this.streamId = streamId;
            SettingParameter setting = HTTP2Stream.this.connection.getRemoteSettings().get(SettingIdentifier.SETTINGS_MAX_FRAME_SIZE);
            this.max_frame_size = setting != null ? (int)setting.value : 16384;
        }

        @Override
        public void write(int b) throws IOException {
            this.write(new byte[]{(byte)b});
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.write(b, 0, b.length);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            HTTP2Stream.this.connection.stats.bytesSent.addAndGet(len);
            while (HTTP2Stream.this.sendWindow.get() <= 0L && !HTTP2Stream.this.connection.isClosed()) {
                HTTP2Stream.this.connection.stats.pauses.incrementAndGet();
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
            }
            HTTP2Stream.this.writeResponseHeaders(false);
            if (HTTP2Stream.this.streamOutputClosed) {
                throw new IOException("output stream was closed during headers send");
            }
            while (len > 0) {
                int _len = Math.min(Math.min(len, this.max_frame_size), (int)Math.min(HTTP2Stream.this.connection.sendWindow.get(), HTTP2Stream.this.sendWindow.get()));
                if (_len <= 0) {
                    HTTP2Stream.this.connection.stats.pauses.incrementAndGet();
                    HTTP2Stream.this.connection.lock();
                    try {
                        HTTP2Stream.this.connection.stats.flushes.incrementAndGet();
                        HTTP2Stream.this.connection.outputStream.flush();
                    }
                    finally {
                        HTTP2Stream.this.connection.unlock();
                    }
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
                    if (HTTP2Stream.this.connection.isClosed()) {
                        throw new IOException("connection closed");
                    }
                    int remaining = len;
                    continue;
                }
                if (HTTP2Stream.this.connection.sendWindow.addAndGet(-_len) < 0L) {
                    HTTP2Stream.this.connection.sendWindow.addAndGet(_len);
                    continue;
                }
                HTTP2Stream.this.connection.lock();
                try {
                    FrameHeader.writeTo(HTTP2Stream.this.connection.outputStream, _len, FrameType.DATA, FrameFlag.NONE, this.streamId);
                    HTTP2Stream.this.connection.outputStream.write(b, off, _len);
                    HTTP2Stream.this.connection.stats.framesSent.incrementAndGet();
                }
                finally {
                    HTTP2Stream.this.connection.unlock();
                }
                off += _len;
                len -= _len;
                HTTP2Stream.this.sendWindow.addAndGet(-_len);
                HTTP2Stream.this.logger.log(System.Logger.Level.TRACE, () -> "sent data frame, length " + _len + ", new send window " + HTTP2Stream.this.sendWindow.get() + " on stream " + this.streamId);
            }
        }

        @Override
        public void flush() throws IOException {
        }

        @Override
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            try {
                if (HTTP2Stream.this.connection.isClosed()) {
                    if (HTTP2Stream.this.headersSent.compareAndSet(false, true)) {
                        HTTP2Stream.this.logger.log(System.Logger.Level.WARNING, "stream connection is closed and headers not sent on stream " + this.streamId);
                    }
                    return;
                }
                HTTP2Stream.this.writeResponseHeaders(false);
                HTTP2Stream.this.connection.lock();
                boolean lastRequest = HTTP2Stream.this.connection.requestsInProgress.decrementAndGet() == 0;
                try {
                    if (!HTTP2Stream.this.streamOutputClosed) {
                        FrameHeader.writeTo(HTTP2Stream.this.connection.outputStream, 0, FrameType.DATA, END_STREAM, this.streamId);
                        HTTP2Stream.this.connection.stats.framesSent.incrementAndGet();
                    }
                    if (lastRequest) {
                        HTTP2Stream.this.connection.outputStream.flush();
                        HTTP2Stream.this.connection.stats.flushes.incrementAndGet();
                    }
                }
                finally {
                    HTTP2Stream.this.connection.unlock();
                }
                HTTP2Stream.this.dataIn.close();
            }
            finally {
                HTTP2Stream.this.connection.stats.activeStreams.decrementAndGet();
                this.closed = true;
                HTTP2Stream.this.close();
            }
        }
    }
}

