/*
 * Decompiled with CFR 0.152.
 */
package org.cometd.websocket.server.common;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.text.ParseException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxContext;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.bayeux.server.ServerTransport;
import org.cometd.common.AsyncFoldLeft;
import org.cometd.server.AbstractServerTransport;
import org.cometd.server.ServerMessageImpl;
import org.cometd.server.ServerSessionImpl;
import org.cometd.websocket.server.common.AbstractWebSocketTransport;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWebSocketEndPoint {
    // Logger named after the concrete subclass (obtained via getClass()).
    private final Logger _logger = LoggerFactory.getLogger(this.getClass());
    // Serializes outgoing writes: flush() queues entries here and triggers iteration.
    private final Flusher flusher = new Flusher();
    // Transport used to parse incoming data, process replies and serialize messages.
    private final AbstractWebSocketTransport _transport;
    // Context attached to every processed message; onError() tolerates it being null.
    private final BayeuxContext _bayeuxContext;
    // Session currently bound to this connection: assigned after a successful handshake
    // or a lookup by client id in processMessages(), cleared in onClose().
    private ServerSessionImpl _session;

    /**
     * Creates an endpoint bound to the given transport and Bayeux context.
     *
     * @param transport the transport used to parse, process and serialize messages
     * @param context the Bayeux context attached to each processed message;
     *                {@link #onError(Throwable)} tolerates a {@code null} value
     */
    protected AbstractWebSocketEndPoint(AbstractWebSocketTransport transport, BayeuxContext context) {
        _transport = transport;
        _bayeuxContext = context;
    }

    /**
     * Writes one frame of text to the remote peer.
     * <p>
     * Within this class it is called by {@link #send(List, int, Callback)} with a
     * JSON array string and, as callback, the internal flusher, which only advances
     * once the callback is completed.
     *
     * @param session the session bound to this connection at the time of the write; may be {@code null}
     * @param data the text to write
     * @param callback the callback to complete when the write succeeds or fails
     */
    protected abstract void send(ServerSession session, String data, Callback callback);

    /**
     * Closes the underlying WebSocket connection.
     * <p>
     * This class calls it with code {@code 1011} and the failure's string form when
     * processing fails, and with code {@code 1000} and reason {@code "Cancel"} when a
     * suspended {@code /meta/connect} is cancelled.
     *
     * @param code the WebSocket close code
     * @param reason the close reason
     */
    public abstract void close(int code, String reason);

    /**
     * Handles one text frame received from the remote peer.
     * <p>
     * The text is parsed into Bayeux messages by the transport and the messages are
     * processed asynchronously. Outcomes reported to {@code p}:
     * <ul>
     * <li>processing completes, or parsing yields {@code null}: {@code p} succeeds;</li>
     * <li>the text cannot be parsed: the connection is closed with code 1011, a warning
     * is logged and {@code p} still succeeds;</li>
     * <li>any other failure: the connection is closed with code 1011 and {@code p} fails.</li>
     * </ul>
     *
     * @param data the text received
     * @param p the promise completed when the handling of {@code data} is finished
     */
    public void onMessage(String data, Promise<Void> p) {
        // Wrap the caller's promise so that every processing failure also closes the connection.
        Promise<Void> promise = Promise.from(p::succeed, failure -> {
            if (_logger.isDebugEnabled()) {
                _logger.debug("", failure);
            }
            close(1011, failure.toString());
            p.fail(failure);
        });

        try {
            ServerMessage.Mutable[] messages = _transport.parseMessages(data);
            if (_logger.isDebugEnabled()) {
                _logger.debug("Parsed {} messages", messages == null ? -1 : messages.length);
            }
            if (messages == null) {
                promise.succeed(null);
            } else {
                processMessages(messages, promise);
            }
        } catch (ParseException x) {
            // Malformed JSON: close the connection, but report success to the caller.
            close(1011, x.toString());
            _logger.warn("Error parsing JSON: " + data, x);
            promise.succeed(null);
        } catch (Throwable x) {
            promise.fail(x);
        }
    }

    /**
     * Notifies this endpoint that the WebSocket connection has been closed.
     * <p>
     * If a session is bound to the connection it is unbound, its scheduler is reset
     * to {@code null} and its expiration is scheduled using the transport interval.
     * The transport is always notified afterwards.
     *
     * @param code the WebSocket close code
     * @param reason the close reason
     */
    public void onClose(int code, String reason) {
        final ServerSessionImpl bound = _session;
        if (_logger.isDebugEnabled()) {
            _logger.debug("Closing {}/{} - {}", code, reason, bound);
        }

        if (bound != null) {
            _session = null;
            bound.setScheduler(null);
            bound.scheduleExpiration(_transport.getInterval());
        }

        _transport.onClose(code, reason);
    }

    /**
     * Logs a failure reported on the WebSocket connection.
     * <p>
     * Timeouts ({@link SocketTimeoutException}, {@link TimeoutException}) are logged at
     * debug level only; any other failure is logged at info level together with the
     * remote address, when a Bayeux context is available.
     *
     * @param failure the failure to log
     */
    public void onError(Throwable failure) {
        boolean timedOut = failure instanceof SocketTimeoutException || failure instanceof TimeoutException;
        if (timedOut) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("WebSocket Timeout", failure);
            }
            return;
        }

        InetSocketAddress address = null;
        if (_bayeuxContext != null) {
            address = _bayeuxContext.getRemoteAddress();
        }
        _logger.info("WebSocket Error, Address: " + address, failure);
    }

    /**
     * Resolves the session for a batch of messages and processes the batch in order.
     * <p>
     * An empty batch fails the promise. Otherwise the first message decides the session:
     * a {@code /meta/handshake} unbinds the current session and creates a new one; any
     * other channel reuses the bound session, provided the Bayeux server still knows it,
     * or looks one up by client id when the transport does not require a handshake per
     * connection. Each message is then processed sequentially; if every message asks for
     * a flush, the accumulated replies are flushed, otherwise the promise succeeds
     * without flushing.
     *
     * @param messages the parsed messages, never {@code null}
     * @param promise the promise completed when the batch has been processed and, if needed, flushed
     */
    private void processMessages(ServerMessage.Mutable[] messages, Promise<Void> promise) {
        if (messages.length == 0) {
            promise.fail(new IOException("bayeux protocol violation"));
            return;
        }

        ServerSessionImpl session;
        ServerMessage.Mutable first = messages[0];
        if ("/meta/handshake".equals(first.getChannel())) {
            // A handshake always starts from a fresh session.
            _session = null;
            session = _transport.getBayeux().newServerSession();
            session.setAllowMessageDeliveryDuringHandshake(_transport.isAllowMessageDeliveryDuringHandshake());
        } else {
            session = _session;
            if (session == null) {
                if (!_transport.isRequireHandshakePerConnection()) {
                    // No session bound to this connection yet: try to find it by client id.
                    session = _session = (ServerSessionImpl)_transport.getBayeux().getSession(first.getClientId());
                }
            } else if (_transport.getBayeux().getSession(session.getId()) == null) {
                // The bound session is no longer known to the server: unbind it.
                _session = null;
                session = null;
            }
        }

        Context context = new Context(session);
        // Fold over the messages with a Boolean accumulator that stays true only while
        // every message reports that a flush is needed.
        AsyncFoldLeft.run(messages, true,
                (result, message, loop) -> processMessage(messages, context, (ServerMessageImpl)message,
                        Promise.from(b -> loop.proceed(result && b), loop::fail)),
                Promise.from(shouldFlush -> {
                    if (shouldFlush) {
                        flush(context, promise);
                    } else {
                        promise.succeed(null);
                    }
                }, promise::fail));
    }

    /**
     * Processes a single message of a batch, dispatching on its channel.
     * <p>
     * The message (and the session, if any) is first associated with this transport and
     * the Bayeux context. A {@code /meta/handshake} must be the only message of its batch,
     * otherwise the promise fails. A {@code /meta/connect} either gets suspended (the
     * promise succeeds with {@code false}) or is resumed immediately (the promise succeeds
     * with {@code true}). Every other message goes through the generic path.
     * <p>
     * The channel is compared null-safely: a message without a channel takes the generic
     * path instead of throwing {@link NullPointerException} from a {@code switch} on a
     * {@code null} string. How such a message is answered is decided by the Bayeux
     * server's {@code handle()}, which is not visible here.
     *
     * @param messages the whole batch the message belongs to
     * @param context the processing context shared by the batch
     * @param message the message to process
     * @param promise succeeded with {@code true} when the replies must be flushed,
     *                {@code false} when the message has been suspended
     */
    private void processMessage(ServerMessage.Mutable[] messages, Context context, ServerMessageImpl message, Promise<Boolean> promise) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Processing {}", message);
        }

        message.setServerTransport(_transport);
        message.setBayeuxContext(_bayeuxContext);
        ServerSessionImpl session = context.session;
        if (session != null) {
            session.setServerTransport(_transport);
        }

        String channel = message.getChannel();
        if ("/meta/handshake".equals(channel)) {
            if (messages.length > 1) {
                promise.fail(new IOException("protocol violation"));
            } else {
                processMetaHandshake(context, message, promise);
            }
        } else if ("/meta/connect".equals(channel)) {
            processMetaConnect(context, message, Promise.from(proceed -> {
                if (proceed) {
                    // Not suspended: produce the reply right away.
                    resume(context, message, Promise.from(y -> promise.succeed(true), promise::fail));
                } else {
                    // Suspended: the scheduler will resume and flush later.
                    promise.succeed(false);
                }
            }, promise::fail));
        } else {
            processMessage(context, message, promise);
        }
    }

    /**
     * Processes a {@code /meta/handshake} message.
     * <p>
     * The message is handled by the Bayeux server and the reply is processed by the
     * transport. A non-null processed reply is recorded in the context and, when
     * successful, the context session becomes the session bound to this connection.
     * The session queue is sent along only when the transport allows message delivery
     * during handshake and the processed reply is successful.
     *
     * @param context the processing context
     * @param message the handshake message
     * @param promise always succeeded with {@code true} unless handling fails
     */
    private void processMetaHandshake(Context context, ServerMessage.Mutable message, Promise<Boolean> promise) {
        ServerSessionImpl session = context.session;
        _transport.getBayeux().handle(session, message, Promise.from(reply ->
                _transport.processReply(session, reply, Promise.from(processed -> {
                    if (processed != null) {
                        context.replies.add(processed);
                        if (processed.isSuccessful()) {
                            _session = session;
                        }
                    }
                    context.sendQueue = _transport.allowMessageDeliveryDuringHandshake(session)
                            && processed != null && processed.isSuccessful();
                    context.scheduleExpiration = true;
                    promise.succeed(true);
                }, promise::fail)), promise::fail));
    }

    /**
     * Processes a {@code /meta/connect} message and decides whether to suspend it.
     * <p>
     * After the Bayeux server has handled the message, the connect is suspended when all
     * of the following hold: the session exists; either the session reports nothing to
     * schedule or delivery is not restricted to {@code /meta/connect}; the reply is
     * successful; the calculated timeout is positive; the session was connected before
     * handling and still is. When not suspended and the session is disconnected, the
     * reply advice is set to {@code reconnect: none}.
     *
     * @param context the processing context
     * @param message the connect message
     * @param promise succeeded with {@code false} if the message was suspended,
     *                {@code true} if the caller should proceed to reply
     */
    private void processMetaConnect(Context context, ServerMessage.Mutable message, Promise<Boolean> promise) {
        ServerSessionImpl session = context.session;
        // Sampled before handling, because handling may change the connected state.
        boolean wasConnected = session != null && session.isConnected();
        _transport.getBayeux().handle(session, message, Promise.from(reply -> {
            boolean proceed = true;
            if (session != null) {
                boolean maySuspend = !session.shouldSchedule();
                boolean metaConnectDelivery = _transport.isMetaConnectDeliveryOnly() || session.isMetaConnectDeliveryOnly();
                if ((maySuspend || !metaConnectDelivery) && reply.isSuccessful()) {
                    long timeout = session.calculateTimeout(_transport.getTimeout());
                    if (timeout > 0 && wasConnected && session.isConnected()) {
                        AbstractServerTransport.Scheduler scheduler = suspend(context, message, timeout);
                        session.setScheduler(scheduler);
                        proceed = false;
                    }
                }
                if (proceed && session.isDisconnected()) {
                    reply.getAdvice(true).put("reconnect", "none");
                }
            }
            promise.succeed(proceed);
        }, promise::fail));
    }

    /**
     * Processes a message that is neither a handshake nor a connect.
     * <p>
     * The message is handled by the Bayeux server, then its associated reply is
     * processed by the transport and, if non-null, recorded in the context.
     *
     * @param context the processing context
     * @param message the message to process
     * @param promise always succeeded with {@code true} unless handling fails
     */
    private void processMessage(Context context, ServerMessageImpl message, Promise<Boolean> promise) {
        ServerSessionImpl session = context.session;
        _transport.getBayeux().handle(session, message, Promise.from(handled ->
                _transport.processReply(session, message.getAssociated(), Promise.from(reply -> {
                    if (reply != null) {
                        context.replies.add(reply);
                    }
                    promise.succeed(true);
                }, promise::fail)), promise::fail));
    }

    /**
     * Suspends a {@code /meta/connect} message for at most the given timeout.
     * <p>
     * The context session is notified of the suspension and a scheduler is created;
     * the scheduler's constructor registers the timeout task.
     *
     * @param context the processing context; its session must not be {@code null}
     * @param message the connect message being suspended
     * @param timeout the timeout passed to the session notification and to the scheduler
     * @return the scheduler that will resume the message
     */
    private AbstractServerTransport.Scheduler suspend(Context context, ServerMessage.Mutable message, long timeout) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Suspended {}", message);
        }
        ServerSessionImpl session = context.session;
        session.notifySuspended(message, timeout);
        return new WebSocketScheduler(context, message, timeout);
    }

    /**
     * Produces the reply for a {@code /meta/connect} message that is not, or no longer,
     * suspended.
     * <p>
     * If the session is terminated the reply advice is set to {@code reconnect: none}.
     * The reply is then processed by the transport, recorded in the context when
     * non-null, and the context is marked to send the session queue and to schedule the
     * session expiration.
     *
     * @param context the processing context
     * @param message the connect message whose associated reply is sent
     * @param promise succeeded with {@code null} once the context has been updated
     */
    private void resume(Context context, ServerMessage.Mutable message, Promise<Void> promise) {
        ServerSessionImpl session = context.session;
        ServerMessage.Mutable reply = message.getAssociated();

        boolean terminated = session != null && session.isTerminated();
        if (terminated) {
            reply.getAdvice(true).put("reconnect", "none");
        }

        _transport.processReply(session, reply, Promise.from(processed -> {
            if (processed != null) {
                context.replies.add(processed);
            }
            context.sendQueue = true;
            context.scheduleExpiration = true;
            promise.succeed(null);
        }, promise::fail));
    }

    /**
     * Serializes up to {@code batchSize} messages from the head of the list into one
     * JSON array, removes them from the list and writes the array as a single frame.
     * <p>
     * An empty list succeeds the callback immediately without writing anything.
     *
     * @param messages the messages to send; the messages sent are removed from this list
     * @param batchSize the maximum number of messages to put in the frame
     * @param callback the callback completed by the underlying write
     */
    protected void send(List<? extends ServerMessage> messages, int batchSize, Callback callback) {
        if (messages.isEmpty()) {
            callback.succeeded();
            return;
        }

        int total = messages.size();
        int count = Math.min(batchSize, total);
        // Pre-size the buffer with a per-message estimate to limit reallocation.
        StringBuilder json = new StringBuilder(count * 4 * 48);
        json.append("[");
        if (count == 1) {
            json.append(toJSON(messages.remove(0)));
        } else {
            for (int i = 0; i < count; ++i) {
                if (i > 0) {
                    json.append(",");
                }
                json.append(toJSON(messages.get(i)));
            }
            // Drop what has just been serialized.
            if (count == total) {
                messages.clear();
            } else {
                messages.subList(0, count).clear();
            }
        }
        json.append("]");

        send(_session, json.toString(), callback);
    }

    /**
     * Queues the replies held by the context, and possibly the session message queue,
     * for writing.
     * <p>
     * The session queue is taken only when the context asks for it and a session is
     * present. The resulting entry is handed to the flusher; if the flusher accepts it,
     * iteration is triggered. If the flusher rejects it, the flusher itself has already
     * failed the promise.
     *
     * @param context the processing context holding the replies to write
     * @param promise the promise completed by the flusher for this entry
     */
    protected void flush(Context context, Promise<Void> promise) {
        List<ServerMessage> queue = Collections.emptyList();
        ServerSessionImpl session = context.session;
        if (context.sendQueue && session != null) {
            queue = session.takeQueue();
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Flushing {}, replies={}, messages={}", session, context.replies, queue);
        }
        if (flusher.queue(new Entry(context, queue, promise))) {
            flusher.iterate();
        }
    }

    /**
     * Serializes a message to JSON by delegating to the transport.
     *
     * @param message the message to serialize
     * @return the JSON text produced by the transport
     */
    private String toJSON(ServerMessage message) {
        final String json = _transport.toJSON(message);
        return json;
    }

    /**
     * Mutable state shared while processing one batch of messages: the session the
     * batch is processed for, the replies accumulated so far, and two flags telling the
     * flush whether to send the session queue and whether to schedule the session
     * expiration.
     */
    protected static class Context {
        // Session the batch is processed for; may be null.
        private final ServerSessionImpl session;
        // Replies accumulated while processing, in processing order.
        private final List<ServerMessage.Mutable> replies = new ArrayList<>();
        // Whether the flush must also send the session queue.
        private boolean sendQueue;
        // Whether the session expiration must be scheduled when the entry is flushed.
        private boolean scheduleExpiration;

        private Context(ServerSessionImpl session) {
            this.session = session;
        }
    }

    /**
     * One unit of work for the flusher: a context with its replies, the queued messages
     * to write before them, and the promise to complete.
     */
    private class Entry {
        private final Context _context;
        private final List<ServerMessage> _queue;
        private final Promise<Void> _promise;

        private Entry(Context context, List<ServerMessage> queue, Promise<Void> promise) {
            _context = context;
            _queue = queue;
            _promise = promise;
        }

        /**
         * Schedules the session expiration using the transport interval, but only when
         * the context asks for it and has a session.
         */
        private void scheduleExpiration() {
            if (!_context.scheduleExpiration) {
                return;
            }
            ServerSessionImpl session = _context.session;
            if (session != null) {
                session.scheduleExpiration(_transport.getInterval());
            }
        }

        @Override
        public String toString() {
            return String.format("%s@%x[messages=%d,replies=%d]",
                    getClass().getSimpleName(),
                    hashCode(),
                    _queue.size(),
                    _context.replies.size());
        }
    }

    private class Flusher
    extends IteratingCallback {
        private final Queue<Entry> _entries = new ArrayDeque<Entry>();
        private Entry _entry;
        private Throwable _failure;

        private Flusher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean queue(Entry entry) {
            Throwable failure;
            Flusher flusher = this;
            synchronized (flusher) {
                failure = this._failure;
                if (failure == null) {
                    return this._entries.offer(entry);
                }
            }
            entry.scheduleExpiration();
            entry._promise.fail(failure);
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected IteratingCallback.Action process() {
            ServerMessage.Mutable reply;
            Entry entry;
            Flusher flusher = this;
            synchronized (flusher) {
                entry = this._entry = this._entries.peek();
            }
            if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                AbstractWebSocketEndPoint.this._logger.debug("Processing {}", (Object)entry);
            }
            if (entry == null) {
                return IteratingCallback.Action.IDLE;
            }
            List replies = entry._context.replies;
            List queue = entry._queue;
            if (replies.size() > 0 && "/meta/handshake".equals((reply = (ServerMessage.Mutable)replies.get(0)).getChannel())) {
                if (AbstractWebSocketEndPoint.this._transport.allowMessageDeliveryDuringHandshake(AbstractWebSocketEndPoint.this._session) && !queue.isEmpty()) {
                    reply.put((Object)"x-messages", (Object)queue.size());
                }
                AbstractWebSocketEndPoint.this._transport.getBayeux().freeze(reply);
                AbstractWebSocketEndPoint.this.send(replies, 1, (Callback)this);
                return IteratingCallback.Action.SCHEDULED;
            }
            if (!queue.isEmpty()) {
                int batchSize;
                int size = queue.size();
                int messagesPerFrame = AbstractWebSocketEndPoint.this._transport.getMessagesPerFrame();
                int n = batchSize = messagesPerFrame > 0 ? Math.min(messagesPerFrame, size) : size;
                if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                    AbstractWebSocketEndPoint.this._logger.debug("Processing queue, batch size {}: {}", (Object)batchSize, (Object)queue);
                }
                AbstractWebSocketEndPoint.this.send(queue, batchSize, (Callback)this);
                return IteratingCallback.Action.SCHEDULED;
            }
            Flusher flusher2 = this;
            synchronized (flusher2) {
                this._entries.poll();
            }
            entry.scheduleExpiration();
            if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                AbstractWebSocketEndPoint.this._logger.debug("Processing replies {}", (Object)replies);
            }
            for (ServerMessage.Mutable reply2 : replies) {
                AbstractWebSocketEndPoint.this._transport.getBayeux().freeze(reply2);
            }
            AbstractWebSocketEndPoint.this.send(replies, replies.size(), (Callback)this);
            return IteratingCallback.Action.SCHEDULED;
        }

        public void succeeded() {
            this._entry._promise.succeed(null);
            super.succeeded();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void onCompleteFailure(Throwable x) {
            Entry entry;
            Flusher flusher = this;
            synchronized (flusher) {
                this._failure = x;
                entry = this._entry;
            }
            if (entry != null) {
                entry.scheduleExpiration();
                entry._promise.fail(x);
            }
        }
    }

    private class WebSocketScheduler
    implements AbstractServerTransport.Scheduler,
    Runnable,
    Promise<Void> {
        private final Context context;
        private final ServerMessage.Mutable message;
        private Scheduler.Task task;

        public WebSocketScheduler(Context context, ServerMessage.Mutable message, long timeout) {
            this.context = context;
            this.message = message;
            this.task = AbstractWebSocketEndPoint.this._transport.getBayeux().schedule((Runnable)this, timeout);
        }

        public void schedule() {
            boolean metaConnectDelivery;
            ServerSessionImpl session = this.context.session;
            boolean bl = metaConnectDelivery = AbstractWebSocketEndPoint.this._transport.isMetaConnectDeliveryOnly() || session.isMetaConnectDeliveryOnly();
            if (metaConnectDelivery || session.isTerminated()) {
                if (this.cancelTimeout()) {
                    session.notifyResumed((ServerMessage)this.message, false);
                    AbstractWebSocketEndPoint.this.resume(this.context, this.message, (Promise<Void>)this);
                }
            } else {
                Context context = new Context(session);
                context.sendQueue = true;
                AbstractWebSocketEndPoint.this.flush(context, (Promise<Void>)Promise.from(y -> {}, this::fail));
            }
        }

        public void cancel() {
            if (this.cancelTimeout()) {
                AbstractWebSocketEndPoint.this.close(1000, "Cancel");
            }
        }

        @Override
        public void run() {
            if (this.cancelTimeout()) {
                if (AbstractWebSocketEndPoint.this._logger.isDebugEnabled()) {
                    AbstractWebSocketEndPoint.this._logger.debug("Resumed {}", (Object)this.message);
                }
                this.context.session.notifyResumed((ServerMessage)this.message, true);
                AbstractWebSocketEndPoint.this.resume(this.context, this.message, (Promise<Void>)this);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean cancelTimeout() {
            Scheduler.Task task;
            WebSocketScheduler webSocketScheduler = this;
            synchronized (webSocketScheduler) {
                task = this.task;
                if (task != null) {
                    this.task = null;
                }
            }
            if (task != null) {
                task.cancel();
                return true;
            }
            return false;
        }

        public void succeed(Void result) {
            AbstractWebSocketEndPoint.this.flush(this.context, (Promise<Void>)Promise.from(y -> {}, this::fail));
        }

        public void fail(Throwable failure) {
            AbstractWebSocketEndPoint.this.close(1011, failure.toString());
        }
    }
}

