/*
 * Decompiled with CFR 0.152.
 */
package com.turtlequeue;

import com.cognitect.transit.TransitFactory;
import com.cognitect.transit.Writer;
import com.google.protobuf.ByteString;
import com.turtlequeue.ClientImpl;
import com.turtlequeue.Encoder;
import com.turtlequeue.Message;
import com.turtlequeue.MessageBuilder;
import com.turtlequeue.MessageId;
import com.turtlequeue.Producer;
import com.turtlequeue.ProducerParams;
import com.turtlequeue.ProducerPossibleStates;
import com.turtlequeue.StateMachine;
import com.turtlequeue.sdk.api.proto.Tq;
import com.turtlequeue.sdk.api.proto.TurtleQueueGrpc;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Producer implementation that encodes message payloads with Transit and sends them to the
 * broker through the owning {@link ClientImpl}.
 *
 * <p>The lifecycle is tracked by a {@link StateMachine} over {@link ProducerPossibleStates}:
 * the producer starts {@code Idle}, becomes {@code Ready} once registration with the broker
 * succeeds, becomes {@code Stopping} when registration fails or a close starts, and returns to
 * {@code Idle} once the close has finished.
 *
 * @param <T> type of the message payload
 */
public class ProducerImpl<T>
implements Producer {
    // NOTE(review): the logger is named after TurtleQueueGrpc rather than this class. Left
    // untouched because it may be a deliberately shared logger name - confirm.
    private static final Logger logger = Logger.getLogger(TurtleQueueGrpc.class.getName());
    ClientImpl c;
    ProducerParams conf;
    StateMachine<ProducerPossibleStates> stateMachine;
    CompletableFuture<ProducerImpl<T>> producerCreateReturnF;

    /**
     * Creates the producer, registers it with the client and starts the asynchronous
     * registration with the broker. The outcome of that registration is reported through
     * {@link #createReturn()}.
     *
     * @param client client used for registration and for sending commands
     * @param conf   producer parameters (provides the producer id)
     */
    ProducerImpl(ClientImpl client, ProducerParams conf) {
        this.c = client;
        this.conf = conf;
        this.stateMachine = new StateMachine<ProducerPossibleStates>().setState(ProducerPossibleStates.Idle);
        // Create the future BEFORE handing `this` to the client, so that nothing reaching the
        // producer through the client can observe a null creation future.
        this.producerCreateReturnF = new CompletableFuture<>();
        this.c.registerProducer(this);

        CompletableFuture<?> registration = this.c.registerProducerBroker(this);
        registration.thenRun(() -> {
            logger.log(Level.FINE, "[{0}] Registering producer success", conf);
            this.stateMachine.setState(ProducerPossibleStates.Ready);
            this.producerCreateReturnF.complete(this);
        }).exceptionally(ex -> {
            logger.log(Level.WARNING, "[{0}]Registering producer broker failed {1}", new Object[]{conf, ex});
            this.stateMachine.setState(ProducerPossibleStates.Stopping);
            this.producerCreateReturnF.completeExceptionally(ex);
            return null;
        });
    }

    /**
     * Returns the future tied to the initial broker registration.
     *
     * @return future completed with this producer when the registration started by the
     *         constructor succeeds, or completed exceptionally when it fails
     */
    protected CompletableFuture<ProducerImpl<T>> createReturn() {
        return this.producerCreateReturnF;
    }

    /**
     * Starts building a new message bound to this producer.
     *
     * @return a new {@link MessageBuilder} constructed with this producer
     */
    @Override
    public MessageBuilder<T> newMessage() {
        return new MessageBuilder<>(this);
    }

    /**
     * Encodes the message payload and sends it to the broker.
     *
     * <p>The payload is written with the client's custom Transit writer when one is configured,
     * otherwise with a JSON Transit writer using the client's custom write handlers. Key, event
     * time, delay and properties are only set on the command when present on the message.
     *
     * <p>NOTE(review): an encoding failure is thrown synchronously from this method instead of
     * being reported through the returned future; kept because callers may rely on it.
     *
     * @param msg message to send
     * @return future completed with the {@link MessageId} built from the broker response
     */
    protected CompletableFuture<MessageId> send(Message<T> msg) {
        ByteString.Output out = ByteString.newOutput();
        Writer writer = this.c.getTransitWriter() != null
                ? this.c.getTransitWriter().apply(out)
                : TransitFactory.writer(TransitFactory.Format.JSON, out, this.c.getCustomWriteHandlers(), this.c.getCustomDefaultWriteHandler());
        try {
            writer.write(msg.getData());
        }
        catch (Exception ex) {
            // Log which payload could not be encoded, then let the caller see the failure.
            logger.log(Level.WARNING, "[{0}] Failed to encode data", msg.getData());
            throw ex;
        }

        Tq.CommandSend.Builder b = Tq.CommandSend.newBuilder();
        // The wire flag is the negation of the message-level "replicated" flag.
        b.setPayload(out.toByteString()).setReplicationDisabled(!msg.isReplicated());
        if (msg.getKey() != null) {
            b.setKey(msg.getKey());
        }
        if (msg.getEventTime() != null) {
            b.setEventTime(msg.getEventTime().longValue());
        }
        // A delay is only meaningful when both the amount and its unit are provided.
        if (msg.getDelayValue() != null && msg.getDelayTimeUnit() != null) {
            b.setDelayTimeValue(msg.getDelayValue().longValue()).setDelayTimeUnit(Encoder.javaTimeUnitToProtobuf(msg.getDelayTimeUnit()));
        }
        Map<String, String> properties = msg.getProperties();
        if (properties != null) {
            b.putAllProperties(properties);
        }

        Tq.CommandProducer cmd = Tq.CommandProducer.newBuilder()
                .setProducerId(this.getProducerId().longValue())
                .setCommandSend(b.build())
                .build();
        return this.c.producerCommand(cmd).thenApply(rp -> MessageId.fromMessageIdData(rp.getMessageId()));
    }

    /**
     * @return the producer id taken from the producer parameters
     */
    protected Long getProducerId() {
        return this.conf.getProducerId();
    }

    /**
     * @return the parameters this producer was created with
     */
    protected ProducerParams getConf() {
        return this.conf;
    }

    /**
     * @return the client this producer belongs to
     */
    protected ClientImpl getClient() {
        return this.c;
    }

    /**
     * Registers this producer with the broker again. On success the state becomes
     * {@code Ready}; on failure the error is logged and the state becomes {@code Stopping}.
     * The outcome is not reported to the caller.
     */
    protected void reconnect() {
        CompletableFuture<?> registration = this.c.registerProducerBroker(this);
        registration.thenRun(() -> {
            logger.log(Level.FINE, "[{0}] Re-registering producer success", this.conf);
            this.stateMachine.setState(ProducerPossibleStates.Ready);
        }).exceptionally(ex -> {
            logger.log(Level.WARNING, "[{0}]Re-registering producer with the broker failed {1}", new Object[]{this.conf, ex});
            this.stateMachine.setState(ProducerPossibleStates.Stopping);
            return null;
        });
    }

    /**
     * Closes this producer and removes it from the client.
     *
     * @param informBroker when {@code true}, a close command is sent to the broker and the
     *                     producer is only removed from the client once that command succeeds;
     *                     when {@code false}, the producer is removed immediately
     * @return future completed once the producer has been removed from the client; with
     *         {@code informBroker == false} the returned future is already completed
     */
    protected CompletableFuture<Void> closeAsync(boolean informBroker) {
        this.stateMachine.setState(ProducerPossibleStates.Stopping);
        if (informBroker) {
            ClientImpl clientRef = this.c;
            Tq.CommandProducer closeCmd = Tq.CommandProducer.newBuilder()
                    .setProducerId(this.getProducerId().longValue())
                    .setCommandCloseProducer(Tq.CommandCloseProducer.newBuilder().build())
                    .build();
            // NOTE(review): if the close command fails, the producer stays in Stopping and
            // remains registered with the client - confirm this is intended.
            return this.c.producerCommand(closeCmd).thenRun(() -> {
                this.stateMachine.setState(ProducerPossibleStates.Idle);
                clientRef.removeProducer(this);
            });
        }
        this.stateMachine.setState(ProducerPossibleStates.Idle);
        this.c.removeProducer(this);
        // All work is already done: return a completed future instead of scheduling a no-op
        // task on the common pool.
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Closes this producer, informing the broker, and blocks until the close has finished.
     */
    @Override
    public void close() {
        this.closeAsync(true).join();
    }
}

