/*
 * Decompiled with CFR 0.152.
 */
package com.turtlequeue;

import com.cognitect.transit.ArrayReader;
import com.cognitect.transit.DefaultReadHandler;
import com.cognitect.transit.MapReader;
import com.cognitect.transit.ReadHandler;
import com.cognitect.transit.Reader;
import com.cognitect.transit.WriteHandler;
import com.cognitect.transit.Writer;
import com.google.common.base.MoreObjects;
import com.turtlequeue.AcknowledgeBuilder;
import com.turtlequeue.Admin;
import com.turtlequeue.Client;
import com.turtlequeue.ClientPossibleStates;
import com.turtlequeue.ConsumerBuilder;
import com.turtlequeue.ConsumerImpl;
import com.turtlequeue.ConsumerParams;
import com.turtlequeue.DeadLetterPolicy;
import com.turtlequeue.Encoder;
import com.turtlequeue.MessageId;
import com.turtlequeue.MessageIdImpl;
import com.turtlequeue.ProducerBuilder;
import com.turtlequeue.ProducerImpl;
import com.turtlequeue.ProducerParams;
import com.turtlequeue.ReaderBuilder;
import com.turtlequeue.StateMachine;
import com.turtlequeue.SubscriptionMode;
import com.turtlequeue.TopicBuilderImpl;
import com.turtlequeue.TopicImpl;
import com.turtlequeue.TqClient;
import com.turtlequeue.TqClientException;
import com.turtlequeue.Version;
import com.turtlequeue.sdk.api.proto.Tq;
import com.turtlequeue.sdk.api.proto.TurtleQueueGrpc;
import io.grpc.ConnectivityState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ClientImpl
implements Client {
    private static final Logger logger = Logger.getLogger(TurtleQueueGrpc.class.getName());
    String host = "";
    Integer port = -1;
    Boolean secure = true;
    String userToken = null;
    String apiKey = null;
    static final Properties versionInfo = Version.getVersion();
    protected TqClient tqClient = null;
    protected static Class<Admin> adminClass = null;
    protected Admin admin = null;
    StreamObserver<Tq.ClientToBroker> clientToBroker = null;
    StreamObserver<Tq.BrokerToClient> brokerToClient = null;
    UUID connectionUid = null;
    StateMachine<ClientPossibleStates> stateMachine = null;
    ScheduledExecutorService pingLoopService = null;
    long lastReplyPong = 0L;
    ScheduledFuture pingPongFut = null;
    ScheduledExecutorService stateChangeWatcher = null;
    ScheduledFuture stateChangeFut = null;
    Map<Long, CompletableFuture> pendingRequests = null;
    private final AtomicLong requestIdGenerator;
    private final AtomicLong consumerIdGenerator;
    private final AtomicLong producerIdGenerator;
    ConcurrentHashMap<Long, ConsumerImpl> consumerRegistry = new ConcurrentHashMap();
    ConcurrentHashMap<Long, ProducerImpl> producerRegistry = new ConcurrentHashMap();
    Map<String, ReadHandler<?, ?>> customReadHandlers = null;
    Map<Class, WriteHandler<?, ?>> customWriteHandlers = null;
    DefaultReadHandler<?> customReadDefaultHandler = null;
    WriteHandler<?, ?> customDefaultWriteHandler = null;
    MapReader<?, Map<Object, Object>, Object, Object> mapBuilder = null;
    ArrayReader<?, List<Object>, Object> listBuilder = null;
    Function<InputStream, Reader> transitReader = null;
    Function<OutputStream, Writer> transitWriter = null;
    String dataFormat = null;
    private volatile Instant lastMessageSentAt = null;

    public ClientImpl(String host, Integer port, Boolean secure, String userToken, String apiKey, Function<InputStream, Reader> transitReader, Function<OutputStream, Writer> transitWriter, Map<String, ReadHandler<?, ?>> customReadHandlers, Map<Class, WriteHandler<?, ?>> customWriteHandlers, DefaultReadHandler<?> customReadDefaultHandler, WriteHandler<?, ?> customDefaultWriteHandler, MapReader<?, Map<Object, Object>, Object, Object> mapBuilder, ArrayReader<?, List<Object>, Object> listBuilder, String dataFormat) {
        this.host = host;
        this.port = port;
        this.secure = secure;
        this.userToken = userToken;
        this.apiKey = apiKey;
        this.customReadHandlers = customReadHandlers;
        this.customWriteHandlers = customWriteHandlers;
        this.customReadDefaultHandler = customReadDefaultHandler;
        this.customDefaultWriteHandler = customDefaultWriteHandler;
        this.mapBuilder = mapBuilder;
        this.listBuilder = listBuilder;
        this.transitReader = transitReader;
        this.transitWriter = transitWriter;
        this.dataFormat = dataFormat;
        this.pendingRequests = new ConcurrentHashMap<Long, CompletableFuture>();
        this.requestIdGenerator = new AtomicLong(1L);
        this.consumerIdGenerator = new AtomicLong(1L);
        this.producerIdGenerator = new AtomicLong(1L);
        this.pingLoopService = Executors.newScheduledThreadPool(1);
        this.stateChangeWatcher = Executors.newScheduledThreadPool(1);
        ((ScheduledThreadPoolExecutor)this.pingLoopService).setRemoveOnCancelPolicy(true);
        this.stateMachine = new StateMachine<ClientPossibleStates>().setState(ClientPossibleStates.Idle);
        this.tqClient = new TqClient(this);
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public Boolean getSecure() {
        if (this.secure == null) {
            return true;
        }
        return this.secure;
    }

    protected String getAuthMethod() {
        return "Token";
    }

    protected String getDataFormat() {
        return this.dataFormat;
    }

    protected Map<String, ReadHandler<?, ?>> getCustomReadHandlers() {
        return this.customReadHandlers;
    }

    protected Map<Class, WriteHandler<?, ?>> getCustomWriteHandlers() {
        return this.customWriteHandlers;
    }

    protected DefaultReadHandler<?> getCustomReadDefaultHandler() {
        return this.customReadDefaultHandler;
    }

    protected WriteHandler<?, ?> getCustomDefaultWriteHandler() {
        return this.customDefaultWriteHandler;
    }

    protected MapReader<?, Map<Object, Object>, Object, Object> getMapBuilder() {
        return this.mapBuilder;
    }

    protected ArrayReader<?, List<Object>, Object> getListBuilder() {
        return this.listBuilder;
    }

    protected Function<InputStream, Reader> getTransitReader() {
        return this.transitReader;
    }

    protected Function<OutputStream, Writer> getTransitWriter() {
        return this.transitWriter;
    }

    protected String getSdkVersion() {
        return "java:" + this.getVersionInfo().getProperty("groupId") + ":" + this.getVersionInfo().getProperty("artifactId") + ":" + this.getVersionInfo().getProperty("version");
    }

    private <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture result = new CompletableFuture();
        this.pingLoopService.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return result;
    }

    private void reconnectConsumers() {
        for (Map.Entry<Long, ConsumerImpl> pair : this.consumerRegistry.entrySet()) {
            ConsumerImpl c = pair.getValue();
            c.reconnect();
        }
    }

    private void reconnectProducers() {
        for (Map.Entry<Long, ProducerImpl> pair : this.producerRegistry.entrySet()) {
            ProducerImpl p = pair.getValue();
            p.reconnect();
        }
    }

    private void checkIfReconnected() throws InterruptedException, ExecutionException, TimeoutException {
        ConnectivityState st = this.tqClient.getState();
        if (st == ConnectivityState.READY) {
            try {
                this.connect().get(2L, TimeUnit.SECONDS);
            }
            catch (Exception ex) {
                logger.log(Level.INFO, "Failed to reconnect {0}", ex);
            }
            this.stateChangeFut.cancel(false);
        }
    }

    private void pingPong() throws InterruptedException, ExecutionException, TimeoutException {
        ConnectivityState st = this.tqClient.getState();
        if (st == ConnectivityState.READY) {
            long requestId = this.getNextRequestId();
            this.clientToBrokerOnNext(Tq.ClientToBroker.newBuilder().setRequestId(requestId).setCommandPing(Tq.CommandPing.newBuilder().build()).build());
            if (this.lastMessageSentAt != null && Duration.between(this.lastMessageSentAt, Instant.now()).compareTo(Duration.ofSeconds(15L)) > 0) {
                ((CompletableFuture)this.waitForResponse(requestId).acceptEither((CompletionStage)this.timeoutAfter(15L, TimeUnit.SECONDS), replyPong -> this.pendingRequests.remove(requestId))).exceptionally(ex -> {
                    logger.log(Level.WARNING, "Ping timeout requestId={0}, State={1}, Exception:\n{2}", new Object[]{requestId, st, ex});
                    this.pendingRequests.remove(requestId);
                    return null;
                });
            }
        } else {
            logger.log(Level.WARNING, "Cannot ping, the connections is State={0}", new Object[]{st});
        }
    }

    protected void stopPingLoop() {
    }

    protected void registerConsumer(ConsumerImpl consumer) {
        logger.log(Level.FINE, "Registering new consumer {0}", consumer);
        this.consumerRegistry.put(consumer.getConsumerId(), consumer);
    }

    protected void registerProducer(ProducerImpl producer) {
        logger.log(Level.FINE, "Registering new producer {0}", producer);
        this.producerRegistry.put(producer.getProducerId(), producer);
    }

    protected void removeConsumer(ConsumerImpl consumer) {
        logger.log(Level.FINE, "Removing consumer {0}", consumer);
        this.consumerRegistry.remove(consumer.getConsumerId(), consumer);
    }

    protected void removeProducer(ProducerImpl producer) {
        logger.log(Level.FINE, "Removing producer {0}", producer);
        this.producerRegistry.remove(producer.getProducerId(), producer);
    }

    public CompletableFuture<Void> registerConsumerBroker(ConsumerImpl consumer) {
        ConsumerParams conf = consumer.getConf();
        logger.log(Level.FINE, "Registering new consumer " + consumer.getConsumerId() + " with broker " + conf);
        Tq.CommandSubscribe.Builder cmdBuilder = Tq.CommandSubscribe.newBuilder().setConsumerId(consumer.getConsumerId().longValue()).setTopic(TopicImpl.toTqTopic(conf.getTopic())).setSubName(conf.getSubName()).setSubType(conf.getSubType().toTqSubType()).setPriorityLevel(conf.getPriority().intValue()).setStartMessageId(MessageIdImpl.toMessageIdData(conf.getInitialPosition())).setReceiverQueueSize(conf.getReceiverQueueSize().intValue()).putAllMeta(conf.getMetadata());
        if (conf.getAckTimeout() != null && conf.getAckTimeoutTimeUnit() != null) {
            cmdBuilder.setAckTimeoutUnit(Encoder.javaTimeUnitToProtobuf(conf.getAckTimeoutTimeUnit()));
            cmdBuilder.setAckTimeoutValue(conf.getAckTimeout().longValue());
        }
        if (conf.getConsumerName() != null) {
            cmdBuilder.setConsumerName(conf.getConsumerName());
        }
        if (conf.getSubscriptionMode() != SubscriptionMode.Durable) {
            cmdBuilder.setSubscriptionMode(conf.getSubscriptionMode().toTqSubscriptionMode());
        }
        if (conf.getJsonPath() != null) {
            cmdBuilder.setJsonPath(conf.getJsonPath());
        }
        if (conf.getEnableRetry() != null) {
            cmdBuilder.setEnableRetry(conf.getEnableRetry().booleanValue());
        }
        if (conf.getNegativeAckRedeliveryDelayValue() != null && conf.getNegativeAckRedeliveryDelayUnit() != null) {
            cmdBuilder.setNegativeAckRedeliveryDelayValue(conf.getNegativeAckRedeliveryDelayValue().longValue());
            cmdBuilder.setNegativeAckRedeliveryDelayUnit(Encoder.javaTimeUnitToProtobuf(conf.getNegativeAckRedeliveryDelayUnit()));
        }
        if (conf.getDeadLetterQueuePolicy() != null) {
            DeadLetterPolicy p = conf.getDeadLetterQueuePolicy();
            Tq.DeadLetterQueuePolicy.Builder builder = Tq.DeadLetterQueuePolicy.newBuilder();
            builder.setMaxRedeliveryCount(p.getMaxRedeliverCount());
            if (p.getRetryLetterTopic() != null) {
                builder.setRetryTopic(p.getRetryLetterTopic());
            }
            if (p.getDeadLetterTopic() != null) {
                builder.setDeadLetterTopic(p.getDeadLetterTopic());
            }
            cmdBuilder.setDeadLetterQueuePolicy(builder.build());
        }
        Tq.CommandSubscribe cmd = cmdBuilder.build();
        long requestId = this.getNextRequestId();
        this.clientToBrokerOnNext(Tq.ClientToBroker.newBuilder().setRequestId(requestId).setCommandSubscribe(cmd).build());
        return this.waitForResponse(requestId);
    }

    public CompletableFuture<Void> registerProducerBroker(ProducerImpl producer) {
        ProducerParams conf = producer.getConf();
        logger.log(Level.FINE, "Registering new producer " + producer.getProducerId() + " with broker " + conf);
        Tq.CommandProducerCreate.Builder cmdBuilder = Tq.CommandProducerCreate.newBuilder().setProducerId(producer.getProducerId().longValue()).setTopic(TopicImpl.toTqTopic(conf.getTopic()));
        if (conf.getProducerName() != null) {
            cmdBuilder.setProducerName(conf.getProducerName());
        }
        if (conf.getEnableBatching() != null) {
            cmdBuilder.setEnableBatching(conf.getEnableBatching().booleanValue());
        }
        if (conf.getMaxPendingMessages() != null) {
            cmdBuilder.setMaxPendingMessages(conf.getMaxPendingMessages().intValue());
        }
        if (conf.getSendTimeoutUnit() != null && conf.getSendTimeoutValue() != null) {
            cmdBuilder.setSendTimeoutUnit(Encoder.javaTimeUnitToProtobuf(conf.getSendTimeoutUnit()));
            cmdBuilder.setSendTimeoutValue((long)conf.getSendTimeoutValue().intValue());
        }
        if (conf.getHashingScheme() != null) {
            cmdBuilder.setHashingScheme(conf.getHashingScheme().toTqHashingScheme());
        }
        if (conf.getBlockIfQueueFull() != null) {
            cmdBuilder.setBlockIfQueueFull(conf.getBlockIfQueueFull().booleanValue());
        }
        Tq.CommandProducerCreate cmd = cmdBuilder.build();
        long requestId = this.getNextRequestId();
        this.clientToBrokerOnNext(Tq.ClientToBroker.newBuilder().setRequestId(requestId).setCommandProducerCreate(cmd).build());
        return this.waitForResponse(requestId);
    }

    public Properties getVersionInfo() {
        return versionInfo;
    }

    protected void checkState(String desc) {
        this.stateMachine.checkStateIs(ClientPossibleStates.Ready, desc);
    }

    @Override
    public CompletableFuture<Client> connect() throws Exception {
        TurtleQueueGrpc.TurtleQueueStub stub = this.tqClient.checkAndGetStub(true);
        final ClientImpl clientRef = this;
        final CompletableFuture<Client> connectResponse = new CompletableFuture<Client>();
        this.brokerToClient = new StreamObserver<Tq.BrokerToClient>(){

            public void onNext(Tq.BrokerToClient c) {
                try {
                    Long requestId = c.getRequestId();
                    logger.log(Level.FINE, "Broker says: " + requestId + "  " + c.getBtocOneofCase() + "\n" + c);
                    switch (c.getBtocOneofCase()) {
                        case COMMAND_PING: {
                            clientRef.clientToBrokerOnNext(Tq.ClientToBroker.newBuilder().setReplyPong(Tq.ReplyPong.newBuilder().build()).build());
                            break;
                        }
                        case REPLY_CONNECT: {
                            UUID newUuid = UUID.fromString(c.getReplyConnect().getUuid());
                            if (clientRef.connectionUid != null && clientRef.connectionUid != newUuid) {
                                logger.log(Level.FINE, "Broker replied, reconnection complete {0}", newUuid);
                                clientRef.connectionUid = newUuid;
                                clientRef.reconnectConsumers();
                                break;
                            }
                            logger.log(Level.FINE, "Broker replied, handshake complete {0}", newUuid);
                            clientRef.connectionUid = newUuid;
                            ClientImpl.this.pingPongFut = ClientImpl.this.pingLoopService.scheduleWithFixedDelay(new PingCallable(clientRef), 10L, 20L, TimeUnit.SECONDS);
                            connectResponse.complete(clientRef);
                            break;
                        }
                        case REPLY_PONG: {
                            Tq.ReplyPong success = c.getReplyPong();
                            clientRef.deliverResponse(requestId, success);
                            break;
                        }
                        case COMMAND_MESSAGE: {
                            Tq.CommandMessage msg = c.getCommandMessage();
                            Long consumerId = msg.getConsumerId();
                            ConsumerImpl consumer = clientRef.consumerRegistry.get(consumerId);
                            if (consumer != null) {
                                consumer.enqueue(c.getCommandMessage());
                                break;
                            }
                            logger.log(Level.INFO, "Received message for consumer already closed. {requestId=" + requestId + " consumerId=" + consumerId + "}");
                            logger.log(Level.FINE, "Consumers present are: " + clientRef.consumerRegistry);
                            break;
                        }
                        case COMMAND_END_OF_TOPIC: {
                            Tq.CommandEndOfTopic eot = c.getCommandEndOfTopic();
                            Long consumerId = eot.getConsumerId();
                            ConsumerImpl consumer = clientRef.consumerRegistry.get(consumerId);
                            if (consumer != null) {
                                consumer.setTopicTerminated();
                                break;
                            }
                            logger.log(Level.INFO, "Received endOfTopic for consumer already closed" + requestId + "consumerId=" + consumerId);
                            break;
                        }
                        case RESPONSE_PUBLISH: {
                            Tq.ResponsePublish rp = c.getResponsePublish();
                            clientRef.deliverResponse(requestId, rp);
                            break;
                        }
                        case REPLY_SUCCESS: {
                            Tq.ReplySuccess success = c.getReplySuccess();
                            clientRef.deliverResponse(requestId, success);
                            break;
                        }
                        case REPLY_ERROR: {
                            logger.log(Level.SEVERE, "Error from the broker " + requestId);
                            TqClientException ex = TqClientException.makeTqExceptionFromReplyError(c.getReplyError());
                            logger.log(Level.SEVERE, "Error from the broker as exception " + ex);
                            if (requestId != 0L) {
                                clientRef.pendingRequests.get(requestId).completeExceptionally(ex);
                                clientRef.pendingRequests.remove(requestId);
                                break;
                            }
                            logger.log(Level.WARNING, "Could not find matching request for error" + requestId + ex);
                            break;
                        }
                        case COMMAND_GET_LAST_MESSAGE_ID_RESPONSE: {
                            if (requestId != 0L) {
                                Tq.CommandGetLastMessageIdResponse resp = c.getCommandGetLastMessageIdResponse();
                                clientRef.pendingRequests.get(requestId).complete(resp);
                                clientRef.pendingRequests.remove(requestId);
                                break;
                            }
                            logger.log(Level.WARNING, "Could not find matching request" + requestId + c);
                            break;
                        }
                        case BTOCONEOF_NOT_SET: {
                            logger.log(Level.WARNING, "Unknown message received {0}", c);
                            break;
                        }
                        default: {
                            logger.log(Level.WARNING, "Unimplemented message received {0}", c);
                        }
                    }
                }
                catch (Exception ex) {
                    logger.log(Level.SEVERE, "Unhandled SDK exception message={0}", c);
                    logger.log(Level.SEVERE, "Unhandled SDK exception error={0}", ex);
                }
            }

            public void onError(Throwable t) {
                logger.log(Level.FINE, "onError called {0} ", t);
                Status st = Status.fromThrowable((Throwable)t);
                if (!connectResponse.isDone()) {
                    logger.log(Level.SEVERE, "Cannot connect, please check credentials and connectivity {0} " + connectResponse, t);
                    connectResponse.completeExceptionally(t);
                } else if (clientRef.getConnState() == ConnectivityState.SHUTDOWN) {
                    logger.log(Level.FINE, "onError while shutdown - ignoring " + st);
                } else {
                    logger.log(Level.FINE, "onError State={0} BrokerReplyStatus={1}", new Object[]{clientRef.tqClient.getState(), st});
                    if (clientRef.tqClient.getState() == ConnectivityState.IDLE && (st.getCode() == Status.Code.UNAVAILABLE || st.getCode() == Status.Code.CANCELLED)) {
                        logger.log(Level.INFO, "Detected disconnection, initiating reconnect");
                        try {
                            clientRef.connect();
                        }
                        catch (Exception ex) {
                            logger.log(Level.SEVERE, "Failed to initiate reconnect {0}", ex);
                        }
                    }
                }
            }

            public void onCompleted() {
                System.out.println("Broker has closed bidi link TODO reconnect depending on state ?");
            }
        };
        try {
            this.clientToBroker = stub.bidilink(this.brokerToClient);
            if (this.userToken == null) {
                logger.log(Level.SEVERE, "Missing user token");
            }
            if (this.apiKey == null) {
                logger.log(Level.SEVERE, "Missing api key");
            }
        }
        catch (StatusRuntimeException e) {
            logger.log(Level.INFO, "RPC failed: {0}", e.getStatus());
            throw e;
        }
        return connectResponse;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void clientToBrokerOnNext(Tq.ClientToBroker cmd) {
        StreamObserver<Tq.ClientToBroker> streamObserver = this.clientToBroker;
        synchronized (streamObserver) {
            this.clientToBroker.onNext((Object)cmd);
            this.lastMessageSentAt = Instant.now();
        }
    }

    public ConnectivityState getConnState() {
        return this.tqClient.getState();
    }

    private <T> CompletableFuture<T> waitForResponse(Long requestId) {
        CompletableFuture res = new CompletableFuture();
        this.pendingRequests.put(requestId, res);
        return res;
    }

    private <T> void deliverResponse(Long requestId, T data) {
        try {
            CompletableFuture f = this.pendingRequests.get(requestId);
            if (f != null) {
                logger.log(Level.FINE, "Delivering response to " + f);
                f.complete(data);
                this.pendingRequests.remove(requestId);
            } else {
                logger.log(Level.WARNING, "Could not deliver response, missing requestId[{0}], data[{1}]", new Object[]{requestId, data});
            }
        }
        catch (Exception ex) {
            logger.log(Level.SEVERE, "Could not deliver response, exception ", ex);
        }
    }

    private void deliverPublishResponse(Long requestId, MessageId messageId) {
        CompletableFuture promiseResult = this.pendingRequests.get(requestId);
        promiseResult.complete(messageId);
    }

    public long getNextRequestId() {
        return this.requestIdGenerator.getAndIncrement();
    }

    public long getNextConsumerId() {
        return this.consumerIdGenerator.getAndIncrement();
    }

    public long getNextProducerId() {
        return this.producerIdGenerator.getAndIncrement();
    }

    @Override
    public ConsumerBuilder newConsumer() {
        return new ConsumerBuilder(this);
    }

    @Override
    public ProducerBuilder newProducer() {
        return new ProducerBuilder(this);
    }

    @Override
    public ReaderBuilder newReader() {
        return new ReaderBuilder(this);
    }

    @Override
    public AcknowledgeBuilder newAcknowledge() {
        return new AcknowledgeBuilder(this);
    }

    protected <T> CompletableFuture<T> consumerCommand(Tq.CommandConsumer commandConsumer) {
        long requestId = this.getNextRequestId();
        this.clientToBrokerOnNext(Tq.ClientToBroker.newBuilder().setRequestId(requestId).setCommandConsumer(commandConsumer).build());
        return this.waitForResponse(requestId);
    }

    protected <T> CompletableFuture<T> producerCommand(Tq.CommandProducer commandProducer) {
        long requestId = this.getNextRequestId();
        this.clientToBrokerOnNext(Tq.ClientToBroker.newBuilder().setRequestId(requestId).setCommandProducer(commandProducer).build());
        return this.waitForResponse(requestId);
    }

    protected static void registerAdmin(Class a) {
        adminClass = a;
    }

    @Override
    public Admin admin() {
        if (this.admin != null) {
            return this.admin;
        }
        if (adminClass == null) {
            logger.log(Level.SEVERE, "Admin dependency is missing. Please import com.turtlequeue.admin and initialize it before using it");
            return null;
        }
        try {
            Admin ob;
            this.admin = ob = adminClass.getDeclaredConstructor(this.getClass()).newInstance(this);
            return this.admin;
        }
        catch (Exception ex) {
            logger.log(Level.SEVERE, "Error creating new admin. {0}", ex);
            return null;
        }
    }

    @Override
    public TopicBuilderImpl newTopicBuilder() {
        return new TopicBuilderImpl();
    }

    protected String getUserToken() {
        return this.userToken;
    }

    protected String getApiKey() {
        return this.apiKey;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws Exception {
        this.stateMachine.setState(ClientPossibleStates.Stopping);
        if (this.pingPongFut != null) {
            this.pingPongFut.cancel(true);
        }
        this.pingLoopService.shutdownNow();
        if (this.stateChangeFut != null) {
            this.stateChangeFut.cancel(true);
        }
        this.stateChangeWatcher.shutdownNow();
        Iterator<Map.Entry<Long, ConsumerImpl>> it = this.consumerRegistry.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, ConsumerImpl> pair = it.next();
            ConsumerImpl c = pair.getValue();
            c.closeAsync(false);
            it.remove();
        }
        StreamObserver<Tq.ClientToBroker> streamObserver = this.clientToBroker;
        synchronized (streamObserver) {
            ((ClientCallStreamObserver)this.clientToBroker).cancel("Client is closing", null);
        }
        this.tqClient.close();
        this.stateMachine.setState(ClientPossibleStates.Idle);
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("connectionUid", (Object)this.connectionUid).add("state", (Object)this.stateMachine.getInternalState()).add("connState", (Object)this.tqClient.getState()).toString();
    }

    private class PingCallable
    implements Runnable {
        private final ClientImpl client;

        PingCallable(ClientImpl client) {
            this.client = client;
        }

        @Override
        public void run() {
            try {
                this.client.pingPong();
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException executionException) {
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
        }
    }

    private class WatchStateCallable
    implements Runnable {
        private final ClientImpl client;

        WatchStateCallable(ClientImpl client) {
            this.client = client;
        }

        @Override
        public void run() {
            try {
                this.client.checkIfReconnected();
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException executionException) {
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
        }
    }
}

