/*
 * Decompiled with CFR 0.152.
 */
package com.turtlequeue;

import com.google.common.base.MoreObjects;
import com.turtlequeue.ClientImpl;
import com.turtlequeue.ConsumerImpl;
import com.turtlequeue.ConsumerParams;
import com.turtlequeue.Message;
import com.turtlequeue.MessageId;
import com.turtlequeue.Reader;
import com.turtlequeue.Topic;
import com.turtlequeue.sdk.api.proto.TurtleQueueGrpc;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ReaderImpl<T>
implements Reader<T> {
    private static final Logger logger = Logger.getLogger(TurtleQueueGrpc.class.getName());
    ConsumerImpl<T> consumer = null;

    ReaderImpl(ClientImpl c, ConsumerParams conf) {
        this.consumer = new ConsumerImpl(c, conf);
    }

    @Override
    public Topic getTopic() {
        return this.consumer.getTopic();
    }

    @Override
    public CompletableFuture<Message<T>> readNext() {
        CompletableFuture<Message<T>> receiveFuture = this.consumer.receive();
        receiveFuture.whenComplete((msg, t) -> {
            if (msg != null) {
                this.consumer.acknowledgeCumulativeAsync((Message<T>)msg).exceptionally(ex -> {
                    logger.log(Level.WARNING, "Reader could not acknowledge messages {0}", new Object[]{ex});
                    return null;
                });
            }
        });
        return receiveFuture;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.consumer.closeAsync(true);
    }

    @Override
    public void close() {
        this.closeAsync().join();
    }

    @Override
    public boolean hasReachedEndOfTopic() {
        return this.consumer.hasReachedEndOfTopic();
    }

    @Override
    public CompletableFuture<Boolean> hasMessageAvailable() {
        return this.consumer.hasMessageAvailable();
    }

    @Override
    public boolean isConnected() {
        return this.consumer.isConnected();
    }

    @Override
    public CompletableFuture<Void> seek(MessageId messageId) {
        return this.consumer.seek(messageId);
    }

    @Override
    public CompletableFuture<Void> seek(long timestamp) {
        return this.consumer.seek(timestamp);
    }

    public CompletableFuture<Reader<T>> subscribeReturn() {
        return this.consumer.subscribeReturn().thenApply(consumer -> this);
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("consumerId", (Object)this.consumer.consumerId).add("subscription", (Object)this.consumer.subName).add("readerName", (Object)this.consumer.consumerName).add("topic", (Object)this.consumer.topic).toString();
    }
}

