/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.data.internal.mqtt;

import com.webull.openapi.core.common.ApiModule;
import com.webull.openapi.core.endpoint.EndpointResolver;
import com.webull.openapi.core.execption.ClientException;
import com.webull.openapi.core.http.HttpApiConfig;
import com.webull.openapi.core.retry.RetryPolicy;
import com.webull.openapi.core.retry.backoff.FixedDelayStrategy;
import com.webull.openapi.core.utils.Assert;
import com.webull.openapi.core.utils.CollectionUtils;
import com.webull.openapi.core.utils.StringUtils;
import com.webull.openapi.data.DataStreamingClient;
import com.webull.openapi.data.internal.mqtt.codec.PublishToMarketDataDecoder;
import com.webull.openapi.data.internal.mqtt.lifecycle.ApiAuthProvider;
import com.webull.openapi.data.internal.mqtt.lifecycle.ApiSubscriptionManager;
import com.webull.openapi.data.internal.mqtt.lifecycle.QuotesSubsLoggingHandler;
import com.webull.openapi.data.internal.mqtt.lifecycle.QuotesSubsReconnectHandler;
import com.webull.openapi.data.quotes.api.subsribe.IMarketStreamingClient;
import com.webull.openapi.data.quotes.api.subsribe.MarketStreamingClient;
import com.webull.openapi.data.quotes.subsribe.IDataStreamingClient;
import com.webull.openapi.data.quotes.subsribe.IDataStreamingClientBuilder;
import com.webull.openapi.data.quotes.subsribe.lifecycle.QuotesSubsHandler;
import com.webull.openapi.data.quotes.subsribe.lifecycle.QuotesSubsInboundHandler;
import com.webull.openapi.data.quotes.subsribe.message.MarketData;
import com.webull.openapi.data.quotes.subsribe.proxy.ProxyConfig;
import com.webull.openapi.data.quotes.subsribe.retry.QuotesSubsRetryCondition;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public final class DataStreamingClientBuilder
implements IDataStreamingClientBuilder {
    private String appKey;
    private String appSecret;
    private String sessionId;
    private String regionId;
    private String http_host;
    private String mqtt_host;
    private int mqtt_port = 1883;
    private long connectTimeoutMillis = 5000L;
    private long readTimeoutMillis = 60000L;
    private IMarketStreamingClient apiClient;
    private RetryPolicy retryPolicy = new RetryPolicy(QuotesSubsRetryCondition.getInstance(), new FixedDelayStrategy(10L, TimeUnit.SECONDS));
    private boolean enableTls = true;
    private ProxyConfig proxyConfig;
    private String tokenDir;
    private final LinkedList<QuotesSubsHandler> handlers = new LinkedList();
    private final LinkedList<QuotesSubsInboundHandler> onMessages = new LinkedList();
    private Set<String> symbols;
    private String category;
    private Set<String> subTypes;
    private String depth;
    private Boolean overnightRequired;

    @Override
    public IDataStreamingClientBuilder appKey(String appKey) {
        this.appKey = appKey;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder appSecret(String appSecret) {
        this.appSecret = appSecret;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder sessionId(String sessionId) {
        this.sessionId = sessionId;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder http_host(String http_host) {
        this.http_host = http_host;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder mqtt_host(String mqtt_host) {
        this.mqtt_host = mqtt_host;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder mqtt_port(int mqtt_port) {
        this.mqtt_port = mqtt_port;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder regionId(String regionId) {
        this.regionId = regionId;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder connectTimeout(long timeoutMillis) {
        this.connectTimeoutMillis = timeoutMillis;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder readTimeout(long timeoutMillis) {
        this.readTimeoutMillis = timeoutMillis;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder apiClient(IMarketStreamingClient apiClient) {
        this.apiClient = apiClient;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder reconnectBy(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder enableTls(boolean enableTls) {
        this.enableTls = enableTls;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder proxy(ProxyConfig proxyConfig) {
        this.proxyConfig = proxyConfig;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder tokenDir(String tokenDir) {
        this.tokenDir = tokenDir;
        return this;
    }

    @Override
    public IDataStreamingClientBuilder addHandler(QuotesSubsHandler handler) {
        Assert.notNull("handler", (Object)handler);
        this.handlers.add(handler);
        return this;
    }

    @Override
    public IDataStreamingClientBuilder onMessage(Consumer<MarketData> consumer) {
        Assert.notNull("consumer", consumer);
        QuotesSubsInboundHandler handler = message -> {
            consumer.accept((MarketData)message);
            return message;
        };
        this.onMessages.add(handler);
        return this;
    }

    @Override
    public IDataStreamingClientBuilder addSubscription(Set<String> symbols, String category, Set<String> subTypes, String depth, Boolean overnightRequired) {
        this.symbols = symbols;
        this.category = category;
        this.subTypes = subTypes;
        this.depth = depth;
        this.overnightRequired = overnightRequired;
        return this;
    }

    @Override
    public IDataStreamingClient build() {
        if (StringUtils.isBlank(this.mqtt_host)) {
            Assert.notBlank("regionId", this.regionId);
            this.mqtt_host = EndpointResolver.getDefault().resolve(this.regionId, ApiModule.of("QUOTES")).orElseThrow(() -> new ClientException("EndpointResolvingError", "Unknown region"));
        }
        if (this.apiClient == null) {
            HttpApiConfig.HttpApiConfigBuilder httpApiConfigBuilder = HttpApiConfig.builder().appKey(this.appKey).appSecret(this.appSecret).regionId(this.regionId).tokenDir(this.tokenDir);
            if (StringUtils.isNotBlank(this.http_host)) {
                httpApiConfigBuilder.endpoint(this.http_host);
            }
            HttpApiConfig apiConfig = httpApiConfigBuilder.build();
            this.apiClient = new MarketStreamingClient(apiConfig);
        }
        LinkedList<QuotesSubsHandler> allHandlers = new LinkedList<QuotesSubsHandler>();
        ApiAuthProvider authProvider = new ApiAuthProvider(this.sessionId, this.appKey, this.apiClient);
        allHandlers.add(new QuotesSubsLoggingHandler());
        if (this.retryPolicy != null) {
            QuotesSubsReconnectHandler reconnectHandler = new QuotesSubsReconnectHandler(this.appKey, authProvider, this.retryPolicy);
            allHandlers.add(reconnectHandler);
        }
        ApiSubscriptionManager subscriptionManager = CollectionUtils.isNotEmpty(this.symbols) || StringUtils.isNotBlank(this.category) || CollectionUtils.isNotEmpty(this.subTypes) ? new ApiSubscriptionManager(this.apiClient, this.retryPolicy, this.symbols, this.category, this.subTypes, this.depth, this.overnightRequired) : new ApiSubscriptionManager(this.apiClient, this.retryPolicy);
        allHandlers.add(subscriptionManager);
        allHandlers.add(new PublishToMarketDataDecoder());
        allHandlers.addAll(this.onMessages);
        allHandlers.addAll(this.handlers);
        return new DataStreamingClient(this.mqtt_host, this.mqtt_port, this.enableTls, this.retryPolicy, authProvider, subscriptionManager, allHandlers, this.connectTimeoutMillis, this.readTimeoutMillis, this.proxyConfig);
    }
}

