/*
 * Decompiled with CFR 0.152.
 */
package org.cometd.server.websocket.common;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.text.ParseException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicMarkableReference;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxContext;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.bayeux.server.ServerTransport;
import org.cometd.common.AsyncFoldLeft;
import org.cometd.server.AbstractServerTransport;
import org.cometd.server.ServerMessageImpl;
import org.cometd.server.ServerSessionImpl;
import org.cometd.server.websocket.common.AbstractWebSocketTransport;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWebSocketEndPoint {
    // Logger named after the concrete runtime class of this endpoint.
    private final Logger _logger = LoggerFactory.getLogger(this.getClass());
    // Queues the entries built by flush() and writes them through send().
    private final Flusher flusher = new Flusher();
    // Flipped to true by whichever of onClose()/onError() runs first, so only that call does its handling.
    private final AtomicBoolean terminate = new AtomicBoolean();
    // Transport used to parse incoming messages, serialize outgoing ones and reach the Bayeux server.
    private final AbstractWebSocketTransport _transport;
    // Context attached to every processed message; onError() tolerates it being null.
    private final BayeuxContext _bayeuxContext;
    // Session currently bound to this connection; null until resolved, or after it is found to be gone.
    private ServerSessionImpl _session;

    /**
     * Creates an endpoint bound to the given transport and Bayeux context.
     *
     * @param transport the transport used for parsing, serialization and access to the Bayeux server
     * @param context the context attached to each processed message; {@link #onError(Throwable)}
     *                tolerates {@code null}
     */
    protected AbstractWebSocketEndPoint(AbstractWebSocketTransport transport, BayeuxContext context) {
        _transport = transport;
        _bayeuxContext = context;
    }

    /**
     * Writes one text frame to the remote peer.
     *
     * <p>Within this class it is only called by the flusher, which passes the endpoint's current
     * session (possibly {@code null}), a JSON array string, and itself as the callback, and then
     * reports {@code SCHEDULED}. Implementations are therefore expected to complete the callback
     * once the write has succeeded or failed (NOTE(review): confirm against the concrete endpoints).
     *
     * @param var1 the session the data is sent for
     * @param var2 the text to send
     * @param var3 the callback to complete when the write finishes
     */
    protected abstract void send(ServerSession var1, String var2, Callback var3);

    /**
     * Closes the underlying WebSocket connection.
     *
     * <p>This class calls it with code {@code 1011} and the failure's {@code toString()} when
     * message processing or a scheduled flush fails, and with code {@code 1000} and reason
     * {@code "Destroy"} when a suspended scheduler is destroyed.
     *
     * @param var1 the WebSocket close code
     * @param var2 the close reason
     */
    public abstract void close(int var1, String var2);

    /**
     * Entry point for a text frame received from the remote peer.
     *
     * <p>The frame is parsed into Bayeux messages and handed to the processing chain. Outcomes:
     * <ul>
     *   <li>parsing yields {@code null}: {@code p} is succeeded without further work;</li>
     *   <li>the text is not valid JSON ({@link ParseException}): the connection is closed with
     *       code 1011, a warning is logged, and {@code p} is <em>succeeded</em>;</li>
     *   <li>any other failure, synchronous or reported through the promise chain: the connection
     *       is closed with code 1011 and {@code p} is failed.</li>
     * </ul>
     *
     * @param data the text received
     * @param p the promise to complete when processing of this frame is finished
     */
    public void onMessage(String data, Promise<Void> p) {
        // Wrap the caller's promise so that every failure also closes the connection.
        Promise<Void> promise = Promise.from(p::succeed, failure -> {
            if (_logger.isDebugEnabled()) {
                _logger.debug("", failure);
            }
            close(1011, failure.toString());
            p.fail(failure);
        });

        try {
            ServerMessage.Mutable[] messages = _transport.parseMessages(data);
            if (_logger.isDebugEnabled()) {
                _logger.debug("Parsed {} messages", messages == null ? -1 : messages.length);
            }
            if (messages == null) {
                promise.succeed(null);
            } else {
                processMessages(messages, promise);
            }
        } catch (ParseException x) {
            // The connection is closed directly here, so the wrapped promise is succeeded
            // rather than failed; failing it would call close() a second time.
            close(1011, x.toString());
            _logger.warn("Error parsing JSON: " + data, x);
            promise.succeed(null);
        } catch (Throwable x) {
            promise.fail(x);
        }
    }

    /**
     * Notifies this endpoint that the connection was closed.
     *
     * <p>Only the first of {@code onClose}/{@link #onError(Throwable)} to arrive is handled;
     * later calls are ignored. The handled call is forwarded to the transport.
     *
     * @param code the WebSocket close code
     * @param reason the close reason
     */
    public void onClose(int code, String reason) {
        if (!terminate.compareAndSet(false, true)) {
            return;
        }
        ServerSessionImpl current = _session;
        if (_logger.isDebugEnabled()) {
            _logger.debug("Closing {}/{} - {}", code, reason, current);
        }
        _transport.onClose(code, reason);
    }

    /**
     * Notifies this endpoint of a connection-level failure.
     *
     * <p>Only the first of {@link #onClose(int, String)}/{@code onError} to arrive is handled.
     * Timeouts and {@link QuietException}s are logged at debug level only; every other failure
     * is logged at info level together with the remote address, when a Bayeux context is available.
     *
     * @param failure the failure reported by the WebSocket layer
     */
    public void onError(Throwable failure) {
        if (!terminate.compareAndSet(false, true)) {
            return;
        }

        if (failure instanceof SocketTimeoutException || failure instanceof TimeoutException) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("WebSocket timeout", failure);
            }
            return;
        }

        InetSocketAddress address = _bayeuxContext == null ? null : _bayeuxContext.getRemoteAddress();
        if (failure instanceof QuietException) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("WebSocket failure, address: " + address, failure);
            }
        } else {
            _logger.info("WebSocket failure, address: " + address, failure);
        }
    }

    /**
     * Resolves the session for a batch of messages and processes them one after the other.
     *
     * <p>Session resolution is driven by the first message of the batch:
     * <ul>
     *   <li>{@code /meta/handshake}: the endpoint's session is cleared and a brand new session
     *       is created for this batch;</li>
     *   <li>otherwise, with no session bound yet: the session is looked up by the message's
     *       client id, unless the transport requires a handshake per connection;</li>
     *   <li>otherwise, with a session bound: it is dropped if the Bayeux server no longer
     *       knows it.</li>
     * </ul>
     *
     * <p>Each message yields a boolean; the values are AND-ed together and a flush is triggered
     * only when the combined result is {@code true}. Otherwise the promise is succeeded directly.
     *
     * @param messages the parsed messages; an empty array fails the promise with an {@link IOException}
     * @param promise completed when the batch has been processed and, if needed, flushed
     */
    private void processMessages(ServerMessage.Mutable[] messages, Promise<Void> promise) {
        if (messages.length == 0) {
            promise.fail(new IOException("bayeux protocol violation"));
            return;
        }

        ServerSessionImpl session;
        ServerMessage.Mutable first = messages[0];
        if ("/meta/handshake".equals(first.getChannel())) {
            _session = null;
            session = _transport.getBayeux().newServerSession();
            session.setAllowMessageDeliveryDuringHandshake(_transport.isAllowMessageDeliveryDuringHandshake());
        } else {
            session = _session;
            if (session == null) {
                if (!_transport.isRequireHandshakePerConnection()) {
                    session = (ServerSessionImpl)_transport.getBayeux().getSession(first.getClientId());
                    _session = session;
                }
            } else if (_transport.getBayeux().getSession(session.getId()) == null) {
                // The bound session is not known to the server anymore.
                _session = null;
                session = null;
            }
        }

        Context context = new Context(session);
        AsyncFoldLeft.run(messages, true,
                (result, message, loop) -> processMessage(messages, context, (ServerMessageImpl)message,
                        Promise.from(b -> loop.proceed(result && b), loop::fail)),
                Promise.from(flush -> {
                    if (flush) {
                        flush(context, promise);
                    } else {
                        promise.succeed(null);
                    }
                }, promise::fail));
    }

    /**
     * Processes a single message of a batch, dispatching on its channel.
     *
     * <p>The promise is completed with {@code true} when the message has been handled, and with
     * {@code false} when a {@code /meta/connect} was suspended.
     *
     * @param messages the whole batch; a handshake is only accepted when it is the sole message
     * @param context the per-batch context
     * @param message the message to process
     * @param promise completed with the flag described above, or failed
     */
    private void processMessage(ServerMessage.Mutable[] messages, Context context, ServerMessageImpl message, Promise<Boolean> promise) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Processing {}", message);
        }

        message.setServerTransport(_transport);
        message.setBayeuxContext(_bayeuxContext);
        ServerSessionImpl session = context.session;
        if (session != null) {
            session.setServerTransport(_transport);
        }

        String channel = message.getChannel();
        if ("/meta/handshake".equals(channel)) {
            if (messages.length > 1) {
                promise.fail(new IOException("protocol violation"));
            } else {
                processMetaHandshake(context, message, promise);
            }
            return;
        }

        if ("/meta/connect".equals(channel)) {
            processMetaConnect(context, message, Promise.from(proceed -> {
                if (proceed) {
                    // Not suspended: produce the reply right away.
                    resume(context, message, Promise.from(y -> promise.succeed(true), promise::fail));
                } else {
                    promise.succeed(false);
                }
            }, promise::fail));
            return;
        }

        processMessage(context, message, promise);
    }

    /**
     * Handles a {@code /meta/handshake} message.
     *
     * <p>The processed reply, if any, is recorded in the context; when it is successful the
     * context's session becomes the endpoint's session. The session queue is scheduled for
     * sending only when message delivery during handshake is allowed and the reply is successful.
     * If handling fails, session expiration is scheduled before the promise is failed.
     *
     * @param context the per-batch context
     * @param message the handshake message
     * @param promise succeeded with {@code true} once the reply is recorded, or failed
     */
    private void processMetaHandshake(Context context, ServerMessage.Mutable message, Promise<Boolean> promise) {
        ServerSessionImpl session = context.session;
        _transport.getBayeux().handle(session, message, Promise.from(
                reply -> _transport.processReply(session, reply, Promise.from(processed -> {
                    if (processed != null) {
                        context.replies.add(processed);
                        if (processed.isSuccessful()) {
                            _session = session;
                        }
                    }
                    context.sendQueue = _transport.allowMessageDeliveryDuringHandshake(session)
                            && processed != null
                            && processed.isSuccessful();
                    context.scheduleExpiration = true;
                    promise.succeed(true);
                }, promise::fail)),
                failure -> scheduleExpirationAndFail(session, context.metaConnectCycle, promise, failure)));
    }

    /**
     * Handles a {@code /meta/connect} message and decides whether to suspend it.
     *
     * <p>The connect is suspended (promise succeeded with {@code false}) when all of these hold:
     * the session either has nothing to schedule or is not restricted to meta-connect delivery,
     * the reply is successful, the calculated timeout is positive, and the session was connected
     * before handling and still is afterwards. Otherwise the promise is succeeded with
     * {@code true}; in that case a disconnected session gets a "reconnect: none" advice on the reply.
     * If handling fails, session expiration is scheduled before the promise is failed.
     *
     * @param context the per-batch context
     * @param message the connect message
     * @param promise succeeded with {@code true} to reply immediately, {@code false} when suspended
     */
    private void processMetaConnect(Context context, ServerMessage.Mutable message, Promise<Boolean> promise) {
        ServerSessionImpl session = context.session;
        // Snapshot the connected state before the message is handled.
        boolean wasConnected = session != null && session.isConnected();
        _transport.getBayeux().handle(session, message, Promise.from(reply -> {
            boolean proceed = true;
            if (session != null) {
                boolean maySuspend = !session.shouldSchedule();
                boolean metaConnectDelivery = isMetaConnectDeliveryOnly(session);
                if ((maySuspend || !metaConnectDelivery) && reply.isSuccessful()) {
                    long timeout = session.calculateTimeout(_transport.getTimeout());
                    if (timeout > 0L && wasConnected && session.isConnected()) {
                        AbstractServerTransport.Scheduler scheduler = suspend(context, message, timeout);
                        session.setScheduler(scheduler);
                        proceed = false;
                    }
                }
                if (proceed && session.isDisconnected()) {
                    reply.getAdvice(true).put("reconnect", "none");
                }
            }
            promise.succeed(proceed);
        }, failure -> scheduleExpirationAndFail(session, context.metaConnectCycle, promise, failure)));
    }

    /**
     * Handles a message that is neither a handshake nor a connect.
     *
     * <p>The processed reply, if any, is recorded in the context. Unless delivery is restricted
     * to meta-connect replies, the session queue is marked to be sent along with the replies.
     *
     * @param context the per-batch context
     * @param message the message to handle
     * @param promise succeeded with {@code true} once the reply is recorded, or failed
     */
    private void processMessage(Context context, ServerMessageImpl message, Promise<Boolean> promise) {
        ServerSessionImpl session = context.session;
        _transport.getBayeux().handle(session, message, Promise.from(
                handled -> _transport.processReply(session, message.getAssociated(), Promise.from(reply -> {
                    if (reply != null) {
                        context.replies.add(reply);
                    }
                    if (!isMetaConnectDeliveryOnly(session)) {
                        context.sendQueue = true;
                    }
                    promise.succeed(true);
                }, promise::fail)),
                promise::fail));
    }

    /**
     * Tells whether queued messages may only be delivered with meta-connect replies.
     *
     * @param session the session to check, possibly {@code null}
     * @return {@code true} if the transport, or else the given non-null session, is
     *         configured for meta-connect-only delivery
     */
    private boolean isMetaConnectDeliveryOnly(ServerSessionImpl session) {
        if (_transport.isMetaConnectDeliveryOnly()) {
            return true;
        }
        return session != null && session.isMetaConnectDeliveryOnly();
    }

    /**
     * Suspends a {@code /meta/connect}: notifies the session and creates the scheduler that
     * resumes it later.
     *
     * @param context the per-batch context; its session must not be {@code null}
     * @param message the connect message being suspended
     * @param timeout the timeout passed to the session notification and to the scheduler
     * @return the newly created scheduler
     */
    private AbstractServerTransport.Scheduler suspend(Context context, ServerMessage.Mutable message, long timeout) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Suspended {}", message);
        }
        ServerSessionImpl session = context.session;
        session.notifySuspended(message, timeout);
        return new WebSocketScheduler(context, message, timeout);
    }

    /**
     * Produces the reply for a {@code /meta/connect}, either immediately or after a suspension.
     *
     * <p>When a session is present, its pending advice (if any) is attached to the reply and a
     * disconnected session gets a "reconnect: none" advice. The processed reply is then recorded
     * in the context, which is marked for both queue delivery and expiration scheduling.
     * If reply processing fails, session expiration is scheduled before the promise is failed.
     *
     * @param context the context of the connect being replied to
     * @param message the connect message; its associated message is the reply
     * @param promise succeeded once the reply is recorded, or failed
     */
    private void resume(Context context, ServerMessage.Mutable message, Promise<Void> promise) {
        ServerMessage.Mutable reply = message.getAssociated();
        ServerSessionImpl session = context.session;
        if (session != null) {
            Map<String, Object> advice = session.takeAdvice(_transport);
            if (advice != null) {
                reply.put("advice", advice);
            }
            if (session.isDisconnected()) {
                reply.getAdvice(true).put("reconnect", "none");
            }
        }

        _transport.processReply(session, reply, Promise.from(processed -> {
            if (processed != null) {
                context.replies.add(processed);
            }
            context.sendQueue = true;
            context.scheduleExpiration = true;
            promise.succeed(null);
        }, failure -> scheduleExpirationAndFail(session, context.metaConnectCycle, promise, failure)));
    }

    /**
     * Asks the transport to schedule expiration of the session, then fails the promise.
     *
     * @param session the session whose expiration is scheduled
     * @param metaConnectCycle the meta-connect cycle passed to the transport
     * @param promise the promise to fail
     * @param x the failure
     */
    private void scheduleExpirationAndFail(ServerSessionImpl session, long metaConnectCycle, Promise<?> promise, Throwable x) {
        _transport.scheduleExpiration(session, metaConnectCycle);
        promise.fail(x);
    }

    /**
     * Queues the context's replies, plus the session's queued messages when the context asks
     * for them, for writing to the remote peer.
     *
     * <p>When the write completes, {@code promise} is succeeded first and then
     * {@link #writeComplete(Context, List)} is invoked. If the flusher has already failed, the
     * promise is failed with that earlier failure and nothing is written.
     *
     * @param context the context holding the replies to write
     * @param promise completed when the write finishes or fails
     */
    protected void flush(Context context, Promise<Void> promise) {
        ServerSessionImpl session = context.session;
        List<ServerMessage> messages;
        if (context.sendQueue && session != null) {
            messages = session.takeQueue(context.replies);
        } else {
            messages = Collections.emptyList();
        }

        if (_logger.isDebugEnabled()) {
            _logger.debug("Flushing {}, replies={}, messages={}", session, context.replies, messages);
        }

        Entry entry = new Entry(context, messages, Promise.from(y -> {
            promise.succeed(null);
            writeComplete(context, messages);
        }, promise::fail));

        // Only start iterating when the entry was actually accepted by the flusher.
        if (flusher.queue(entry)) {
            flusher.iterate();
        }
    }

    /**
     * Hook invoked by {@code flush} after a write has completed successfully and the flush
     * promise has been succeeded. The default implementation does nothing.
     *
     * @param context the context that was flushed
     * @param messages the queued messages written along with the context's replies; may be empty
     */
    protected void writeComplete(Context context, List<ServerMessage> messages) {
    }

    /**
     * Serializes a message to JSON by delegating to the transport.
     *
     * @param message the message to serialize
     * @return the JSON text produced by the transport
     */
    private String toJSON(ServerMessage message) {
        String json = _transport.toJSON(message);
        return json;
    }

    /**
     * Mutable state shared by the processing steps of one batch of messages (or of one
     * scheduled flush) and by the flusher entry that eventually writes it.
     */
    public static class Context {
        // Session the batch is processed for; may be null.
        private final ServerSessionImpl session;
        // Replies accumulated while processing, written by the flusher.
        private final List<ServerMessage.Mutable> replies = new ArrayList<>();
        // Cycle number assigned when a meta-connect is suspended; passed to scheduleExpiration().
        private long metaConnectCycle;
        // Whether flush() should also take and send the session's queued messages.
        private boolean sendQueue;
        // Whether session expiration must be scheduled when the entry is written or fails.
        private boolean scheduleExpiration;

        private Context(ServerSessionImpl session) {
            this.session = session;
        }
    }

    private class Flusher
    extends IteratingCallback {
        private final Queue<Entry> _entries = new ArrayDeque<Entry>();
        private State _state = State.IDLE;
        private StringBuilder _buffer;
        private Entry _entry;
        private int _messageIndex;
        private int _replyIndex;
        private Throwable _failure;

        private Flusher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean queue(Entry entry) {
            Throwable failure;
            Flusher flusher = this;
            synchronized (flusher) {
                failure = this._failure;
                if (failure == null) {
                    return this._entries.offer(entry);
                }
            }
            entry.scheduleExpiration();
            entry._promise.fail(failure);
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected IteratingCallback.Action process() {
            block10: while (true) {
                switch (this._state) {
                    case IDLE: {
                        Flusher flusher = this;
                        synchronized (flusher) {
                            this._entry = this._entries.poll();
                        }
                        if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                            AbstractWebSocketEndPoint.this._logger.debug("Processing {}", (Object)this._entry);
                        }
                        if (this._entry == null) {
                            return IteratingCallback.Action.IDLE;
                        }
                        this._state = State.HANDSHAKE;
                        this._buffer = new StringBuilder(256);
                        continue block10;
                    }
                    case HANDSHAKE: {
                        ServerMessage.Mutable reply;
                        this._state = State.MESSAGES;
                        List replies = this._entry._context.replies;
                        if (replies.isEmpty() || !"/meta/handshake".equals((reply = (ServerMessage.Mutable)replies.get(0)).getChannel())) continue block10;
                        if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                            AbstractWebSocketEndPoint.this._logger.debug("Processing handshake reply {}", (Object)reply);
                        }
                        List queue = this._entry._queue;
                        if (AbstractWebSocketEndPoint.this._transport.allowMessageDeliveryDuringHandshake(AbstractWebSocketEndPoint.this._session) && !queue.isEmpty()) {
                            reply.put((Object)"x-messages", (Object)queue.size());
                        }
                        AbstractWebSocketEndPoint.this._transport.getBayeux().freeze(reply);
                        this._buffer.setLength(0);
                        this._buffer.append("[");
                        this._buffer.append(AbstractWebSocketEndPoint.this.toJSON((ServerMessage)reply));
                        this._buffer.append("]");
                        ++this._replyIndex;
                        AbstractWebSocketEndPoint.this.send((ServerSession)AbstractWebSocketEndPoint.this._session, this._buffer.toString(), (Callback)this);
                        return IteratingCallback.Action.SCHEDULED;
                    }
                    case MESSAGES: {
                        List messages = this._entry._queue;
                        int size = messages.size();
                        if (this._messageIndex < size) {
                            int batchSize = AbstractWebSocketEndPoint.this._transport.getMessagesPerFrame();
                            int n = batchSize = batchSize > 0 ? Math.min(batchSize, size) : size;
                            if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                                AbstractWebSocketEndPoint.this._logger.debug("Processing messages, batch size {}: {}", (Object)batchSize, (Object)messages);
                            }
                            this._buffer.setLength(0);
                            this._buffer.append("[");
                            boolean comma = false;
                            int endIndex = Math.min(size, this._messageIndex + batchSize);
                            while (this._messageIndex < endIndex) {
                                ServerMessage message = (ServerMessage)messages.get(this._messageIndex);
                                if (comma) {
                                    this._buffer.append(",");
                                }
                                comma = true;
                                this._buffer.append(AbstractWebSocketEndPoint.this.toJSON(message));
                                ++this._messageIndex;
                            }
                            this._buffer.append("]");
                            AbstractWebSocketEndPoint.this.send((ServerSession)AbstractWebSocketEndPoint.this._session, this._buffer.toString(), (Callback)this);
                            return IteratingCallback.Action.SCHEDULED;
                        }
                        this._entry.scheduleExpiration();
                        this._state = State.REPLIES;
                        continue block10;
                    }
                    case REPLIES: {
                        List replies = this._entry._context.replies;
                        int size = replies.size();
                        if (this._replyIndex < size) {
                            if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                                AbstractWebSocketEndPoint.this._logger.debug("Processing replies {}", (Object)replies);
                            }
                            this._buffer.setLength(0);
                            this._buffer.append("[");
                            boolean comma = false;
                            while (this._replyIndex < size) {
                                ServerMessage.Mutable reply = (ServerMessage.Mutable)replies.get(this._replyIndex);
                                AbstractWebSocketEndPoint.this._transport.getBayeux().freeze(reply);
                                if (comma) {
                                    this._buffer.append(",");
                                }
                                comma = true;
                                this._buffer.append(AbstractWebSocketEndPoint.this.toJSON((ServerMessage)reply));
                                ++this._replyIndex;
                            }
                            this._buffer.append("]");
                            AbstractWebSocketEndPoint.this.send((ServerSession)AbstractWebSocketEndPoint.this._session, this._buffer.toString(), (Callback)this);
                            return IteratingCallback.Action.SCHEDULED;
                        }
                        this._state = State.COMPLETE;
                        continue block10;
                    }
                    case COMPLETE: {
                        Entry entry = this._entry;
                        this._state = State.IDLE;
                        this._buffer = null;
                        this._entry = null;
                        this._messageIndex = 0;
                        this._replyIndex = 0;
                        entry._promise.succeed(null);
                        continue block10;
                    }
                }
                break;
            }
            throw new IllegalStateException("Invalid state " + (Object)((Object)this._state));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void onCompleteFailure(Throwable x) {
            Entry entry;
            Flusher flusher = this;
            synchronized (flusher) {
                this._failure = x;
                entry = this._entry;
            }
            if (entry != null) {
                entry.scheduleExpiration();
                entry._promise.fail(x);
            }
        }
    }

    /**
     * Scheduler attached to a session while its {@code /meta/connect} is suspended.
     *
     * <p>It is at once the transport scheduler the session calls, the timeout task
     * ({@link Runnable}), and the promise completed when the suspended connect is resumed.
     * The task reference carries a mark that {@link #schedule()} checks before flushing;
     * {@link #cancel()} and {@link #destroy()} clear the mark.
     */
    private class WebSocketScheduler implements AbstractServerTransport.Scheduler, Runnable, Promise<Void> {
        private final Context context;
        private final ServerMessage.Mutable message;
        // Reference: the pending timeout task (null once taken). Mark: whether flushes are still allowed.
        private final AtomicMarkableReference<Scheduler.Task> taskRef;
        // Ensures at most one direct flush triggered by schedule() is in flight.
        private final AtomicBoolean flushing = new AtomicBoolean();

        public WebSocketScheduler(Context context, ServerMessage.Mutable message, long timeout) {
            this.context = context;
            this.message = message;
            // NOTE(review): 'this' is handed to the Bayeux scheduler before taskRef and
            // metaConnectCycle are assigned; kept as is, confirm a timeout cannot fire that early.
            Scheduler.Task task = _transport.getBayeux().schedule(this, timeout);
            this.taskRef = new AtomicMarkableReference<>(task, true);
            context.metaConnectCycle = _transport.newMetaConnectCycle();
        }

        public long getMetaConnectCycle() {
            return context.metaConnectCycle;
        }

        /**
         * Called when the session has something to deliver. With meta-connect-only delivery,
         * or a terminated session, the suspended connect is resumed; otherwise the session
         * queue is flushed directly while the connect stays suspended.
         */
        public void schedule() {
            ServerSessionImpl session = context.session;
            boolean metaConnectDelivery = AbstractWebSocketEndPoint.this.isMetaConnectDeliveryOnly(session);
            if (metaConnectDelivery || session.isTerminated()) {
                if (cancelTimeout(false)) {
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Resuming suspended {} for {}", message, session);
                    }
                    session.notifyResumed(message, false);
                    AbstractWebSocketEndPoint.this.resume(context, message, this);
                }
                return;
            }

            if (taskRef.isMarked()) {
                // Flush with a fresh context: no replies, just the session queue.
                Context flushContext = new Context(session);
                flushContext.sendQueue = true;
                flushContext.metaConnectCycle = context.metaConnectCycle;
                flush(flushContext);
            }
        }

        /** Flushes the session queue, repeating while non-lazy messages remain; one flush at a time. */
        private void flush(Context ctx) {
            if (!flushing.compareAndSet(false, true)) {
                return;
            }
            executeFlush(ctx, Promise.from(y -> {
                flushing.set(false);
                if (ctx.session.hasNonLazyMessages()) {
                    flush(ctx);
                }
            }, this::fail));
        }

        /** Hands the flush to the Bayeux server's {@code execute}. */
        private void executeFlush(Context ctx, Promise<Void> promise) {
            _transport.getBayeux().execute(() -> AbstractWebSocketEndPoint.this.flush(ctx, promise));
        }

        /** Cancels the suspension and schedules session expiration, if the timeout was still pending. */
        public void cancel() {
            if (cancelTimeout(true)) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Cancelling suspended {} for {}", message, context.session);
                }
                _transport.scheduleExpiration(context.session, context.metaConnectCycle);
            }
        }

        /** Cancels the suspension and closes the connection, if the timeout was still pending. */
        public void destroy() {
            if (cancelTimeout(true)) {
                AbstractWebSocketEndPoint.this.close(1000, "Destroy");
            }
        }

        /** Timeout expiry: resumes the suspended connect, unless it was already taken. */
        @Override
        public void run() {
            if (cancelTimeout(false)) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Timing out suspended {} for {}", message, context.session);
                }
                context.session.notifyResumed(message, true);
                AbstractWebSocketEndPoint.this.resume(context, message, this);
            }
        }

        /**
         * Atomically takes the pending timeout task and sets the mark to {@code !disable}.
         *
         * @param disable whether to clear the mark checked by {@link #schedule()}
         * @return {@code true} if this call took (and cancelled) the task; {@code false} if
         *         it had already been taken
         */
        private boolean cancelTimeout(boolean disable) {
            while (true) {
                Scheduler.Task task = taskRef.getReference();
                boolean enabled = taskRef.isMarked();
                if (taskRef.compareAndSet(task, null, enabled, !disable)) {
                    if (task == null) {
                        return false;
                    }
                    task.cancel();
                    return true;
                }
                // Lost a race with another caller: re-read and retry.
            }
        }

        /** Resume completed: flush the suspended connect's context. */
        public void succeed(Void result) {
            executeFlush(context, Promise.from(y -> {
            }, this::fail));
        }

        /** Resume or flush failed: close the connection with code 1011. */
        public void fail(Throwable failure) {
            AbstractWebSocketEndPoint.this.close(1011, failure.toString());
        }

        @Override
        public String toString() {
            return String.format("%s@%x[cycle=%d]", getClass().getSimpleName(), hashCode(), getMetaConnectCycle());
        }
    }

    /**
     * One unit of work for the flusher: a context with its replies, the queued messages to
     * send with them, and the promise to complete when everything has been written.
     */
    private class Entry {
        private final Context _context;
        private final List<ServerMessage> _queue;
        private final Promise<Void> _promise;

        private Entry(Context context, List<ServerMessage> queue, Promise<Void> promise) {
            _context = context;
            _queue = queue;
            _promise = promise;
        }

        /** Schedules session expiration through the transport, but only if the context asks for it. */
        private void scheduleExpiration() {
            if (!_context.scheduleExpiration) {
                return;
            }
            _transport.scheduleExpiration(_context.session, _context.metaConnectCycle);
        }

        @Override
        public String toString() {
            int messages = _queue.size();
            int replies = _context.replies.size();
            return String.format("%s@%x[messages=%d,replies=%d]", getClass().getSimpleName(), hashCode(), messages, replies);
        }
    }

    private static enum State {
        IDLE,
        HANDSHAKE,
        MESSAGES,
        REPLIES,
        COMPLETE;

    }
}

