/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.quotes.internal.mqtt.lifecycle;

import com.webull.openapi.logger.Logger;
import com.webull.openapi.logger.LoggerFactory;
import com.webull.openapi.quotes.api.QuotesApiClient;
import com.webull.openapi.quotes.internal.mqtt.lifecycle.QuotesApiFailedContext;
import com.webull.openapi.quotes.internal.mqtt.support.SchedulerConfig;
import com.webull.openapi.quotes.subsribe.lifecycle.ClientStateMachine;
import com.webull.openapi.quotes.subsribe.lifecycle.QuotesSubsConnectedContext;
import com.webull.openapi.quotes.subsribe.lifecycle.QuotesSubsFailedContext;
import com.webull.openapi.quotes.subsribe.lifecycle.QuotesSubsSessionHandler;
import com.webull.openapi.quotes.subsribe.lifecycle.SubscriptionManager;
import com.webull.openapi.retry.RetryContext;
import com.webull.openapi.retry.RetryPolicy;
import com.webull.openapi.utils.Assert;
import com.webull.openapi.utils.CollectionUtils;
import com.webull.openapi.utils.GUID;
import io.reactivex.disposables.Disposable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ApiSubscriptionManager
implements SubscriptionManager,
QuotesSubsSessionHandler {
    private static final Logger logger = LoggerFactory.getLogger(ApiSubscriptionManager.class);
    private volatile Disposable subscribeDisposable;
    private final AtomicReference<SubscribeTask> taskRef = new AtomicReference();
    private final QuotesApiClient apiClient;
    private final RetryPolicy retryPolicy;
    private final Map<SubsGroupKey, Set<String>> subscriptionGroup = new ConcurrentHashMap<SubsGroupKey, Set<String>>();
    private volatile boolean subscribeOnConnected = false;
    private Consumer<Throwable> subsOnConnectedFailedFallback;

    public ApiSubscriptionManager(QuotesApiClient apiClient, RetryPolicy retryPolicy) {
        this.apiClient = apiClient;
        this.retryPolicy = retryPolicy;
    }

    public ApiSubscriptionManager(QuotesApiClient apiClient, RetryPolicy retryPolicy, Set<String> symbols, String category, Set<String> subTypes) {
        Assert.notEmpty((String)"symbols", symbols);
        Assert.notBlank((String)"category", (String)category);
        Assert.notEmpty((String)"subTypes", subTypes);
        this.apiClient = apiClient;
        this.retryPolicy = retryPolicy;
        this.subscriptionGroup.put(SubsGroupKey.of(category, subTypes), symbols);
    }

    @Override
    public void subscribeOnConnected(Consumer<Throwable> subscribeFailedFallback) {
        this.subsOnConnectedFailedFallback = subscribeFailedFallback;
        this.subscribeOnConnected = true;
    }

    @Override
    public void addSubscription(String token, Set<String> symbols, String category, Set<String> subTypes) {
        this.apiClient.subscribe(token, symbols, category, subTypes);
        this.subscriptionGroup.merge(SubsGroupKey.of(category, subTypes), symbols, (existed, newValue) -> {
            existed.addAll(newValue);
            return existed;
        });
    }

    @Override
    public void removeSubscription(String token, Set<String> symbols, String category, Set<String> subTypes, Boolean unsubscribeAll) {
        this.apiClient.unsubscribe(token, symbols, category, subTypes, unsubscribeAll);
        this.subscriptionGroup.computeIfPresent(SubsGroupKey.of(category, subTypes), (key, existed) -> {
            existed.removeAll(symbols);
            return existed.isEmpty() ? null : existed;
        });
    }

    @Override
    public void onConnected(QuotesSubsConnectedContext context) {
        if (!this.subscribeOnConnected) {
            return;
        }
        Optional<String> tokenOpt = context.getAuthProvider().getToken();
        if (!tokenOpt.isPresent()) {
            logger.error("Cannot add quotes subscription before session initialized.");
            return;
        }
        if (this.subscribeDisposable != null) {
            this.subscribeDisposable.dispose();
        }
        SubscribeTask newTask = new SubscribeTask(tokenOpt.get(), context.getState());
        this.taskRef.set(newTask);
        if (logger.isDebugEnabled()) {
            logger.debug("Start add quotes subscription task[{}] on mqtt connected.", (Object)newTask.taskId);
        }
        this.subscribeDisposable = SchedulerConfig.api().scheduleDirect((Runnable)newTask);
    }

    @Override
    public void onDisconnected(QuotesSubsFailedContext context) {
        if (!this.subscribeOnConnected) {
            return;
        }
        if (this.subscribeDisposable != null) {
            this.subscribeDisposable.dispose();
            this.subscribeDisposable = null;
        }
        SubscribeTask task = this.taskRef.getAndSet(null);
        if (logger.isDebugEnabled() && task != null) {
            logger.debug("Stop add quotes subscription task[{}] on mqtt disconnected.", (Object)task.taskId);
        }
    }

    @Override
    public void close() {
        this.onDisconnected(null);
        this.apiClient.close();
    }

    private static class SubsGroupKey {
        private final String category;
        private final Set<String> subTypes;

        private SubsGroupKey(String category, Set<String> subTypes) {
            this.category = category;
            this.subTypes = subTypes;
        }

        static SubsGroupKey of(String category, Set<String> subTypes) {
            return new SubsGroupKey(category, subTypes);
        }

        String getCategory() {
            return this.category;
        }

        Set<String> getSubTypes() {
            return this.subTypes;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SubsGroupKey groupKey = (SubsGroupKey)o;
            return Objects.equals(this.category, groupKey.category) && Objects.equals(this.subTypes, groupKey.subTypes);
        }

        public int hashCode() {
            return Objects.hash(this.category, this.subTypes);
        }
    }

    private final class SubscribeTask
    implements Runnable {
        private final String taskId;
        private final String token;
        private final ClientStateMachine state;
        private int attempts = 0;

        private SubscribeTask(String token, ClientStateMachine state) {
            this.state = state;
            this.taskId = GUID.get();
            this.token = token;
        }

        @Override
        public void run() {
            if (this != ApiSubscriptionManager.this.taskRef.get()) {
                return;
            }
            try {
                if (CollectionUtils.isNotEmpty((Map)ApiSubscriptionManager.this.subscriptionGroup)) {
                    ApiSubscriptionManager.this.subscriptionGroup.forEach((groupKey, symbols) -> ApiSubscriptionManager.this.apiClient.subscribe(this.token, (Set<String>)symbols, groupKey.getCategory(), groupKey.getSubTypes()));
                    if (logger.isDebugEnabled()) {
                        logger.debug("Add quotes subscription successful by task[{}].", (Object)this.taskId);
                    }
                }
            }
            catch (Exception e) {
                QuotesApiFailedContext newContext = new QuotesApiFailedContext(this.state, ++this.attempts, e);
                boolean shouldRetry = ApiSubscriptionManager.this.retryPolicy.shouldRetry((RetryContext)newContext);
                logger.error("Add subscription error when subscribing quotes, attempts={}, should retry={}, task[{}].", new Object[]{this.attempts, shouldRetry, this.taskId, e});
                if (shouldRetry) {
                    long delayMillis = ApiSubscriptionManager.this.retryPolicy.nextRetryDelay((RetryContext)newContext, TimeUnit.MILLISECONDS);
                    ApiSubscriptionManager.this.subscribeDisposable = SchedulerConfig.api().scheduleDirect((Runnable)this, delayMillis, TimeUnit.MILLISECONDS);
                }
                try {
                    ApiSubscriptionManager.this.subsOnConnectedFailedFallback.accept(e);
                }
                catch (Exception fallbackErr) {
                    logger.error("Startup subscribe failed fallback error", (Throwable)fallbackErr);
                }
            }
        }
    }
}

