/*
 * Decompiled with CFR 0.152.
 */
package io.split.engine.common;

import io.split.cache.SegmentCache;
import io.split.cache.SplitCache;
import io.split.client.ApiKeyCounter;
import io.split.client.SplitClientConfig;
import io.split.engine.SDKReadinessGates;
import io.split.engine.common.Backoff;
import io.split.engine.common.PushManager;
import io.split.engine.common.PushManagerImp;
import io.split.engine.common.SyncManager;
import io.split.engine.common.Synchronizer;
import io.split.engine.common.SynchronizerImp;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.annotations.VisibleForTesting;
import split.com.google.common.base.Preconditions;
import split.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * {@link SyncManager} implementation that runs the initial synchronization on a background
 * thread and then keeps the SDK up to date either through streaming (push) or periodic polling.
 *
 * <p>Two single-threaded daemon executors are used: one for the initial sync / mode selection
 * performed by {@link #start()}, and one for the loop that reacts to {@link PushManager.Status}
 * events taken from the queue supplied at construction time. Both executors are terminated by
 * {@link #shutdown()}, after which the instance cannot be started again.
 */
public class SyncManagerImp implements SyncManager {
    // Logger category intentionally kept as SyncManager.class so existing log configuration still applies.
    private static final Logger _log = LoggerFactory.getLogger(SyncManager.class);

    /** Pause between two failed initial-sync attempts, in milliseconds. */
    private static final long INITIAL_SYNC_RETRY_MILLIS = 1000L;

    private final AtomicBoolean _streamingEnabledConfig;
    private final Synchronizer _synchronizer;
    private final PushManager _pushManager;
    private final AtomicBoolean _shutdown;
    private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
    private final ExecutorService _executorService;
    private final ExecutorService _startExecutorService;
    private final SDKReadinessGates _gates;
    private Future<?> _pushStatusMonitorTask;
    private final Backoff _backoff;
    private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
    private final TelemetrySynchronizer _telemetrySynchronizer;
    private final SplitClientConfig _config;

    /**
     * Creates a manager from already-built collaborators.
     *
     * @param streamingEnabledConfig whether {@link #start()} should select streaming mode
     * @param synchronizer performs full syncs and periodic fetching; must not be null
     * @param pushManager controls the streaming connection and its workers; must not be null
     * @param pushMessages queue from which streaming status events are consumed
     * @param authRetryBackOffBase base value handed to the {@link Backoff} used for streaming retries
     * @param gates readiness gates signalled once the initial sync succeeds; must not be null
     * @param telemetryRuntimeProducer receives streaming telemetry events; must not be null
     * @param telemetrySynchronizer receives the config telemetry after the initial sync; must not be null
     * @param config client configuration reported to telemetry; must not be null
     */
    @VisibleForTesting
    SyncManagerImp(boolean streamingEnabledConfig, Synchronizer synchronizer, PushManager pushManager, LinkedBlockingQueue<PushManager.Status> pushMessages, int authRetryBackOffBase, SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, TelemetrySynchronizer telemetrySynchronizer, SplitClientConfig config) {
        this._streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
        this._synchronizer = Preconditions.checkNotNull(synchronizer);
        this._pushManager = Preconditions.checkNotNull(pushManager);
        this._shutdown = new AtomicBoolean(false);
        this._incomingPushStatus = pushMessages;
        this._executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SPLIT-PushStatusMonitor-%d").setDaemon(true).build());
        this._startExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SPLIT-PollingMode-%d").setDaemon(true).build());
        this._backoff = new Backoff(authRetryBackOffBase);
        this._gates = Preconditions.checkNotNull(gates);
        this._telemetryRuntimeProducer = Preconditions.checkNotNull(telemetryRuntimeProducer);
        this._telemetrySynchronizer = Preconditions.checkNotNull(telemetrySynchronizer);
        this._config = Preconditions.checkNotNull(config);
    }

    /**
     * Wires a {@link SynchronizerImp} and a {@link PushManagerImp} around a shared status queue
     * and returns a manager that uses them.
     *
     * @return a new, not yet started, manager
     */
    public static SyncManagerImp build(boolean streamingEnabledConfig, SplitSynchronizationTask splitSynchronizationTask, SplitFetcher splitFetcher, SegmentSynchronizationTaskImp segmentSynchronizationTaskImp, SplitCache splitCache, String authUrl, CloseableHttpClient httpClient, String streamingServiceUrl, int authRetryBackOffBase, CloseableHttpClient sseHttpClient, SegmentCache segmentCache, int streamingRetryDelay, int maxOnDemandFetchRetries, int failedAttemptsBeforeLogging, boolean cdnDebugLogging, SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, TelemetrySynchronizer telemetrySynchronizer, SplitClientConfig config) {
        LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<PushManager.Status>();
        SynchronizerImp synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay, maxOnDemandFetchRetries, failedAttemptsBeforeLogging, cdnDebugLogging, gates);
        PushManagerImp pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient, telemetryRuntimeProducer);
        return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer, telemetrySynchronizer, config);
    }

    /**
     * Schedules the initial synchronization on the start executor and returns immediately.
     * Does nothing (besides logging) when the manager has already been shut down, because the
     * executors are terminated at that point.
     */
    @Override
    public void start() {
        if (this._shutdown.get()) {
            _log.warn("Sync manager already shut down; ignoring start request");
            return;
        }
        this._startExecutorService.submit(this::initialSyncAndStart);
    }

    /**
     * Stops all synchronization activity: flags the manager as shut down, interrupts the initial
     * sync and the push status monitor, stops periodic fetching and stops the push manager.
     * Calling it more than once is harmless as far as this class is concerned.
     */
    @Override
    public void shutdown() {
        this._shutdown.set(true);
        // Interrupt a still-running initial sync so it cannot start a mode after shutdown.
        this._startExecutorService.shutdownNow();
        this._synchronizer.stopPeriodicFetching();
        this._pushManager.stop();
        // Interrupt the monitor thread, which is otherwise parked forever in take() or a back-off sleep.
        this._executorService.shutdownNow();
    }

    /**
     * Body of the start task: retries {@code syncAll()} until it succeeds, then signals readiness,
     * reports the configuration to telemetry and selects streaming or polling mode. Aborts
     * without side effects when interrupted or when {@link #shutdown()} has been called.
     */
    private void initialSyncAndStart() {
        try {
            while (!this._synchronizer.syncAll()) {
                if (this._shutdown.get()) {
                    return;
                }
                try {
                    Thread.sleep(INITIAL_SYNC_RETRY_MILLIS);
                }
                catch (InterruptedException e) {
                    _log.warn("Sdk Initializer thread interrupted");
                    Thread.currentThread().interrupt();
                    // Leave the loop: with the flag set every further sleep would throw at once
                    // and the loop would spin on syncAll() without any pause.
                    return;
                }
            }
            if (this._shutdown.get()) {
                return;
            }
            this._gates.sdkInternalReady();
            this._telemetrySynchronizer.synchronizeConfig(this._config, System.currentTimeMillis(), ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<String>());
            if (this._streamingEnabledConfig.get()) {
                this.startStreamingMode();
            } else {
                this.startPollingMode();
            }
        }
        catch (RuntimeException e) {
            // The Future returned by submit() is discarded, so log here or the failure is lost.
            _log.error("Unexpected error during SDK initial synchronization", e);
        }
    }

    /** Launches the push status monitor (once), starts the push manager and records the mode in telemetry. */
    private void startStreamingMode() {
        _log.debug("Starting in streaming mode ...");
        if (null == this._pushStatusMonitorTask) {
            this._pushStatusMonitorTask = this._executorService.submit(this::incomingPushStatusHandler);
        }
        this._pushManager.start();
        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.SyncModeUpdateValues.STREAMING_EVENT.getValue(), System.currentTimeMillis()));
    }

    /** Starts periodic fetching and records the mode in telemetry. */
    private void startPollingMode() {
        _log.debug("Starting in polling mode ...");
        this._synchronizer.startPeriodicFetching();
        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.SyncModeUpdateValues.POLLING_EVENT.getValue(), System.currentTimeMillis()));
    }

    /**
     * Blocking loop that consumes streaming status events and switches between streaming and
     * polling accordingly. Returns when the thread is interrupted, when the manager is shut down,
     * or after a {@code STREAMING_OFF} status.
     */
    @VisibleForTesting
    void incomingPushStatusHandler() {
        while (!Thread.interrupted()) {
            try {
                PushManager.Status status = this._incomingPushStatus.take();
                if (this._shutdown.get()) {
                    // A status dequeued after shutdown must not restart fetching or streaming.
                    return;
                }
                _log.debug("Streaming status received: {}", status);
                switch (status) {
                    case STREAMING_READY: {
                        this._synchronizer.stopPeriodicFetching();
                        this._synchronizer.syncAll();
                        this._pushManager.startWorkers();
                        this._pushManager.scheduleConnectionReset();
                        this._backoff.reset();
                        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_ENABLED.getValue(), System.currentTimeMillis()));
                        _log.info("Streaming up and running.");
                        break;
                    }
                    case STREAMING_DOWN: {
                        _log.info("Streaming service temporarily unavailable, working in polling mode.");
                        this._pushManager.stopWorkers();
                        this._synchronizer.startPeriodicFetching();
                        break;
                    }
                    case STREAMING_BACKOFF: {
                        long retrySeconds = this._backoff.interval();
                        _log.info("Retryable error in streaming subsystem. Switching to polling and retrying in {} seconds", retrySeconds);
                        this._synchronizer.startPeriodicFetching();
                        this._pushManager.stopWorkers();
                        this._pushManager.stop();
                        Thread.sleep(retrySeconds * 1000L);
                        if (this._shutdown.get()) {
                            // Shut down while backing off: do not reopen the streaming connection.
                            return;
                        }
                        // Drop statuses queued while disconnected before reconnecting.
                        this._incomingPushStatus.clear();
                        this._pushManager.start();
                        break;
                    }
                    case STREAMING_OFF: {
                        _log.info("Unrecoverable error in streaming subsystem. SDK will work in polling-mode and will not retry an SSE connection.");
                        this._pushManager.stop();
                        this._synchronizer.startPeriodicFetching();
                        if (null != this._pushStatusMonitorTask) {
                            this._pushStatusMonitorTask.cancel(false);
                        }
                        return;
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

