/*
 * Decompiled with CFR 0.152.
 */
package com.graphql_java_generator.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.graphql_java_generator.annotation.RequestType;
import com.graphql_java_generator.client.GraphQLRequestObject;
import com.graphql_java_generator.client.OAuthTokenExtractor;
import com.graphql_java_generator.client.SubscriptionCallback;
import com.graphql_java_generator.client.SubscriptionClient;
import com.graphql_java_generator.client.SubscriptionClientReactiveImpl;
import com.graphql_java_generator.client.request.AbstractGraphQLRequest;
import com.graphql_java_generator.client.response.Error;
import com.graphql_java_generator.client.response.JsonResponseWrapper;
import com.graphql_java_generator.exception.GraphQLRequestExecutionException;
import com.graphql_java_generator.util.GraphqlUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;

public class GraphQLReactiveWebSocketHandler
implements WebSocketHandler {
    /** SLF4J logger shared by this class and by the nested {@code RequestData} class. */
    private static Logger logger = LoggerFactory.getLogger(GraphQLReactiveWebSocketHandler.class);
    /** The web socket sub-protocol list returned by {@code getSubProtocols()}. */
    private static final List<String> SUB_PROTOCOL_LIST = Arrays.asList("graphql-transport-ws");
    // NOTE(review): the two time-outs below look like the literals 10 and 30 that appear in the
    // await(...) calls of this file (presumably inlined at compile time) — confirm.
    private static final int NB_SECONDS_TIME_OUT_FOR_INITIALIZATION = 10;
    private static final int NB_SECONDS_TIME_OUT_FOR_REQUESTS = 30;
    /** Jackson mapper used to parse the raw messages received on the web socket. */
    private static final ObjectMapper defaultObjectMapper = new ObjectMapper();
    /** Counter read and incremented by {@code RequestData.getNextIdOperation()} to build operation ids. */
    private static long nextUniqueIdOperation = 1L;
    /** GraphQL endpoint, used to build the web socket URI when {@code graphqlSubscriptionEndpoint} is null. */
    String graphqlEndpoint;
    /** Dedicated subscription endpoint; when non-null it takes precedence over {@code graphqlEndpoint}. */
    @Deprecated
    String graphqlSubscriptionEndpoint;
    /** Client used to open the web socket connection. */
    WebSocketClient webSocketClient;
    /** When this and {@code oAuthTokenExtractor} are both non-null, an Authorization header is added to the connection request. */
    final ServerOAuth2AuthorizedClientExchangeFilterFunction serverOAuth2AuthorizedClientExchangeFilterFunction;
    /** Provides the Authorization header value sent when opening the connection. */
    final OAuthTokenExtractor oAuthTokenExtractor;
    /** The known session handlers, keyed by web socket session id. */
    private final Map<String, WebSocketSessionHandler> sessionHandlers = new ConcurrentHashMap<String, WebSocketSessionHandler>();
    /** Recreated for each connection attempt; counted down by {@code handle()} or when the connection fails. */
    CountDownLatch webSocketConnectionLatch = null;
    /** The error of the last connection attempt, or null when there was none. */
    Throwable webSocketConnectionError = null;

    /**
     * Stores the configuration and starts a background thread that tries to open the web socket connection.
     * A failure of this first attempt is only logged: {@code getActiveWebSocketSession()} is called again
     * by each later request execution.
     *
     * @param graphqlEndpoint the GraphQL endpoint, used when {@code graphqlSubscriptionEndpoint} is null
     * @param graphqlSubscriptionEndpoint the subscription endpoint; takes precedence when non-null
     * @param webSocketClient the client used to open the web socket connection
     * @param serverOAuth2AuthorizedClientExchangeFilterFunction may be null; when null, no Authorization header is sent
     * @param oAuthTokenExtractor may be null; when null, no Authorization header is sent
     */
    public GraphQLReactiveWebSocketHandler(String graphqlEndpoint, String graphqlSubscriptionEndpoint, WebSocketClient webSocketClient, ServerOAuth2AuthorizedClientExchangeFilterFunction serverOAuth2AuthorizedClientExchangeFilterFunction, OAuthTokenExtractor oAuthTokenExtractor) {
        this.graphqlEndpoint = graphqlEndpoint;
        this.graphqlSubscriptionEndpoint = graphqlSubscriptionEndpoint;
        this.webSocketClient = webSocketClient;
        this.serverOAuth2AuthorizedClientExchangeFilterFunction = serverOAuth2AuthorizedClientExchangeFilterFunction;
        this.oAuthTokenExtractor = oAuthTokenExtractor;
        // The connection is opened on a separate thread, as getActiveWebSocketSession() blocks
        // until the connection is established (or times out).
        new Thread(() -> {
            try {
                this.getActiveWebSocketSession();
            }
            catch (GraphQLRequestExecutionException e) {
                // Report through the logger (not stderr), and keep the stack trace.
                logger.warn("Unable to open the web socket connection at startup: {}", e.getMessage(), e);
            }
        }).start();
    }

    /**
     * Registers a new {@code WebSocketSessionHandler} for the given session, starts its processing, then
     * releases the latch on which {@code getActiveWebSocketSession()} is waiting.
     *
     * @param session the web socket session that has just been opened
     * @return the {@code Mono} built by the session handler
     * @throws RuntimeException if a handler is already registered for this session id
     */
    public Mono<Void> handle(WebSocketSession session) {
        final String sessionId = session.getId();
        logger.trace("new web socket session received: {}", (Object)sessionId);
        if (this.sessionHandlers.containsKey(sessionId)) {
            throw new RuntimeException("[Internal Error] Trying to handle an already known Web Socket session: " + sessionId);
        }
        final WebSocketSessionHandler handler = new WebSocketSessionHandler(session);
        this.sessionHandlers.put(sessionId, handler);
        final Mono<Void> sessionPipeline = handler.handleWebSocketSession();
        this.webSocketConnectionLatch.countDown();
        return sessionPipeline;
    }

    /**
     * Executes a subscription on the active web socket session (a connection is opened if none is active).
     *
     * @return the client returned by the session handler for this subscription
     * @throws GraphQLRequestExecutionException if no session is available, or if the session handler fails
     */
    public <R, T> SubscriptionClient executeSubscription(AbstractGraphQLRequest request, Map<String, Object> parameters, SubscriptionCallback<T> subscriptionCallback, Class<R> subscriptionType, Class<T> messsageType) throws GraphQLRequestExecutionException {
        WebSocketSessionHandler activeHandler = this.getActiveWebSocketSession();
        return activeHandler.executeSubscription(request, parameters, subscriptionCallback, subscriptionType, messsageType);
    }

    /**
     * Executes a query or a mutation on the active web socket session (a connection is opened if none is active).
     *
     * @return the response returned by the session handler
     * @throws GraphQLRequestExecutionException if no session is available, or if the session handler fails
     */
    public <R extends GraphQLRequestObject> R executeQueryOrMutation(AbstractGraphQLRequest graphQLRequest, Map<String, Object> parameters, Class<R> dataResponseType) throws GraphQLRequestExecutionException {
        WebSocketSessionHandler activeHandler = this.getActiveWebSocketSession();
        return activeHandler.executeQueryOrMutation(graphQLRequest, parameters, dataResponseType);
    }

    /**
     * Returns a usable web socket session handler. When {@code findActiveWebSocketSession()} finds none, a new
     * connection is opened and this method blocks until {@code handle()} has been called for it, the connection
     * has failed, or the initialization time-out has expired.
     *
     * @return the active session handler (never null)
     * @throws GraphQLRequestExecutionException if the endpoint is not valid, the connection fails or times out,
     *             or the thread is interrupted while waiting (the interrupt flag is then restored)
     */
    private synchronized WebSocketSessionHandler getActiveWebSocketSession() throws GraphQLRequestExecutionException {
        WebSocketSessionHandler activeSession = this.findActiveWebSocketSession();
        if (activeSession != null) {
            return activeSession;
        }
        URI webSocketUri = this.getWebSocketURI();
        // The endpoint as it was configured, used in the error messages below
        String endpoint = this.graphqlSubscriptionEndpoint == null ? this.graphqlEndpoint : this.graphqlSubscriptionEndpoint;
        logger.debug("getActiveWebSocketSession(): Starting a new connection on {}", (Object)webSocketUri);
        this.webSocketConnectionLatch = new CountDownLatch(1);
        this.webSocketConnectionError = null;
        HttpHeaders headers = new HttpHeaders();
        if (this.serverOAuth2AuthorizedClientExchangeFilterFunction != null && this.oAuthTokenExtractor != null) {
            String authorizationHeaderValue = this.oAuthTokenExtractor.getAuthorizationHeaderValue();
            // NOTE(review): this writes the OAuth token into the debug log — confirm that this is acceptable.
            logger.debug("Got this OAuth token (authorization header value): {}", (Object)authorizationHeaderValue);
            headers.add("Authorization", authorizationHeaderValue);
        } else {
            logger.debug("No serverOAuth2AuthorizedClientExchangeFilterFunction or no oAuthTokenExtractor where provided. No OAuth token is provided.");
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append("The Subscription GET request will be sent with these headers:\n");
            if (headers.isEmpty()) {
                sb.append("    ").append("<No headers!>");
            } else {
                // One line per header: "    name: value1,value2"
                for (Map.Entry<String, List<String>> header : headers.entrySet()) {
                    sb.append("    ").append(header.getKey()).append(": ").append(String.join(",", header.getValue())).append("\n");
                }
            }
            logger.trace(sb.toString());
        }
        this.webSocketClient.execute(webSocketUri, headers, (WebSocketHandler)this).subscribeOn(Schedulers.parallel()).doOnError(t -> {
            logger.error("Receive an error during web socket connection: {}", (Object)t.getMessage());
            this.webSocketConnectionError = t;
            // Frees the wait below, so that the error is reported without waiting for the time-out
            this.webSocketConnectionLatch.countDown();
        }).subscribe();
        try {
            if (!this.webSocketConnectionLatch.await(NB_SECONDS_TIME_OUT_FOR_INITIALIZATION, TimeUnit.SECONDS) && this.webSocketConnectionError == null) {
                this.webSocketConnectionError = new GraphQLRequestExecutionException("The web socket connection to " + endpoint + " has not been initialized after " + NB_SECONDS_TIME_OUT_FOR_INITIALIZATION + " seconds");
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag, so that the callers can still detect the interruption
            Thread.currentThread().interrupt();
            throw new GraphQLRequestExecutionException("The thread got interrupted while waiting for web socket connection to " + endpoint, e);
        }
        if (this.webSocketConnectionError != null) {
            if (this.webSocketConnectionError instanceof GraphQLRequestExecutionException) {
                throw (GraphQLRequestExecutionException)this.webSocketConnectionError;
            }
            throw new GraphQLRequestExecutionException("Error during Web Socket connection to " + endpoint + ": " + this.webSocketConnectionError.getClass().getSimpleName() + "-" + this.webSocketConnectionError.getMessage(), this.webSocketConnectionError);
        }
        activeSession = this.findActiveWebSocketSession();
        if (activeSession != null) {
            return activeSession;
        }
        throw new GraphQLRequestExecutionException("Unable to create a Web Socket Session to " + endpoint);
    }

    /**
     * Looks for a registered session handler that passes {@code checkWebSessionReadiness()}. Each handler
     * that fails this check is removed from the registered handlers and is never returned.
     *
     * @return the first ready session handler, or null if there is none
     * @throws GraphQLRequestExecutionException kept in the signature for the callers; the readiness errors
     *             are handled here
     */
    private WebSocketSessionHandler findActiveWebSocketSession() throws GraphQLRequestExecutionException {
        for (WebSocketSessionHandler sessionHandler : this.sessionHandlers.values()) {
            String sessionId = sessionHandler.getSession().getId();
            try {
                logger.trace("Waiting for readiness of the web socket session {}", (Object)sessionId);
                sessionHandler.checkWebSessionReadiness();
                return sessionHandler;
            }
            catch (GraphQLRequestExecutionException e) {
                // This session is not usable: forget it, so that the caller opens a new connection
                logger.trace("The web socket session {} is not valid ({})", (Object)sessionId, (Object)e.getMessage());
                this.sessionHandlers.remove(sessionId);
            }
        }
        return null;
    }

    /**
     * @return the web socket sub-protocols managed by this handler (the shared {@code SUB_PROTOCOL_LIST})
     */
    public List<String> getSubProtocols() {
        return SUB_PROTOCOL_LIST;
    }

    /**
     * Builds the web socket URI from the subscription endpoint if it is defined, or from the GraphQL endpoint
     * otherwise: the leading "http" is replaced by "ws", so that http becomes ws and https becomes wss.
     *
     * @return the web socket URI
     * @throws GraphQLRequestExecutionException if no endpoint is defined, if the endpoint is neither http nor
     *             https, or if the resulting URI is not valid
     */
    public URI getWebSocketURI() throws GraphQLRequestExecutionException {
        String endpoint = this.graphqlSubscriptionEndpoint != null ? this.graphqlSubscriptionEndpoint : this.graphqlEndpoint;
        if (endpoint == null) {
            throw new GraphQLRequestExecutionException("No GraphQL endpoint has been defined: unable to determine the Web Socket endpoint");
        }
        if (endpoint.startsWith("http:") || endpoint.startsWith("https:")) {
            try {
                // Skipping the 4 characters of "http" keeps the optional 's' of "https"
                return new URI("ws" + endpoint.substring(4));
            }
            catch (URISyntaxException e) {
                throw new GraphQLRequestExecutionException("Error when trying to determine the Web Socket endpoint for GraphQL endpoint " + endpoint, e);
            }
        }
        throw new GraphQLRequestExecutionException("non managed protocol for endpoint " + endpoint + ". This method manages only http and https");
    }

    /** Sends a message on a web socket session, on behalf of a request. */
    static interface SubscriptionRequestEmitter {
        /**
         * @param requestData the request for which the message is sent
         * @param message the message to send
         */
        void emit(RequestData<?, ?> requestData, WebSocketMessage message);
    }

    /** Holder for the close statuses (4xxx codes) used with the web socket session. Not instantiable. */
    private static class GraphQlStatus {
        /** 4400: used when a received message can not be handled. */
        private static final CloseStatus INVALID_MESSAGE_STATUS = new CloseStatus(4400, "Invalid message");
        /** 4401 */
        private static final CloseStatus UNAUTHORIZED_STATUS = new CloseStatus(4401, "Unauthorized");
        /** 4408 */
        private static final CloseStatus INIT_TIMEOUT_STATUS = new CloseStatus(4408, "Connection initialisation timeout");
        /** 4429 */
        private static final CloseStatus TOO_MANY_INIT_REQUESTS_STATUS = new CloseStatus(4429, "Too many initialisation requests");

        private GraphQlStatus() {
            // constants only
        }
    }

    /**
     * Callback used to execute a query or a mutation: it stores the first outcome (a response, a close or an
     * error) and releases {@code latchResponseOrExceptionReceived}, on which the calling thread is waiting.
     */
    private static class QueryOrMutationCallback<R>
    implements SubscriptionCallback<R> {
        /** Released as soon as a message, a close or an error has been received. */
        CountDownLatch latchResponseOrExceptionReceived = new CountDownLatch(1);
        /** The received response, or null if no message has been received. */
        R response;
        /** The errors received, including the one created when the close occurs before any response. */
        List<Throwable> exceptions = new ArrayList<>();

        private QueryOrMutationCallback() {
        }

        @Override
        public void onConnect() {
            // Nothing to do
        }

        @Override
        public void onMessage(R r) {
            this.response = r;
            this.releaseWaitingThread();
        }

        @Override
        public void onClose(int statusCode, String reason) {
            boolean closedBeforeResponse = this.response == null;
            if (closedBeforeResponse) {
                this.exceptions.add(new GraphQLRequestExecutionException("Received onClose while expecting a message (status=" + statusCode + ", reason=" + reason + ")"));
            }
            this.releaseWaitingThread();
        }

        @Override
        public void onError(Throwable cause) {
            this.exceptions.add(cause);
            this.releaseWaitingThread();
        }

        /** Frees the thread that waits on {@code latchResponseOrExceptionReceived}. */
        private void releaseWaitingThread() {
            this.latchResponseOrExceptionReceived.countDown();
        }
    }

    /**
     * The data for one request (query, mutation or subscription) sent on a web socket session, along with the
     * dispatching of the events received for this request to its {@code SubscriptionCallback}.
     *
     * @param <R> the request type: the class into which the response's data is mapped
     * @param <T> the type of the messages given to the callback
     */
    static class RequestData<R, T> {
        final WebSocketSession session;
        /** The id of this operation, unique across all the instances of this class. */
        final String uniqueIdOperation;
        final AbstractGraphQLRequest request;
        final Map<String, Object> parameters;
        final SubscriptionCallback<T> subscriptionCallback;
        final Class<R> subscriptionType;
        final Class<T> messageType;
        /** Set through {@code setCompleted()} and read while handling the incoming events, hence volatile. */
        volatile boolean completed = false;

        /**
         * @throws GraphQLRequestExecutionException if the request is not a subscription, and {@code requestType}
         *             and {@code messageType} are not the same class
         */
        RequestData(WebSocketSession session, AbstractGraphQLRequest request, Map<String, Object> parameters, SubscriptionCallback<T> subscriptionCallback, Class<R> requestType, Class<T> messageType) throws GraphQLRequestExecutionException {
            this.session = session;
            this.request = request;
            this.parameters = parameters;
            this.subscriptionCallback = subscriptionCallback;
            this.subscriptionType = requestType;
            this.messageType = messageType;
            this.uniqueIdOperation = this.getNextIdOperation();
            if (!request.getRequestType().equals((Object)RequestType.subscription) && requestType != messageType) {
                throw new GraphQLRequestExecutionException("[Internal error] When executing query or mutation, T and R should be equal. But R (requestType) is " + requestType.getName() + " and T (messsageType) is " + messageType.getName());
            }
        }

        /**
         * @return the request as a map, as built by the request itself from the parameters
         * @throws RuntimeException if the request can not be built
         */
        Map<String, Object> getRequestMap() {
            try {
                return this.request.buildRequestAsMap(this.parameters);
            }
            catch (GraphQLRequestExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        /** @return the next operation id, taken from the counter shared by all the instances */
        private String getNextIdOperation() {
            // The counter is static: the lock must be shared by all the instances. Locking on 'this' (a new
            // instance for each call) would protect nothing.
            synchronized (RequestData.class) {
                return Long.toString(nextUniqueIdOperation++);
            }
        }

        /**
         * Manages the payload of a 'next' message: the callback's {@code onError} is called if the response
         * contains errors, otherwise the data is mapped and given to the callback's {@code onMessage}. The
         * message is ignored (just traced) if this operation is completed.
         *
         * @param result the payload of the received message
         */
        @SuppressWarnings("unchecked")
        public void onNext(Map<String, Object> result) {
            if (this.completed) {
                logger.trace("Message received for a subscription of id {} from the Web Socket session {}, but the operation {} has already completed (the message was {})", new Object[]{this.uniqueIdOperation, this.session.getId(), this.uniqueIdOperation, result});
                return;
            }
            logger.trace("Message received for a subscription of id {}, from the Web Socket: {} (on session {})", new Object[]{this.uniqueIdOperation, result, this.session.getId()});
            JsonResponseWrapper response = this.request.getGraphQLObjectMapper().convertValue(result, JsonResponseWrapper.class);
            if (response.errors != null && response.errors.size() > 0) {
                List<Error> errors = response.errors;
                String msg = "An error has been received from the GraphQL server for subscription " + this.uniqueIdOperation;
                if (logger.isErrorEnabled()) {
                    StringBuilder sb = new StringBuilder(msg).append(": ");
                    boolean first = true;
                    for (Error err : errors) {
                        // The separator is written only between two messages
                        if (!first) {
                            sb.append(" | ");
                        }
                        sb.append(err.message);
                        first = false;
                    }
                    logger.error(sb.toString());
                }
                this.subscriptionCallback.onError(new GraphQLRequestExecutionException(msg, errors));
                return;
            }
            R r = this.request.getGraphQLObjectMapper().convertValue((Object)response.data, this.subscriptionType);
            if (this.request.getRequestType().equals((Object)RequestType.subscription)) {
                // For a subscription, the message is the value of the first field of the subscription
                Object t = GraphqlUtils.graphqlUtils.invokeGetter(r, this.request.getSubscription().getFields().get(0).getName());
                this.subscriptionCallback.onMessage((T)t);
            } else {
                // For a query or a mutation, R and T are the same class (checked in the constructor)
                this.subscriptionCallback.onMessage((T)r);
            }
        }

        /** Transmits the error to the callback, unless this operation is completed. */
        public void onError(Throwable t) {
            if (this.completed) {
                logger.trace("Error received from the Web Socket session {}, but the operation {} has already completed", (Object)this.session.getId(), (Object)this.uniqueIdOperation);
            } else {
                logger.trace("Error received for WebSocketSession {}: {}", (Object)this.session.getId(), (Object)t.getMessage());
                this.subscriptionCallback.onError(t);
            }
        }

        /** Transmits the close to the callback, unless this operation is completed. */
        public void onClose(int statusCode, String reason) {
            if (this.completed) {
                logger.trace("'Close' received from the Web Socket session {}, but the operation {} has already completed (status={}, reason={})", new Object[]{this.session.getId(), this.uniqueIdOperation, statusCode, reason});
            } else {
                logger.trace("onClose(code={}, reason={}) received for id {} on WebSocketSession {}", new Object[]{statusCode, reason, this.uniqueIdOperation, this.session.getId()});
                this.subscriptionCallback.onClose(statusCode, reason);
            }
        }

        /** Calls the callback's {@code onClose(0, "Complete")}, unless this operation is completed. */
        public void onComplete() {
            if (this.completed) {
                logger.trace("Complete received from the Web Socket session {}, but the operation {} has already completed", (Object)this.session.getId(), (Object)this.uniqueIdOperation);
            } else {
                logger.trace("onComplete received for id {} on WebSocketSession {}", (Object)this.uniqueIdOperation, (Object)this.session.getId());
                this.subscriptionCallback.onClose(0, "Complete");
            }
        }

        public boolean isCompleted() {
            return this.completed;
        }

        public void setCompleted(boolean completed) {
            this.completed = completed;
        }
    }

    static class WebSocketSessionHandler {
        /** Instance logger; inside this class, it hides the static logger of the enclosing class. */
        Logger logger = LoggerFactory.getLogger(WebSocketSessionHandler.class);
        /** The web socket session managed by this handler. */
        final WebSocketSession session;
        /** Sends messages on the session. Stays null until it is set in {@code handleWebSocketSession()}. */
        SubscriptionRequestEmitter sessionEmitter = null;
        GraphqlUtils graphqlUtils = GraphqlUtils.graphqlUtils;
        /** The requests sent on this session, keyed by their uniqueIdOperation. */
        final Map<String, RequestData<?, ?>> registeredGraphQLQueries = new ConcurrentHashMap();
        /** Counted down when 'connection_ack' is received, when an error occurs and when the session is closed. */
        CountDownLatch sessionInitializationLatch = new CountDownLatch(1);
        /** The last error for this session, or null; checked by {@code checkWebSessionReadiness()}. */
        Throwable lastSessionError = null;

        /**
         * @param session the web socket session that this handler manages
         */
        WebSocketSessionHandler(WebSocketSession session) {
            this.session = session;
        }

        /**
         * Registers the subscription, sends the 'subscribe' message on this session, then calls the callback's
         * {@code onConnect()}.
         *
         * @return a client bound to the id of the new operation and to this handler
         * @throws IllegalArgumentException if the request is not a subscription
         * @throws GraphQLRequestExecutionException if the session is not ready, or the request data is not valid
         */
        public <R, T> SubscriptionClient executeSubscription(AbstractGraphQLRequest request, Map<String, Object> parameters, SubscriptionCallback<T> subscriptionCallback, Class<R> subscriptionType, Class<T> messsageType) throws GraphQLRequestExecutionException {
            boolean isSubscription = request.getRequestType().equals((Object)RequestType.subscription);
            if (!isSubscription) {
                throw new IllegalArgumentException("The request must be either a subscription, but is " + request.getRequestType());
            }
            this.checkWebSessionReadiness();

            RequestData<R, T> subscription = new RequestData<R, T>(this.session, request, parameters, subscriptionCallback, subscriptionType, messsageType);
            String operationId = subscription.uniqueIdOperation;
            this.registeredGraphQLQueries.put(operationId, subscription);

            this.logger.trace("Emitting execution of the subscription id={} on the web socket {} (request={})", new Object[]{operationId, this.session.getId(), request.getGraphQLRequest()});
            this.sessionEmitter.emit(subscription, this.session.textMessage(this.encode(operationId, MessageType.SUBSCRIBE, subscription)));

            subscriptionCallback.onConnect();
            return new SubscriptionClientReactiveImpl(operationId, this);
        }

        /**
         * Sends a query or a mutation on this session, and waits for its outcome (a response, an error or a
         * close) for at most {@code NB_SECONDS_TIME_OUT_FOR_REQUESTS} seconds.
         *
         * @return the response received from the server
         * @throws IllegalArgumentException if the request is a subscription
         * @throws GraphQLRequestExecutionException if the session is not ready, an error is received, no answer
         *             arrives in time, or the thread is interrupted (the interrupt flag is then restored)
         */
        public <R extends GraphQLRequestObject> R executeQueryOrMutation(AbstractGraphQLRequest request, Map<String, Object> parameters, Class<R> requestType) throws GraphQLRequestExecutionException {
            if (request.getRequestType().equals((Object)RequestType.subscription)) {
                throw new IllegalArgumentException("The request must be either a query or a mutation, but is " + request.getRequestType());
            }
            this.checkWebSessionReadiness();
            QueryOrMutationCallback<R> callback = new QueryOrMutationCallback<R>();
            // For queries and mutations, the message type is the request type
            RequestData<R, R> requestData = new RequestData<R, R>(this.session, request, parameters, callback, requestType, requestType);
            this.registeredGraphQLQueries.put(requestData.uniqueIdOperation, requestData);
            this.logger.trace("Emitting execution of the subscription id={} on the web socket {} (request={})", new Object[]{requestData.uniqueIdOperation, this.session.getId(), request});
            this.sessionEmitter.emit(requestData, this.session.textMessage(this.encode(requestData.uniqueIdOperation, MessageType.SUBSCRIBE, requestData)));
            try {
                // The result of await is not needed: a time-out leaves both the response and the exceptions empty
                callback.latchResponseOrExceptionReceived.await(NB_SECONDS_TIME_OUT_FOR_REQUESTS, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag, so that the callers can still detect the interruption
                Thread.currentThread().interrupt();
                throw new GraphQLRequestExecutionException("Got interrupted while waiting for request response", e);
            }
            if (!callback.exceptions.isEmpty()) {
                Throwable firstError = callback.exceptions.get(0);
                throw new GraphQLRequestExecutionException("An error occurred while processing the request: " + firstError.getMessage(), firstError);
            }
            if (callback.response != null) {
                return callback.response;
            }
            throw new GraphQLRequestExecutionException("Received no answer after " + NB_SECONDS_TIME_OUT_FOR_REQUESTS + " seconds");
        }

        /**
         * Marks the given operation as completed, and sends a 'complete' message for it on this session.
         *
         * @param uniqueIdOperation the id of the operation to stop
         * @throws GraphQLRequestExecutionException if this id is not registered on this session
         */
        public void unsubscribe(String uniqueIdOperation) throws GraphQLRequestExecutionException {
            this.logger.trace("Emitting 'complete' message to close the subscription for the uniqueIdOperation={} on socket {}", (Object)uniqueIdOperation, (Object)this.session.getId());
            RequestData<?, ?> registered = this.registeredGraphQLQueries.get(uniqueIdOperation);
            if (registered != null) {
                registered.setCompleted(true);
                this.sessionEmitter.emit(registered, this.session.textMessage(this.encode(registered.uniqueIdOperation, MessageType.COMPLETE, null)));
                return;
            }
            throw new GraphQLRequestExecutionException("Unknown uniqueIdOperation " + uniqueIdOperation + " for web socket session " + this.session + " when trying to unsubscribe");
        }

        /**
         * @return the web socket session managed by this handler
         */
        public WebSocketSession getSession() {
            return this.session;
        }

        /**
         * Builds the two pipelines of this session: the incoming one dispatches the received messages and
         * events to {@code onNext}, {@code onError}, {@code onComplete} and {@code onfinally}; the outgoing one
         * first sends the 'connection_init' message, then each message given to {@code sessionEmitter}.
         *
         * @return a {@code Mono} that combines the incoming and the outgoing pipelines
         */
        Mono<Void> handleWebSocketSession() {
            Mono<Void> input = this.session.receive()
                    .subscribeOn(Schedulers.parallel())
                    .doOnNext(message -> this.onNext(message))
                    .doOnError(t -> this.onError(t))
                    .doOnComplete(() -> this.onComplete())
                    .doFinally(sig -> this.onfinally(sig))
                    .then();
            Flux<WebSocketMessage> outgoingMessages = Flux.<WebSocketMessage>create(sink -> {
                sink.next(this.session.textMessage(this.encode(null, MessageType.CONNECTION_INIT, null)));
                this.logger.trace("The 'connection_init' message has been written on the web socket {}", (Object)this.session.getId());
                // From now on, each message given to the emitter is pushed into this flux
                this.sessionEmitter = (requestData, msg) -> {
                    if (this.logger.isTraceEnabled()) {
                        // requestData is null when answering to a message received for an unknown id
                        String uniqueIdOperation = requestData == null ? null : requestData.uniqueIdOperation;
                        this.logger.trace("Emitting message for uniqueIdOperation {} on web socket {}: {}", new Object[]{uniqueIdOperation, this.session.getId(), msg.getPayloadAsText()});
                    }
                    sink.next(msg);
                };
            }).doOnError(t -> {
                this.lastSessionError = t;
                this.logger.error("Error received on the emitting flux toward the server for session {}: {}", (Object)this.session.getId(), (Object)t.getMessage());
            });
            Mono<Void> output = this.session.send(outgoingMessages);
            this.logger.trace("End of handle(session {}) method execution", (Object)this.session.getId());
            return Mono.zip(input, output).then();
        }

        /**
         * Waits (for at most {@code NB_SECONDS_TIME_OUT_FOR_INITIALIZATION} seconds) for the session
         * initialization latch to be released, then checks that no error has been recorded for this session
         * and that it is still open.
         *
         * @throws GraphQLRequestExecutionException if the initialization timed out, an error has been recorded,
         *             the session is closed, or the thread is interrupted (the interrupt flag is then restored)
         */
        public void checkWebSessionReadiness() throws GraphQLRequestExecutionException {
            boolean initialized;
            try {
                initialized = this.sessionInitializationLatch.await(NB_SECONDS_TIME_OUT_FOR_INITIALIZATION, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag, so that the callers can still detect the interruption
                Thread.currentThread().interrupt();
                throw new GraphQLRequestExecutionException("The thread got interrupted while waiting for web socket initialization", e);
            }
            if (!initialized) {
                // The time-out is recorded as the session's error: the next checks will fail too
                this.lastSessionError = new GraphQLRequestExecutionException("The session on Web Socket " + this.session.getId() + " has not been initialized after " + NB_SECONDS_TIME_OUT_FOR_INITIALIZATION + " seconds");
            }
            if (this.lastSessionError != null) {
                throw new GraphQLRequestExecutionException("Error during Web Socket or Subscription initialization: " + this.lastSessionError.getClass().getSimpleName() + "-" + this.lastSessionError.getMessage(), this.lastSessionError);
            }
            if (!this.session.isOpen()) {
                throw new GraphQLRequestExecutionException("The Web Socket session " + this.session.getId() + " is closed");
            }
        }

        /**
         * Releases the initialization latch, closes all the requests through {@code closeAllRequests}, then
         * closes the web socket session with the given status.
         *
         * @param status the close status given to the session
         * @param reason the reason given to {@code closeAllRequests}
         */
        void closeSession(CloseStatus status, String reason) {
            // Frees the threads that may still be waiting for the session initialization
            this.sessionInitializationLatch.countDown();
            this.closeAllRequests(status, reason);
            // close() only returns a Mono: the session is actually closed when it is subscribed to
            this.session.close(status).subscribe(
                    ignored -> { },
                    t -> this.logger.warn("Error while closing the web socket session {}: {}", (Object)this.session.getId(), (Object)t.getMessage()));
        }

        /**
         * Manages a message received on the web socket: it is parsed as a JSON object, then dispatched
         * according to its "type" field. The session is closed with {@code INVALID_MESSAGE_STATUS} when the
         * message is not an object, when its type is unknown, or when a 'next' message has no id.
         *
         * @param message the received message
         * @throws RuntimeException if the message, or the errors it contains, can not be parsed
         */
        @SuppressWarnings("unchecked")
        private void onNext(WebSocketMessage message) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("The web socket {} received this message: {}", (Object)this.session.getId(), (Object)message.getPayloadAsText());
            }
            JsonNode jsonNode;
            Map<String, Object> map;
            try {
                jsonNode = defaultObjectMapper.readTree(message.getPayloadAsText());
                map = defaultObjectMapper.treeToValue((TreeNode)jsonNode, HashMap.class);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException("Error while reading '" + message.getPayloadAsText() + "' as a Map", e);
            }
            if (map == null) {
                // For instance the JSON literal 'null'
                this.closeSession(GraphQlStatus.INVALID_MESSAGE_STATUS, "Invalid message: " + message.getPayloadAsText());
                return;
            }
            String id = (String)map.get("id");
            RequestData<?, ?> requestData = null;
            if (id != null) {
                requestData = this.registeredGraphQLQueries.get(id);
            }
            String type = (String)map.get("type");
            MessageType messageType = MessageType.resolve(type);
            if (messageType == null) {
                this.closeSession(GraphQlStatus.INVALID_MESSAGE_STATUS, "Invalid message: " + message.getPayloadAsText());
                return;
            }
            switch (messageType) {
                case CONNECTION_ACK: {
                    // The session is now ready: frees the threads waiting in checkWebSessionReadiness()
                    this.sessionInitializationLatch.countDown();
                    this.logger.trace("Received 'connection_ack' on web socket {}", (Object)this.session.getId());
                    break;
                }
                case NEXT: {
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Received 'next' for id {} on web socket {} (payload={})", new Object[]{id, this.session.getId(), message.getPayloadAsText()});
                    }
                    if (id == null) {
                        this.closeSession(GraphQlStatus.INVALID_MESSAGE_STATUS, "Invalid message (id is null): " + message.getPayloadAsText());
                        return;
                    }
                    if (requestData == null) {
                        this.logger.warn("[graphql-transport-ws] Unknown uniqueIdOperation {} for web socket session {} (a 'complete' message is sent to the server to that he stops managing this uniqueIdOperation)", (Object)id, (Object)this.session.getId());
                        // There is no request data for this id: null is given to the emitter
                        this.sessionEmitter.emit(null, this.session.textMessage(this.encode(id, MessageType.COMPLETE, null)));
                        break;
                    }
                    if (requestData.isCompleted()) {
                        this.logger.warn("Receive a message for a closed uniqueIdOperation ({}) on web socket {}", (Object)id, (Object)this.session.getId());
                        break;
                    }
                    Object payload = map.get("payload");
                    if (payload == null) {
                        String msg = "payload is mandatory for 'next' messages";
                        this.logger.error(msg);
                        requestData.onError(new GraphQLRequestExecutionException(msg));
                        break;
                    }
                    if (!(payload instanceof Map)) {
                        String msg = "payload should be a Map, but <" + payload + "> is not a Map";
                        this.logger.error(msg);
                        requestData.onError(new GraphQLRequestExecutionException(msg));
                        break;
                    }
                    requestData.onNext((Map<String, Object>)payload);
                    break;
                }
                case COMPLETE: {
                    this.logger.trace("Received 'complete' for id {} on web socket {} (payload={})", new Object[]{id, this.session.getId(), message});
                    if (requestData == null) {
                        // No id, or an id that is not registered on this session: there is nobody to notify
                        this.logger.warn("Received 'complete' for the unknown uniqueIdOperation {} on web socket {}", (Object)id, (Object)this.session.getId());
                        break;
                    }
                    requestData.onComplete();
                    break;
                }
                case ERROR: {
                    this.logger.warn("Received 'error' for id {} on web socket {} (payload={})", new Object[]{id, this.session.getId(), message.getPayloadAsText()});
                    if (requestData == null) {
                        // No id, or an id that is not registered on this session: there is nobody to notify
                        this.logger.warn("Received 'error' for the unknown uniqueIdOperation {} on web socket {}", (Object)id, (Object)this.session.getId());
                        break;
                    }
                    if (map.get("payload") instanceof Map) {
                        String msg = (String)((Map<String, Object>)map.get("payload")).get("message");
                        requestData.onError(new GraphQLRequestExecutionException(msg));
                        break;
                    }
                    try {
                        List<Error> errors = new ArrayList<Error>();
                        JsonNode payloadNode = jsonNode.get("payload");
                        // When there is no payload at all, the error is transmitted with an empty error list
                        if (payloadNode != null) {
                            for (JsonNode node : payloadNode) {
                                errors.add((Error)defaultObjectMapper.treeToValue((TreeNode)node, Error.class));
                            }
                        }
                        requestData.onError(new GraphQLRequestExecutionException("Error on subscription " + id, errors));
                        break;
                    }
                    catch (JsonProcessingException e) {
                        throw new RuntimeException("Error while reading the errors from '" + message.getPayloadAsText(), e);
                    }
                }
                default: {
                    this.logger.warn("Received non managed message '{}' for id {} on web socket {} (payload={})", new Object[]{type, id, this.session.getId(), message});
                    String msg = "Non managed message type '" + type + "'";
                    if (requestData != null) {
                        requestData.onError(new GraphQLRequestExecutionException(msg));
                        break;
                    }
                    this.logger.error(msg);
                }
            }
        }

        /**
         * Handles an error signalled for the whole Web Socket session.
         * <p>
         * The error is stored in {@code lastSessionError}, {@code sessionInitializationLatch} is counted down, the
         * failure is logged, every request found in {@code registeredGraphQLQueries} receives the error through its
         * {@code onError} callback, and {@code closeSession} is finally called with
         * {@link CloseStatus#SERVER_ERROR} and an empty reason.
         *
         * @param t the error received for the session. When {@code null}, a generic {@link RuntimeException}
         *          ("Unknown exception") is forwarded to the registered requests instead; {@code lastSessionError}
         *          still receives the original {@code null} value.
         */
        private void onError(Throwable t) {
            this.logger.error("Received this error for session {}", (Object)this.session.getId());
            // The error is recorded before the latch is released, presumably so that a thread waiting on the latch
            // can read it as soon as it wakes up (the waiting side is not visible here -- confirm).
            this.lastSessionError = t;
            this.sessionInitializationLatch.countDown();
            if (t == null) {
                t = new RuntimeException("Unknown exception");
                this.logger.error("The Web Socket session {} ended with an unknown error", (Object)this.session.getId());
            } else {
                // ": " separates the exception class from its message, which were previously glued together.
                String description = "The Web Socket session " + this.session.getId() + " ended with an error ("
                        + t.getClass().getSimpleName() + ": " + t.getMessage() + ")";
                // Handing the throwable to the logger lets the logging backend render the full stack trace,
                // including the chain of causes, instead of a hand-built list of the top-level frames only.
                this.logger.error(description, t);
            }
            for (RequestData<?, ?> requestData : this.registeredGraphQLQueries.values()) {
                requestData.onError(t);
            }
            this.closeSession(CloseStatus.SERVER_ERROR, "");
        }

        /**
         * Propagates the completion of the Web Socket session: after a trace log, the {@code onComplete()} callback
         * of each request currently held in {@code registeredGraphQLQueries} is invoked.
         */
        private void onComplete() {
            this.logger.trace("onComplete received for WebSocketSession {}", this.session.getId());
            for (RequestData<?, ?> registered : this.registeredGraphQLQueries.values()) {
                registered.onComplete();
            }
        }

        /**
         * Final clean-up for the session, called with the signal that ended its stream (presumably registered as a
         * reactive {@code doFinally} hook -- confirm at the call site).
         * <p>
         * Counts down {@code sessionInitializationLatch}, notifies every registered request that it is closed with
         * {@link CloseStatus#SERVER_ERROR} and a reason naming the signal, then calls {@code close()} on the session.
         *
         * @param sig the signal received; it is dereferenced, so it must not be {@code null}
         */
        private void onfinally(SignalType sig) {
            this.sessionInitializationLatch.countDown();
            this.logger.trace("onfinally received for WebSocketSession {} with signal {}", this.session.getId(), sig);
            final String closeReason = "Signal received: " + sig.toString();
            this.closeAllRequests(CloseStatus.SERVER_ERROR, closeReason);
            // NOTE(review): the value returned by close() is ignored. If the session is a reactive WebSocketSession,
            // close() returns a Mono that may need to be subscribed for the close to happen -- confirm.
            this.session.close();
        }

        /**
         * Notifies every request held in {@code registeredGraphQLQueries} that it is closed, through its
         * {@code onClose} callback.
         *
         * @param status the close status; its code is always forwarded, and its reason is used as a fallback
         * @param reason the reason forwarded to each request, or {@code null} to use {@code status.getReason()}
         */
        private void closeAllRequests(CloseStatus status, String reason) {
            this.logger.debug("Closing session {} for status {} and reason {}", this.session.getId(), status, reason);
            for (RequestData<?, ?> registered : this.registeredGraphQLQueries.values()) {
                int code = status.getCode();
                String effectiveReason;
                if (reason != null) {
                    effectiveReason = reason;
                } else {
                    effectiveReason = status.getReason();
                }
                registered.onClose(code, effectiveReason);
            }
        }

        /**
         * Serializes a message to its JSON text.
         * <p>
         * The JSON object always has a {@code "type"} entry (the value of {@code messageType.getType()}). An
         * {@code "id"} entry and a {@code "payload"} entry (the value of {@code requestData.getRequestMap()}) are
         * added only when the matching argument is not {@code null}.
         *
         * @param id          the message id, or {@code null} to omit the {@code "id"} entry
         * @param messageType the type of the message; must not be {@code null}
         * @param requestData the request whose map becomes the payload, or {@code null} to omit the payload
         * @return the JSON string produced by {@code defaultObjectMapper}
         * @throws RuntimeException wrapping the {@link IOException} raised when the serialization fails
         */
        String encode(@Nullable String id, MessageType messageType, @Nullable RequestData<?, ?> requestData) {
            Map<String, Object> message = new HashMap<>(3);
            message.put("type", messageType.getType());
            if (id != null) {
                message.put("id", id);
            }
            if (requestData != null) {
                message.put("payload", requestData.getRequestMap());
            }

            final String json;
            try {
                json = defaultObjectMapper.writeValueAsString(message);
            }
            catch (IOException ex) {
                throw new RuntimeException("Failed to write " + message + " as JSON", ex);
            }
            return json;
        }
    }

    /**
     * The message types known to this client, each bound to the text carried in the {@code "type"} entry of a
     * message (presumably the GraphQL-over-WebSocket protocol names -- confirm against the protocol in use).
     */
    public static enum MessageType {
        CONNECTION_INIT("connection_init"),
        CONNECTION_ACK("connection_ack"),
        SUBSCRIBE("subscribe"),
        NEXT("next"),
        ERROR("error"),
        COMPLETE("complete"),
        START("start");

        /** Lookup table from the textual type to its constant; filled once by the static initializer. */
        private static final Map<String, MessageType> messageTypes = new HashMap<>();

        static {
            for (MessageType candidate : values()) {
                messageTypes.put(candidate.getType(), candidate);
            }
        }

        /** The textual form of this message type. */
        private final String type;

        MessageType(String type) {
            this.type = type;
        }

        /**
         * @return the textual form of this message type, e.g. {@code "next"} for {@link #NEXT}
         */
        public String getType() {
            return this.type;
        }

        /**
         * Finds the constant matching a textual message type.
         *
         * @param type the textual type to look up; may be {@code null}
         * @return the matching constant, or {@code null} when {@code type} is {@code null} or unknown
         */
        @Nullable
        public static MessageType resolve(@Nullable String type) {
            if (type == null) {
                return null;
            }
            return messageTypes.get(type);
        }
    }
}

