/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.trade;

import com.webull.openapi.core.execption.ClientException;
import com.webull.openapi.core.retry.RetryPolicy;
import com.webull.openapi.trade.events.internal.CancellableISubscription;
import com.webull.openapi.trade.events.internal.lifecycle.proxy.SubscribeHandlerProxyFactory;
import com.webull.openapi.trade.events.internal.proto.EventServiceGrpc;
import com.webull.openapi.trade.events.internal.proto.Events;
import com.webull.openapi.trade.events.subscribe.ISubscription;
import com.webull.openapi.trade.events.subscribe.ITradeEventClient;
import com.webull.openapi.trade.events.subscribe.message.SubscribeRequest;
import com.webull.openapi.trade.grpc.BaseGrpcClient;
import com.webull.openapi.trade.grpc.StatefulResponseObserver;
import com.webull.openapi.trade.grpc.auth.SignatureCallCredentials;
import com.webull.openapi.trade.grpc.lifecycle.GrpcHandler;
import com.webull.openapi.trade.grpc.lifecycle.proxy.HandlerProxyFactory;
import com.webull.openapi.trade.grpc.retry.SynchronousGrpcRetryable;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class TradeEventClient
extends BaseGrpcClient<Events.SubscribeResponse>
implements ITradeEventClient {
    private final AtomicBoolean subscribing = new AtomicBoolean(false);
    private final EventServiceGrpc.EventServiceStub stub;

    public TradeEventClient(String appKey, String appSecret, String host, int port, RetryPolicy retryPolicy, boolean enableTls, List<GrpcHandler> handlers) {
        this(appKey, appSecret, host, port, retryPolicy, enableTls, handlers, SubscribeHandlerProxyFactory.getInstance());
    }

    protected TradeEventClient(String appKey, String appSecret, String host, int port, RetryPolicy retryPolicy, boolean enableTls, List<GrpcHandler> handlers, HandlerProxyFactory<Events.SubscribeResponse> handlerProxyFactory) {
        super(appKey, appSecret, host, port, retryPolicy, enableTls, handlers, handlerProxyFactory);
        this.buildChannel();
        this.stub = EventServiceGrpc.newStub((Channel)this.channel);
    }

    @Override
    public ISubscription subscribe(SubscribeRequest request) {
        if (!this.subscribing.compareAndSet(false, true)) {
            throw new ClientException("InvalidState", "Client is already subscribed");
        }
        Events.SubscribeRequest grpcRequest = Events.SubscribeRequest.newBuilder().setSubscribeType(request.getSubscribeType()).setTimestamp(request.getTimestamp()).addAllAccounts(request.getAccounts()).build();
        byte[] requestBytes = grpcRequest.toByteArray();
        SignatureCallCredentials credentials = new SignatureCallCredentials(this.appKey, this.appSecret, requestBytes);
        EventServiceGrpc.EventServiceStub credentialsStub = (EventServiceGrpc.EventServiceStub)this.stub.withCallCredentials(credentials);
        StatefulResponseObserver responseObserver = new StatefulResponseObserver(this.subObservers);
        Supplier<Object> doSubscribe = () -> {
            credentialsStub.subscribe(grpcRequest, (StreamObserver<Events.SubscribeResponse>)responseObserver);
            return null;
        };
        responseObserver.setRetryable(new SynchronousGrpcRetryable(doSubscribe, this.retryPolicy));
        CancellableISubscription subscription = new CancellableISubscription(responseObserver);
        subscription.completable().whenComplete((ignore, ex) -> this.subscribing.compareAndSet(true, false));
        doSubscribe.get();
        return subscription;
    }
}

