/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.data.internal.mqtt.lifecycle;

import com.webull.openapi.core.logger.Logger;
import com.webull.openapi.core.logger.LoggerFactory;
import com.webull.openapi.core.retry.RetryPolicy;
import com.webull.openapi.core.utils.Assert;
import com.webull.openapi.core.utils.CollectionUtils;
import com.webull.openapi.core.utils.GUID;
import com.webull.openapi.core.utils.StringUtils;
import com.webull.openapi.data.internal.mqtt.lifecycle.QuotesApiFailedContext;
import com.webull.openapi.data.internal.mqtt.support.SchedulerConfig;
import com.webull.openapi.data.quotes.api.subsribe.IMarketStreamingClient;
import com.webull.openapi.data.quotes.subsribe.lifecycle.ClientStateMachine;
import com.webull.openapi.data.quotes.subsribe.lifecycle.QuotesSubsConnectedContext;
import com.webull.openapi.data.quotes.subsribe.lifecycle.QuotesSubsFailedContext;
import com.webull.openapi.data.quotes.subsribe.lifecycle.QuotesSubsSessionHandler;
import com.webull.openapi.data.quotes.subsribe.lifecycle.SubscriptionManager;
import io.reactivex.disposables.Disposable;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Keeps a local registry of the quote subscriptions issued through the
 * streaming API client and, once enabled via
 * {@link #subscribeOnConnected(Consumer)}, replays that registry on a scheduler
 * every time {@link #onConnected(QuotesSubsConnectedContext)} is invoked.
 *
 * <p>Subscriptions are grouped by (category, sub types, depth, overnight flag);
 * each group maps to the set of symbols subscribed with those options.
 */
public class ApiSubscriptionManager implements SubscriptionManager, QuotesSubsSessionHandler {
    private static final Logger logger = LoggerFactory.getLogger(ApiSubscriptionManager.class);

    /** Handle of the currently scheduled (re)subscribe run, if any. */
    private volatile Disposable subscribeDisposable;
    /** The only task allowed to run; a task that is no longer referenced here exits immediately. */
    private final AtomicReference<SubscribeTask> taskRef = new AtomicReference<>();
    private final IMarketStreamingClient apiClient;
    private final RetryPolicy retryPolicy;
    /** Registry of tracked subscriptions; values are concurrent sets owned by this class. */
    private final Map<SubsGroupKey, Set<String>> subscriptionGroup = new ConcurrentHashMap<>();
    private volatile boolean subscribeOnConnected = false;
    // Written before the volatile write of subscribeOnConnected, which is read before any task is scheduled.
    private Consumer<Throwable> subsOnConnectedFailedFallback;

    /**
     * Creates a manager with an empty subscription registry.
     *
     * @param apiClient   client used to send subscribe/unsubscribe requests
     * @param retryPolicy policy consulted when a replay on connect fails
     */
    public ApiSubscriptionManager(IMarketStreamingClient apiClient, RetryPolicy retryPolicy) {
        this.apiClient = apiClient;
        this.retryPolicy = retryPolicy;
    }

    /**
     * Creates a manager whose registry is pre-populated with one subscription group.
     * No request is sent here; the group is only replayed by the on-connect task.
     *
     * @param apiClient         client used to send subscribe/unsubscribe requests
     * @param retryPolicy       policy consulted when a replay on connect fails
     * @param symbols           symbols of the initial group; must not be empty
     * @param category          category of the initial group; must not be blank
     * @param subTypes          sub types of the initial group; must not be empty
     * @param depth             depth option of the initial group, may be {@code null}
     * @param overnightRequired overnight option of the initial group, may be {@code null}
     */
    public ApiSubscriptionManager(IMarketStreamingClient apiClient, RetryPolicy retryPolicy, Set<String> symbols, String category, Set<String> subTypes, String depth, Boolean overnightRequired) {
        Assert.notEmpty("symbols", symbols);
        Assert.notBlank("category", category);
        Assert.notEmpty("sub_types", subTypes);
        this.apiClient = apiClient;
        this.retryPolicy = retryPolicy;
        // Store a private copy so the registry never aliases (or mutates) the caller's set.
        this.subscriptionGroup.put(SubsGroupKey.of(category, subTypes, depth, overnightRequired), copySymbols(symbols));
    }

    /**
     * Enables replaying the tracked subscriptions whenever a connection is reported.
     *
     * @param subscribeFailedFallback callback invoked with the failure when a replay
     *                                attempt throws; may be {@code null} for no callback
     */
    @Override
    public void subscribeOnConnected(Consumer<Throwable> subscribeFailedFallback) {
        this.subsOnConnectedFailedFallback = subscribeFailedFallback;
        this.subscribeOnConnected = true;
    }

    /**
     * Sends a subscribe request and, if it does not throw, records the symbols in
     * the local registry under the given option group.
     */
    @Override
    public void addSubscription(String sessionId, Set<String> symbols, String category, Set<String> subTypes, String depth, Boolean overnightRequired) {
        this.apiClient.subscribe(sessionId, symbols, category, subTypes, depth, overnightRequired);
        Set<String> added = copySymbols(symbols);
        if (added.isEmpty()) {
            // Nothing to track; avoids registering an empty group that would be replayed later.
            return;
        }
        this.subscriptionGroup.merge(SubsGroupKey.of(category, subTypes, depth, overnightRequired), added, (existed, incoming) -> {
            existed.addAll(incoming);
            return existed;
        });
    }

    /**
     * Sends an unsubscribe request and, if it does not throw, removes the symbols
     * from the local registry.
     *
     * <p>The unsubscribe call carries no depth/overnight options, so the symbols are
     * removed from every tracked group with the same category and sub types,
     * whatever depth/overnight options that group was subscribed with. Groups whose
     * sub types differ from {@code subTypes} are left untouched.
     */
    @Override
    public void removeSubscription(String sessionId, Set<String> symbols, String category, Set<String> subTypes, Boolean unsubscribeAll) {
        this.apiClient.unsubscribe(sessionId, symbols, category, subTypes, unsubscribeAll);
        if (Boolean.TRUE.equals(unsubscribeAll)) {
            // NOTE(review): assumes unsubscribeAll cancels every subscription of the session — confirm against the API contract.
            this.subscriptionGroup.clear();
            return;
        }
        if (symbols == null || symbols.isEmpty()) {
            return;
        }
        // ConcurrentHashMap iterators are weakly consistent, so removing entries while iterating is safe.
        for (SubsGroupKey key : this.subscriptionGroup.keySet()) {
            if (!key.matches(category, subTypes)) {
                continue;
            }
            this.subscriptionGroup.computeIfPresent(key, (ignoredKey, tracked) -> {
                for (String symbol : symbols) {
                    if (symbol != null) {
                        tracked.remove(symbol);
                    }
                }
                // Returning null drops the group once its last symbol is gone.
                return tracked.isEmpty() ? null : tracked;
            });
        }
    }

    /**
     * Schedules a fresh replay task for the tracked subscriptions, cancelling any
     * previously scheduled one. Does nothing unless
     * {@link #subscribeOnConnected(Consumer)} was called, or when the session id is blank.
     */
    @Override
    public void onConnected(QuotesSubsConnectedContext context) {
        if (!this.subscribeOnConnected) {
            return;
        }
        String sessionId = context.getAuthProvider().getSessionId();
        if (StringUtils.isBlank(sessionId)) {
            logger.error("Cannot add quotes subscription before session initialized.");
            return;
        }
        // Read the volatile field once so a concurrent reset to null cannot cause an NPE.
        Disposable previous = this.subscribeDisposable;
        if (previous != null) {
            previous.dispose();
        }
        SubscribeTask newTask = new SubscribeTask(sessionId, context.getState());
        this.taskRef.set(newTask);
        if (logger.isDebugEnabled()) {
            logger.debug("Start add quotes subscription task[{}] on mqtt connected.", (Object)newTask.taskId);
        }
        this.subscribeDisposable = SchedulerConfig.api().scheduleDirect(newTask);
    }

    /**
     * Cancels the scheduled replay task, if any, and clears the current-task
     * reference so an in-flight or delayed run exits without subscribing.
     *
     * @param context unused; {@link #close()} passes {@code null}
     */
    @Override
    public void onDisconnected(QuotesSubsFailedContext context) {
        if (!this.subscribeOnConnected) {
            return;
        }
        Disposable scheduled = this.subscribeDisposable;
        if (scheduled != null) {
            scheduled.dispose();
            this.subscribeDisposable = null;
        }
        SubscribeTask task = this.taskRef.getAndSet(null);
        if (logger.isDebugEnabled() && task != null) {
            logger.debug("Stop add quotes subscription task[{}] on mqtt disconnected.", (Object)task.taskId);
        }
    }

    /** Stops any pending replay task; equivalent to a disconnect notification. */
    @Override
    public void close() {
        this.onDisconnected(null);
    }

    /**
     * Copies the given symbols into a new concurrent set owned by the registry.
     * {@code null} input and {@code null} elements are skipped, because the
     * concurrent set does not accept {@code null}.
     */
    private static Set<String> copySymbols(Set<String> symbols) {
        Set<String> copy = ConcurrentHashMap.newKeySet();
        if (symbols != null) {
            for (String symbol : symbols) {
                if (symbol != null) {
                    copy.add(symbol);
                }
            }
        }
        return copy;
    }

    /** Immutable map key identifying one group of subscription options. */
    private static class SubsGroupKey {
        private final String category;
        private final Set<String> subTypes;
        private final String depth;
        private final Boolean overnightRequired;

        private SubsGroupKey(String category, Set<String> subTypes, String depth, Boolean overnightRequired) {
            this.category = category;
            // Snapshot the sub types so later changes to the caller's set cannot alter this key's hash.
            this.subTypes = subTypes == null ? null : Collections.unmodifiableSet(new LinkedHashSet<>(subTypes));
            this.depth = depth;
            this.overnightRequired = overnightRequired;
        }

        static SubsGroupKey of(String category, Set<String> subTypes, String depth, Boolean overnightRequired) {
            return new SubsGroupKey(category, subTypes, depth, overnightRequired);
        }

        String getCategory() {
            return this.category;
        }

        Set<String> getSubTypes() {
            return this.subTypes;
        }

        String getDepth() {
            return this.depth;
        }

        Boolean getOvernightRequired() {
            return this.overnightRequired;
        }

        /** Returns whether this group has the given category and sub types, ignoring depth and overnight flag. */
        boolean matches(String category, Set<String> subTypes) {
            return Objects.equals(this.category, category) && Objects.equals(this.subTypes, subTypes);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SubsGroupKey other = (SubsGroupKey)o;
            return Objects.equals(this.category, other.category)
                && Objects.equals(this.subTypes, other.subTypes)
                && Objects.equals(this.depth, other.depth)
                && Objects.equals(this.overnightRequired, other.overnightRequired);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.category, this.subTypes, this.depth, this.overnightRequired);
        }
    }

    /**
     * Replays every tracked subscription group for one session. On failure it
     * consults the retry policy, optionally reschedules itself, and notifies the
     * configured fallback.
     */
    private final class SubscribeTask implements Runnable {
        private final String taskId;
        private final String sessionId;
        private final ClientStateMachine state;
        /** Number of failed runs so far; passed to the retry policy through the failure context. */
        private int attempts = 0;

        private SubscribeTask(String sessionId, ClientStateMachine state) {
            this.state = state;
            this.taskId = GUID.get();
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            if (!this.isCurrent()) {
                // Superseded by a newer connect or cancelled by a disconnect.
                return;
            }
            try {
                if (CollectionUtils.isNotEmpty(ApiSubscriptionManager.this.subscriptionGroup)) {
                    ApiSubscriptionManager.this.subscriptionGroup.forEach((groupKey, symbols) ->
                        // Hand the client a stable snapshot rather than the live concurrent set.
                        ApiSubscriptionManager.this.apiClient.subscribe(this.sessionId, new LinkedHashSet<>(symbols), groupKey.getCategory(), groupKey.getSubTypes(), groupKey.getDepth(), groupKey.getOvernightRequired()));
                    if (logger.isDebugEnabled()) {
                        logger.debug("Add quotes subscription successful by task[{}].", (Object)this.taskId);
                    }
                }
            }
            catch (Exception e) {
                this.handleFailure(e);
            }
        }

        /** Returns whether this task is still the one registered as current. */
        private boolean isCurrent() {
            return this == ApiSubscriptionManager.this.taskRef.get();
        }

        /** Logs the failure, schedules a retry when the policy allows it, and notifies the fallback. */
        private void handleFailure(Exception e) {
            QuotesApiFailedContext newContext = new QuotesApiFailedContext(this.state, ++this.attempts, e);
            boolean shouldRetry = ApiSubscriptionManager.this.retryPolicy.shouldRetry(newContext);
            logger.error("Add subscription error when subscribing quotes, attempts={}, should retry={}, task[{}].", this.attempts, shouldRetry, this.taskId, e);
            // A superseded task must not overwrite the disposable that belongs to the current task;
            // its retry would exit immediately anyway.
            if (shouldRetry && this.isCurrent()) {
                long delayMillis = ApiSubscriptionManager.this.retryPolicy.nextRetryDelay(newContext, TimeUnit.MILLISECONDS);
                ApiSubscriptionManager.this.subscribeDisposable = SchedulerConfig.api().scheduleDirect(this, delayMillis, TimeUnit.MILLISECONDS);
            }
            // NOTE(review): the fallback is notified on every failed attempt, even when a retry was scheduled — confirm this is intended.
            Consumer<Throwable> fallback = ApiSubscriptionManager.this.subsOnConnectedFailedFallback;
            if (fallback == null) {
                return;
            }
            try {
                fallback.accept(e);
            }
            catch (Exception fallbackErr) {
                logger.error("Startup subscribe failed fallback error", fallbackErr);
            }
        }
    }
}

