/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.quotes.internal.grpc;

import com.google.protobuf.InvalidProtocolBufferException;
import com.webull.openapi.execption.ClientException;
import com.webull.openapi.grpc.BaseGrpcClient;
import com.webull.openapi.grpc.StatefulResponseObserver;
import com.webull.openapi.grpc.auth.SignatureCallCredentials;
import com.webull.openapi.grpc.exception.ErrorResponseException;
import com.webull.openapi.grpc.lifecycle.GrpcHandler;
import com.webull.openapi.grpc.lifecycle.SubStreamObserver;
import com.webull.openapi.grpc.lifecycle.proxy.HandlerProxyFactory;
import com.webull.openapi.grpc.retry.SynchronousGrpcRetryable;
import com.webull.openapi.logger.Logger;
import com.webull.openapi.logger.LoggerFactory;
import com.webull.openapi.quotes.api.QuotesApiClient;
import com.webull.openapi.quotes.api.lifecycle.ReplyMessage;
import com.webull.openapi.quotes.domain.AskBid;
import com.webull.openapi.quotes.domain.Bar;
import com.webull.openapi.quotes.domain.Broker;
import com.webull.openapi.quotes.domain.Instrument;
import com.webull.openapi.quotes.domain.Order;
import com.webull.openapi.quotes.domain.Quote;
import com.webull.openapi.quotes.domain.Snapshot;
import com.webull.openapi.quotes.domain.Tick;
import com.webull.openapi.quotes.internal.grpc.lifecycle.ApiLoggingHandler;
import com.webull.openapi.quotes.internal.grpc.lifecycle.channel.ApiGrpcChannel;
import com.webull.openapi.quotes.internal.grpc.lifecycle.channel.GrpcChannel;
import com.webull.openapi.quotes.internal.grpc.lifecycle.channel.GrpcStreamChannelPoolMap;
import com.webull.openapi.quotes.internal.grpc.lifecycle.channel.GrpcStreamEndpoint;
import com.webull.openapi.quotes.internal.grpc.lifecycle.channel.SingletonChannelPool;
import com.webull.openapi.quotes.internal.grpc.lifecycle.channel.StreamObserverFactory;
import com.webull.openapi.quotes.internal.grpc.lifecycle.proxy.ApiHandlerProxyFactory;
import com.webull.openapi.quotes.internal.grpc.proto.Api;
import com.webull.openapi.quotes.internal.grpc.proto.Gateway;
import com.webull.openapi.quotes.internal.grpc.proto.QuoteGrpc;
import com.webull.openapi.retry.RetryPolicy;
import com.webull.openapi.retry.Retryable;
import com.webull.openapi.utils.Assert;
import com.webull.openapi.utils.CollectionUtils;
import com.webull.openapi.utils.StringUtils;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class GrpcQuotesApiClient
extends BaseGrpcClient<Gateway.ClientResponse>
implements QuotesApiClient {
    private static final Logger logger = LoggerFactory.getLogger(GrpcQuotesApiClient.class);
    private static final GrpcStreamChannelPoolMap<Gateway.ClientRequest, Gateway.ClientResponse> channelPoolMap = new GrpcStreamChannelPoolMap();
    private final long connectTimeoutMillis;
    private final long readTimeoutMillis;
    private final SingletonChannelPool<Gateway.ClientRequest, Gateway.ClientResponse> channelPool;
    private final ApiConnectionManager connectionManager = new ApiConnectionManager();
    private final CallCredentials credentials = new SignatureCallCredentials(this.appKey, this.appSecret, this.host, Integer.valueOf(this.port), "/openapi.Quote/StreamRequest", null);

    protected GrpcQuotesApiClient(String appKey, String appSecret, String host, int port, long connectTimeoutMillis, long readTimeoutMillis, RetryPolicy retryPolicy, boolean enableTls, List<GrpcHandler> handlers) {
        this(appKey, appSecret, host, port, connectTimeoutMillis, readTimeoutMillis, retryPolicy, enableTls, handlers, (HandlerProxyFactory<Gateway.ClientResponse>)ApiHandlerProxyFactory.getInstance());
    }

    protected GrpcQuotesApiClient(String appKey, String appSecret, String host, int port, long connectTimeoutMillis, long readTimeoutMillis, RetryPolicy retryPolicy, boolean enableTls, List<GrpcHandler> handlers, HandlerProxyFactory<Gateway.ClientResponse> handlerProxyFactory) {
        super(appKey, appSecret, host, port, retryPolicy, enableTls, handlers, handlerProxyFactory);
        Assert.nonnegative((String)"connectTimeoutMillis", (long)connectTimeoutMillis);
        Assert.nonnegative((String)"readTimeoutMillis", (long)readTimeoutMillis);
        this.connectTimeoutMillis = connectTimeoutMillis;
        this.readTimeoutMillis = readTimeoutMillis;
        GrpcStreamEndpoint<Gateway.ClientRequest, Gateway.ClientResponse> endpoint = new GrpcStreamEndpoint<Gateway.ClientRequest, Gateway.ClientResponse>(this.appKey, this.host, this.port, new ApiStreamObserverFactory());
        this.channelPool = (SingletonChannelPool)channelPoolMap.get(endpoint);
    }

    private Gateway.ClientResponse doRequest(Gateway.ClientRequest request) {
        GrpcChannel grpcChannel = this.channelPool.acquire(this.connectTimeoutMillis, TimeUnit.MILLISECONDS);
        try (ReplyMessage replyMessage = grpcChannel.exchange(request);){
            Gateway.ClientResponse response = (Gateway.ClientResponse)replyMessage.get(this.readTimeoutMillis, TimeUnit.MILLISECONDS);
            if (response.getCode() != 0) {
                throw new ErrorResponseException(response.getCode(), response.getMsg(), response.getRequestId());
            }
            Gateway.ClientResponse clientResponse = response;
            return clientResponse;
        }
    }

    @Override
    public List<Instrument> getInstruments(Set<String> symbols, String category) {
        Assert.notEmpty((String)"symbols", symbols);
        Assert.notBlank((String)"category", (String)category);
        Api.InstrumentRequest tickerRequest = Api.InstrumentRequest.newBuilder().setSymbols(String.join((CharSequence)",", symbols)).setCategory(category).build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/instrument/list").setPayload(tickerRequest.toByteString()).build();
        Gateway.ClientResponse response = this.doRequest(request);
        try {
            List<Api.Instrument> instrumentResponses = Api.InstrumentResponse.parseFrom(response.getPayload()).getResultList();
            return instrumentResponses.stream().map(r -> {
                Instrument instrument = new Instrument();
                instrument.setName(r.getName());
                instrument.setSymbol(r.getSymbol());
                instrument.setInstrumentId(r.getInstrumentId());
                instrument.setExchangeCode(r.getExchangeCode());
                instrument.setCurrency(r.getCurrency());
                return instrument;
            }).collect(Collectors.toList());
        }
        catch (InvalidProtocolBufferException e) {
            throw new ClientException("Deserialize instrument response error.", (Throwable)e);
        }
    }

    @Override
    public List<Bar> getBars(String symbol, String category, String timespan, int count) {
        Assert.notBlank((String)"symbol", (String)symbol);
        Assert.notBlank((String)"category", (String)category);
        Assert.notBlank((String)"timespan", (String)timespan);
        Assert.nonnegative((String)"count", (int)count);
        Api.BarsRequest barsRequest = Api.BarsRequest.newBuilder().setSymbol(symbol).setCategory(category).setTimespan(timespan).setCount(String.valueOf(count)).build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/bars").setPayload(barsRequest.toByteString()).build();
        Gateway.ClientResponse response = this.doRequest(request);
        try {
            List<Api.Bar> barResponses = Api.BarsResponse.parseFrom(response.getPayload()).getResultList();
            return barResponses.stream().map(r -> {
                Bar bar = new Bar();
                bar.setTime(r.getTime());
                bar.setOpen(r.getOpen());
                bar.setClose(r.getClose());
                bar.setHigh(r.getHigh());
                bar.setLow(r.getLow());
                bar.setVolume(r.getVolume());
                return bar;
            }).collect(Collectors.toList());
        }
        catch (InvalidProtocolBufferException e) {
            throw new ClientException("Deserialize bar response error.", (Throwable)e);
        }
    }

    @Override
    public Quote getQuote(String symbol, String category) {
        Assert.notBlank((String)"symbol", (String)symbol);
        Assert.notBlank((String)"category", (String)category);
        Api.QuoteRequest quoteRequest = Api.QuoteRequest.newBuilder().setSymbol(symbol).setCategory(category).build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/quotes").setPayload(quoteRequest.toByteString()).build();
        Gateway.ClientResponse response = this.doRequest(request);
        try {
            Api.QuoteResponse quoteResponse = Api.QuoteResponse.parseFrom(response.getPayload());
            Quote quote = new Quote();
            quote.setSymbol(quoteResponse.getSymbol());
            quote.setInstrumentId(quoteResponse.getInstrumentId());
            List<AskBid> asks = quoteResponse.getAsksList().stream().map(this::assembleAskBid).collect(Collectors.toList());
            quote.setAsks(asks);
            List<AskBid> bids = quoteResponse.getBidsList().stream().map(this::assembleAskBid).collect(Collectors.toList());
            quote.setBids(bids);
            return quote;
        }
        catch (InvalidProtocolBufferException e) {
            throw new ClientException("Deserialize quote response error.", (Throwable)e);
        }
    }

    private AskBid assembleAskBid(Api.AskBid from) {
        AskBid result = new AskBid();
        result.setPrice(from.getPrice());
        result.setSize(from.getSize());
        List<Order> orders = from.getOrderList().stream().map(this::assembleOrder).collect(Collectors.toList());
        result.setOrders(orders);
        List<Broker> brokers = from.getBrokerList().stream().map(this::assembleBroker).collect(Collectors.toList());
        result.setBrokers(brokers);
        return result;
    }

    private Order assembleOrder(Api.Order from) {
        Order result = new Order();
        result.setMpid(from.getMpid());
        result.setSize(from.getSize());
        return result;
    }

    private Broker assembleBroker(Api.Broker from) {
        Broker result = new Broker();
        result.setBid(from.getBid());
        result.setName(from.getName());
        return result;
    }

    @Override
    public List<Snapshot> getSnapshots(Set<String> symbols, String category) {
        Assert.notEmpty((String)"symbols", symbols);
        Assert.notBlank((String)"category", (String)category);
        Api.SnapshotRequest snapshotsRequest = Api.SnapshotRequest.newBuilder().setSymbols(String.join((CharSequence)",", symbols)).setCategory(category).build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/snapshot").setPayload(snapshotsRequest.toByteString()).build();
        Gateway.ClientResponse response = this.doRequest(request);
        try {
            List<Api.Snapshot> snapshotResponses = Api.SnapshotResponse.parseFrom(response.getPayload()).getResultList();
            return snapshotResponses.stream().map(r -> {
                Snapshot snapshot = new Snapshot();
                snapshot.setSymbol(r.getSymbol());
                snapshot.setInstrumentId(r.getInstrumentId());
                snapshot.setTradeTime(r.getTradeTime());
                snapshot.setPrice(r.getPrice());
                snapshot.setOpen(r.getOpen());
                snapshot.setHigh(r.getHigh());
                snapshot.setLow(r.getLow());
                snapshot.setPreClose(r.getPreClose());
                snapshot.setVolume(r.getVolume());
                snapshot.setChange(r.getChange());
                snapshot.setChangeRatio(r.getChangeRatio());
                return snapshot;
            }).collect(Collectors.toList());
        }
        catch (InvalidProtocolBufferException e) {
            throw new ClientException("Deserialize snapshot response error.", (Throwable)e);
        }
    }

    @Override
    public List<Tick> getTicks(String symbol, String category, int count) {
        Assert.notBlank((String)"symbol", (String)symbol);
        Assert.notBlank((String)"category", (String)category);
        Assert.nonnegative((String)"count", (int)count);
        Api.TickRequest quoteRequest = Api.TickRequest.newBuilder().setSymbol(symbol).setCategory(category).setCount(String.valueOf(count)).build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/tick").setPayload(quoteRequest.toByteString()).build();
        Gateway.ClientResponse response = this.doRequest(request);
        try {
            Api.TickResponse ticksResponse = Api.TickResponse.parseFrom(response.getPayload());
            return ticksResponse.getResultList().stream().map(r -> {
                Tick tick = new Tick();
                tick.setSymbol(ticksResponse.getSymbol());
                tick.setInstrumentId(ticksResponse.getInstrumentId());
                tick.setTime(r.getTime());
                tick.setPrice(r.getPrice());
                tick.setVolume(r.getVolume());
                tick.setSide(r.getSide());
                return tick;
            }).collect(Collectors.toList());
        }
        catch (InvalidProtocolBufferException e) {
            throw new ClientException("Deserialize tick response error.", (Throwable)e);
        }
    }

    @Override
    public String getToken() {
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/streaming/token").build();
        Gateway.ClientResponse response = this.doRequest(request);
        try {
            Api.TokenResponse tokenResponse = Api.TokenResponse.parseFrom(response.getPayload());
            return tokenResponse.getToken();
        }
        catch (InvalidProtocolBufferException e) {
            throw new ClientException("Deserialize token response error.", (Throwable)e);
        }
    }

    @Override
    public void subscribe(String token, Set<String> symbols, String category, Set<String> subTypes) {
        Assert.notBlank((String)"token", (String)token);
        Assert.notEmpty((String)"symbols", symbols);
        Assert.notBlank((String)"category", (String)category);
        Assert.notEmpty((String)"subTypes", subTypes);
        Api.SubscribeRequest subscribeRequest = Api.SubscribeRequest.newBuilder().setToken(token).addAllSymbols(symbols).setCategory(category).addAllSubTypes(subTypes).build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/streaming/subscribe").setPayload(subscribeRequest.toByteString()).build();
        this.doRequest(request);
    }

    @Override
    public void unsubscribe(String token, Set<String> symbols, String category, Set<String> subTypes, Boolean unsubscribeAll) {
        Assert.notBlank((String)"token", (String)token);
        Api.SubscribeRequest.Builder builder = Api.SubscribeRequest.newBuilder().setToken(token);
        if (CollectionUtils.isNotEmpty(symbols)) {
            builder.addAllSymbols(symbols);
        }
        if (StringUtils.isNotBlank((CharSequence)category)) {
            builder.setCategory(category);
        }
        if (CollectionUtils.isNotEmpty(subTypes)) {
            builder.addAllSubTypes(subTypes);
        }
        if (unsubscribeAll != null) {
            builder.setUnsubscribeAll(String.valueOf(unsubscribeAll));
        }
        Api.SubscribeRequest subscribeRequest = builder.build();
        Gateway.ClientRequest request = Gateway.ClientRequest.newBuilder().setType(Gateway.MsgType.Payload).setRequestId(UUID.randomUUID().toString()).setPath("/market-data/streaming/unsubscribe").setPayload(subscribeRequest.toByteString()).build();
        this.doRequest(request);
    }

    public void shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.channelPool.close();
        super.shutdown(timeout, timeUnit);
    }

    public void shutdownNow() {
        this.channelPool.close();
        super.shutdownNow();
    }

    private class ApiStreamObserverFactory
    implements StreamObserverFactory<Gateway.ClientRequest, Gateway.ClientResponse> {
        private volatile StreamObserver<Gateway.ClientResponse> responseStreamObserver;
        private volatile QuoteGrpc.QuoteStub stub;

        private ApiStreamObserverFactory() {
        }

        @Override
        public StreamObserver<Gateway.ClientRequest> createRequestObserver() {
            StreamObserver<Gateway.ClientResponse> streamObserver = this.createResponseObserver();
            logger.debug("Start to grpc stream request...");
            return this.stub.streamRequest(streamObserver);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public StreamObserver<Gateway.ClientResponse> createResponseObserver() {
            if (this.responseStreamObserver == null) {
                ApiStreamObserverFactory apiStreamObserverFactory = this;
                synchronized (apiStreamObserverFactory) {
                    if (this.responseStreamObserver == null) {
                        GrpcQuotesApiClient.this.buildChannel();
                        this.stub = (QuoteGrpc.QuoteStub)QuoteGrpc.newStub((Channel)GrpcQuotesApiClient.this.channel).withCallCredentials(GrpcQuotesApiClient.this.credentials);
                        GrpcQuotesApiClient.this.subObservers.addFirst(GrpcQuotesApiClient.this.connectionManager);
                        GrpcQuotesApiClient.this.subObservers.addFirst(ApiLoggingHandler.getInstance());
                        StatefulResponseObserver observer = new StatefulResponseObserver((List)GrpcQuotesApiClient.this.subObservers);
                        observer.setRetryable((Retryable)new SynchronousGrpcRetryable(GrpcQuotesApiClient.this.channelPool::tryAcquire, GrpcQuotesApiClient.this.retryPolicy));
                        this.responseStreamObserver = observer;
                    }
                }
            }
            return this.responseStreamObserver;
        }
    }

    private class ApiConnectionManager
    implements SubStreamObserver<Gateway.ClientResponse> {
        private ApiConnectionManager() {
        }

        public void onReady() {
            GrpcQuotesApiClient.this.channelPool.onConnected(new ApiGrpcChannel());
        }

        public void onNext(Gateway.ClientResponse value) {
            Gateway.MsgType msgType = value.getType();
            if (Gateway.MsgType.Payload == msgType) {
                GrpcQuotesApiClient.this.channelPool.acquireIfExist().ifPresent(channel -> channel.receive(value));
            }
        }

        public void onError(Throwable cause) {
            GrpcQuotesApiClient.this.channelPool.remove().ifPresent(GrpcChannel::close);
        }

        public void onCompleted() {
            GrpcQuotesApiClient.this.channelPool.remove().ifPresent(GrpcChannel::close);
        }
    }
}

