/*
 * Decompiled with CFR 0.152.
 */
package com.turtlequeue;

import com.cognitect.transit.Reader;
import com.cognitect.transit.TransitFactory;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Queues;
import com.turtlequeue.AcknowledgeBuilder;
import com.turtlequeue.ClientImpl;
import com.turtlequeue.Consumer;
import com.turtlequeue.ConsumerParams;
import com.turtlequeue.ConsumerPossibleStates;
import com.turtlequeue.EndOfTopicMessageListener;
import com.turtlequeue.GrowableArrayBlockingQueue;
import com.turtlequeue.Message;
import com.turtlequeue.MessageId;
import com.turtlequeue.MessageIdImpl;
import com.turtlequeue.MessageImpl;
import com.turtlequeue.StateMachine;
import com.turtlequeue.Topic;
import com.turtlequeue.TopicImpl;
import com.turtlequeue.TqClientException;
import com.turtlequeue.sdk.api.proto.Tq;
import com.turtlequeue.sdk.api.proto.TurtleQueueGrpc;
import io.grpc.ConnectivityState;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.IntUnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ConsumerImpl<T>
implements Consumer<T> {
    private static final Logger logger = Logger.getLogger(TurtleQueueGrpc.class.getName());
    private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
    ClientImpl c = null;
    protected Long consumerId = null;
    protected Topic topic = null;
    protected String subName = null;
    protected String consumerName = null;
    protected String schema = null;
    protected Integer priority = null;
    protected MessageId initialPosition = null;
    protected EndOfTopicMessageListener<T> endOfTopicMessageListener = null;
    protected Long ackTimeoutCount = null;
    protected TimeUnit ackTimeoutUnit = null;
    protected ConsumerParams conf;
    final BlockingQueue<Tq.CommandMessage> incomingMessages;
    protected Integer receiverQueueRefillThreshold = null;
    protected Integer maxReceiverQueueSize = null;
    protected Boolean hasReachedEndOfTopic = null;
    protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
    private StateMachine<ConsumerPossibleStates> stateMachine = null;
    private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits");
    private volatile int availablePermits = 0;
    private CompletableFuture<ConsumerImpl> subscribeReturnF = null;

    ConsumerImpl(ClientImpl client, ConsumerParams conf) {
        this.c = client;
        this.conf = conf;
        this.consumerId = conf.getConsumerId();
        this.topic = conf.getTopic();
        this.subName = conf.getSubName();
        this.consumerName = conf.getConsumerName();
        this.priority = conf.getPriority();
        this.initialPosition = conf.getInitialPosition();
        this.ackTimeoutCount = conf.getAckTimeout();
        this.ackTimeoutUnit = conf.getAckTimeoutTimeUnit();
        this.endOfTopicMessageListener = conf.getEndOfTopicMessageListener();
        this.maxReceiverQueueSize = conf.getReceiverQueueSize();
        this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
        this.incomingMessages = new GrowableArrayBlockingQueue<Tq.CommandMessage>();
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
        this.hasReachedEndOfTopic = false;
        this.stateMachine = new StateMachine<ConsumerPossibleStates>().setState(ConsumerPossibleStates.Idle);
        this.c.registerConsumer(this);
        this.subscribeReturnF = new CompletableFuture();
        ((CompletableFuture)this.c.registerConsumerBroker(this).thenRun(() -> {
            logger.log(Level.FINE, "[{0}] Registering consumer success", conf);
            this.stateMachine.setState(ConsumerPossibleStates.Ready);
            this.subscribeReturnF.complete(this);
        })).exceptionally(ex -> {
            logger.log(Level.WARNING, "[{0}]Registering consumer broker failed {1}", new Object[]{conf, ex});
            this.stateMachine.setState(ConsumerPossibleStates.Stopping);
            this.subscribeReturnF.completeExceptionally((Throwable)ex);
            return null;
        });
    }

    protected void disconnect() {
        this.clearReceiverQueue();
        ((CompletableFuture)this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandCloseConsumer(Tq.CommandCloseConsumer.newBuilder().build()).build()).thenRun(() -> this.stateMachine.setState(ConsumerPossibleStates.Idle))).exceptionally(t -> {
            this.stateMachine.setState(ConsumerPossibleStates.Idle);
            return null;
        });
    }

    protected void reconnect() {
        this.clearReceiverQueue();
        ((CompletableFuture)this.c.registerConsumerBroker(this).thenRun(() -> {
            logger.log(Level.FINE, "[{0}] Re-registering consumer success", this.conf);
            this.stateMachine.setState(ConsumerPossibleStates.Ready);
        })).exceptionally(ex -> {
            logger.log(Level.WARNING, "[{0}]Re-registering consumer with the broker failed {1}", new Object[]{this.conf, ex});
            this.stateMachine.setState(ConsumerPossibleStates.Stopping);
            return null;
        });
    }

    protected void setTopicTerminated() {
        this.hasReachedEndOfTopic = true;
        if (this.endOfTopicMessageListener != null) {
            this.endOfTopicMessageListener.reachedEndOfTopic(this);
        }
    }

    @Override
    public Boolean hasReachedEndOfTopic() {
        return this.hasReachedEndOfTopic;
    }

    public CompletableFuture<Boolean> hasMessageAvailable() {
        return new CompletableFuture<Boolean>();
    }

    public CompletableFuture<Void> seek(MessageId messageId) {
        CompletableFuture<Void> res = new CompletableFuture<Void>();
        if (this.c.tqClient == null || this.c.tqClient.getState() != ConnectivityState.READY) {
            res.completeExceptionally(new TqClientException.AlreadyClosedException("Cannot seek when the TqClient is not ready"));
        } else {
            this.stateMachine.setState(ConsumerPossibleStates.Seeking);
            this.clearReceiverQueue();
            this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandSeek(Tq.CommandSeek.newBuilder().setMessageId(MessageIdImpl.toMessageIdData(messageId)).build()).build()).handle((resp, t) -> {
                this.stateMachine.setState(ConsumerPossibleStates.Ready);
                if (t == null) {
                    res.complete(null);
                } else {
                    logger.log(Level.WARNING, "Error seeking consumer={0} messageId={1}", new Object[]{this, messageId});
                    res.completeExceptionally((Throwable)t);
                }
                return null;
            });
        }
        return res;
    }

    @Override
    public boolean isConnected() {
        return this.c.tqClient != null && this.c.tqClient.getState() == ConnectivityState.READY;
    }

    public CompletableFuture<Void> seek(long timestamp) {
        CompletableFuture<Void> res = new CompletableFuture<Void>();
        if (this.c.tqClient == null || this.c.tqClient.getState() != ConnectivityState.READY) {
            res.completeExceptionally(new TqClientException.AlreadyClosedException("Cannot seek when the TqClient is not ready"));
        } else {
            this.stateMachine.setState(ConsumerPossibleStates.Seeking);
            this.clearReceiverQueue();
            this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandSeek(Tq.CommandSeek.newBuilder().setMessagePublishTime(timestamp).build()).build()).handle((resp, t) -> {
                this.stateMachine.setState(ConsumerPossibleStates.Ready);
                if (t == null) {
                    res.complete(null);
                } else {
                    logger.log(Level.WARNING, "Error seeking consumer={0} timestamp={1}", new Object[]{this, timestamp});
                    res.completeExceptionally((Throwable)t);
                }
                return null;
            });
        }
        return res;
    }

    public Topic getTopic() {
        return this.topic;
    }

    protected ConsumerParams getConf() {
        return this.conf;
    }

    protected Long getConsumerId() {
        return this.consumerId;
    }

    protected void clearReceiverQueue() {
        this.incomingMessages.clear();
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
    }

    private CompletableFuture<Message<T>> pollPendingReceive() {
        CompletableFuture<Message<T>> receivedFuture;
        while ((receivedFuture = this.pendingReceives.poll()) != null && receivedFuture.isDone()) {
        }
        return receivedFuture;
    }

    protected void enqueue(Tq.CommandMessage msg) {
        ConsumerPossibleStates state = this.stateMachine.getInternalState();
        if (state != ConsumerPossibleStates.Ready) {
            logger.log(Level.WARNING, "[.enqueue] skipping consumerId={0} state={1} messages={2}", new Object[]{this.consumerId, state, msg});
            return;
        }
        if (this.pendingReceives.isEmpty()) {
            logger.log(Level.INFO, "[.enqueue] putting {0} in incomingMessages {1}", new Object[]{msg, this.incomingMessages});
            this.incomingMessages.add(msg);
        } else {
            logger.log(Level.INFO, "[.enqueue] there are already {0} .receive futures waiting", this.pendingReceives.size());
            CompletableFuture<Message<Message<T>>> userReceiveFuture = this.pollPendingReceive();
            if (userReceiveFuture == null) {
                this.incomingMessages.add(msg);
                return;
            }
            userReceiveFuture.complete(this.messageProcessed(msg));
        }
    }

    protected CompletableFuture<ConsumerImpl> subscribeReturn() {
        return this.subscribeReturnF;
    }

    private CompletableFuture sendFlowPermitsToBroker(int numMessages) {
        return this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandFlow(Tq.CommandFlow.newBuilder().setMessagePermits(numMessages).build()).build());
    }

    protected void increaseAvailablePermits(int delta) {
        int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
        logger.log(Level.WARNING, "{0}: +{1} permits, total= {2} messages, refill = {3}", new Object[]{this, delta, available, available >= this.receiverQueueRefillThreshold});
        if (available >= this.receiverQueueRefillThreshold) {
            this.sendFlowPermitsToBroker(available);
            IntUnaryOperator updateFn = current -> {
                logger.log(Level.WARNING, "updating permits: was={0}, sentToBroker={1}, left={2}", new Object[]{current, available, current - available});
                return current - available;
            };
            int left = AVAILABLE_PERMITS_UPDATER.updateAndGet(this, updateFn);
            logger.log(Level.WARNING, "Permits now left are {0} for {1}\n ", new Object[]{left, this});
        }
    }

    protected synchronized void onMessageProcessed() {
        this.increaseAvailablePermits(1);
    }

    private Message<T> messageProcessed(Tq.CommandMessage msg) {
        ByteArrayInputStream in = new ByteArrayInputStream(msg.getPayload().toByteArray());
        Reader reader = TransitFactory.reader((TransitFactory.Format)TransitFactory.Format.JSON, (InputStream)in);
        Object data = reader.read();
        MessageImpl<Object> result = new MessageImpl<Object>(this.c, this, MessageIdImpl.fromMessageIdData(msg.getMessageId()), data, msg.getProducerName(), msg.getEventTime(), msg.getPublishTime(), TopicImpl.fromTqTopic(msg.getTopic()), msg.getKey(), msg.getPropertiesMap(), null, msg.getIsReplicated(), msg.getReplicatedFrom(), null, null);
        this.onMessageProcessed();
        return result;
    }

    @Override
    public CompletableFuture<Message<T>> receive() {
        CompletableFuture<Message<Message<T>>> result = new CompletableFuture<Message<Message<T>>>();
        try {
            Tq.CommandMessage msg = this.incomingMessages.poll(0L, TimeUnit.MILLISECONDS);
            if (msg != null) {
                logger.log(Level.INFO, "[.receive] called, got message from receiverQueue {0}", msg);
                result.complete(this.messageProcessed(msg));
                return result;
            }
            logger.log(Level.INFO, "[.receive] called, no messages in receiverQueue");
            this.pendingReceives.add(result);
        }
        catch (InterruptedException ex) {
            logger.log(Level.INFO, "[.receive] Interrupt Exception");
            Thread.currentThread().interrupt();
            result.completeExceptionally(ex);
        }
        return result;
    }

    public Tq.CommandMessage receive(long timeout, TimeUnit unit) throws Exception {
        return this.incomingMessages.poll(timeout, unit);
    }

    public AcknowledgeBuilder newAcknowledge() {
        return new AcknowledgeBuilder(this.c).setConsumer(this);
    }

    public CompletableFuture<Void> acknowledge(MessageId messageId) {
        return this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandAck(Tq.CommandAck.newBuilder().setAckType(Tq.CommandAck.AckType.INDIVIDUAL).setMessageId(MessageIdImpl.toMessageIdData(messageId)).setNegativeAck(false).build()).build());
    }

    @Override
    public CompletableFuture<Void> acknowledge(Message<T> message) {
        return this.acknowledge(message.getMessageId());
    }

    @Override
    public CompletableFuture<Void> nonAcknowledge(MessageId messageId) {
        return this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandAck(Tq.CommandAck.newBuilder().setAckType(Tq.CommandAck.AckType.INDIVIDUAL).setMessageId(MessageIdImpl.toMessageIdData(messageId)).setNegativeAck(true).build()).build());
    }

    @Override
    public CompletableFuture<Void> nonAcknowledge(Message<T> message) {
        return this.nonAcknowledge(message.getMessageId());
    }

    @Override
    public CompletableFuture<Void> redeliverUnacknowledgedMessages() {
        return this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setRedeliverUnacknowledgedMessages(Tq.CommandRedeliverUnacknowledgedMessages.newBuilder().build()).build());
    }

    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<T> message) {
        return this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandAck(Tq.CommandAck.newBuilder().setAckType(Tq.CommandAck.AckType.CUMULATIVE).setNegativeAck(false).setMessageId(MessageIdImpl.toMessageIdData(message.getMessageId())).build()).build()).thenApply(x -> null);
    }

    protected CompletableFuture<Void> closeAsync(boolean informBroker) {
        this.stateMachine.setState(ConsumerPossibleStates.Stopping);
        if (informBroker) {
            ClientImpl clientRef = this.c;
            return this.c.consumerCommand(Tq.CommandConsumer.newBuilder().setConsumerId(this.getConsumerId().longValue()).setCommandCloseConsumer(Tq.CommandCloseConsumer.newBuilder().build()).build()).thenRun(() -> {
                this.stateMachine.setState(ConsumerPossibleStates.Idle);
                clientRef.removeConsumer(this);
            });
        }
        this.stateMachine.setState(ConsumerPossibleStates.Idle);
        this.c.removeConsumer(this);
        return CompletableFuture.supplyAsync(() -> null);
    }

    @Override
    public void close() {
        this.closeAsync(true).join();
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("consumerId", (Object)this.consumerId).add("subscription", (Object)this.subName).add("consumerName", (Object)this.consumerName).add("topic", (Object)this.topic).add("subType", (Object)this.conf.getSubType()).add("priority", (Object)this.priority).add("receiverQueueSize", (Object)this.maxReceiverQueueSize).add("ackTimeout", (Object)this.ackTimeoutCount).add("ackTimeoutUnit", (Object)this.ackTimeoutUnit).toString();
    }
}

