/*
 * Decompiled with CFR 0.152.
 */
package io.split.engine.common;

import io.split.client.ApiKeyCounter;
import io.split.client.SplitClientConfig;
import io.split.client.events.EventsTask;
import io.split.client.impressions.ImpressionsManager;
import io.split.client.impressions.UniqueKeysTracker;
import io.split.engine.SDKReadinessGates;
import io.split.engine.common.Backoff;
import io.split.engine.common.PushManager;
import io.split.engine.common.PushManagerImp;
import io.split.engine.common.SplitAPI;
import io.split.engine.common.SplitTasks;
import io.split.engine.common.SyncManager;
import io.split.engine.common.Synchronizer;
import io.split.engine.common.SynchronizerImp;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.annotations.VisibleForTesting;
import split.com.google.common.base.Preconditions;
import split.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * {@link SyncManager} implementation that performs the initial synchronization, starts the
 * background recorder tasks and then runs the SDK in either streaming or polling mode.
 *
 * <p>{@link #start()} submits its work to a dedicated single-thread executor. When streaming is
 * enabled, push status notifications are taken from a queue and handled on a second
 * single-thread executor (see {@link #incomingPushStatusHandler()}). Both executors use daemon
 * threads.
 */
public class SyncManagerImp implements SyncManager {
    private static final Logger _log = LoggerFactory.getLogger(SyncManager.class);

    /** Upper bound handed to the backoff used while retrying the initial {@code syncAll()} call. */
    private static final long STARTING_SYNC_ALL_BACKOFF_MAX_WAIT_MS = 10000L;

    private final AtomicBoolean _streamingEnabledConfig;
    private final Synchronizer _synchronizer;
    private final PushManager _pushManager;
    private final AtomicBoolean _shuttedDown;
    private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
    private final ExecutorService _pushMonitorExecutorService;
    private final ExecutorService _initializationtExecutorService;
    private final SDKReadinessGates _gates;
    private Future<?> _pushStatusMonitorTask;
    /**
     * Backoff for streaming reconnection attempts, built from {@code config.authRetryBackoffBase()}.
     * The push status handler multiplies its interval by 1000 before sleeping, i.e. it treats the
     * interval as seconds. It is intentionally never replaced after construction.
     */
    private final Backoff _backoff;
    private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
    private final TelemetrySynchronizer _telemetrySynchronizer;
    private final SplitClientConfig _config;
    private final long _startingSyncCallBackoffBaseMs;
    private final ImpressionsManager _impressionManager;
    private final EventsTask _eventsTask;
    private final TelemetrySyncTask _telemetrySyncTask;
    private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
    private final SplitSynchronizationTask _splitSynchronizationTask;
    /** May be {@code null}; every use is guarded. */
    private final UniqueKeysTracker _uniqueKeysTracker;
    private final SplitAPI _splitAPI;

    /**
     * Creates the manager from already-built collaborators.
     *
     * <p>All arguments are validated before any of them is used and before the executors are
     * created, so a {@code null} argument fails fast with the usual {@code checkNotNull} error.
     *
     * @param splitTasks holder of the impressions, events, telemetry, segment and split tasks; each
     *        of those tasks must be non-null
     * @param streamingEnabledConfig whether {@link #start()} should begin in streaming mode
     * @param synchronizer performs the full sync and the periodic fetching; must be non-null
     * @param pushManager controls the streaming connection; must be non-null
     * @param pushMessages queue from which push status notifications are consumed
     * @param gates readiness gates signalled once initialization completes; must be non-null
     * @param telemetryRuntimeProducer sink for streaming telemetry events; must be non-null
     * @param telemetrySynchronizer used to report the configuration on start; must be non-null
     * @param config SDK configuration; must be non-null
     * @param uniqueKeysTracker optional tracker, may be {@code null}
     * @param splitAPI closed last during {@link #shutdown(long, long, long)}
     */
    @VisibleForTesting
    SyncManagerImp(SplitTasks splitTasks, boolean streamingEnabledConfig, Synchronizer synchronizer, PushManager pushManager, LinkedBlockingQueue<PushManager.Status> pushMessages, SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, TelemetrySynchronizer telemetrySynchronizer, SplitClientConfig config, UniqueKeysTracker uniqueKeysTracker, SplitAPI splitAPI) {
        // Validate first: the original dereferenced config before null-checking it.
        this._synchronizer = Preconditions.checkNotNull(synchronizer);
        this._pushManager = Preconditions.checkNotNull(pushManager);
        this._gates = Preconditions.checkNotNull(gates);
        this._telemetryRuntimeProducer = Preconditions.checkNotNull(telemetryRuntimeProducer);
        this._telemetrySynchronizer = Preconditions.checkNotNull(telemetrySynchronizer);
        this._config = Preconditions.checkNotNull(config);
        this._impressionManager = Preconditions.checkNotNull(splitTasks.getImpressionManager());
        this._eventsTask = Preconditions.checkNotNull(splitTasks.getEventsTask());
        this._telemetrySyncTask = Preconditions.checkNotNull(splitTasks.getTelemetrySyncTask());
        this._segmentSynchronizationTaskImp = Preconditions.checkNotNull(splitTasks.getSegmentSynchronizationTask());
        this._splitSynchronizationTask = Preconditions.checkNotNull(splitTasks.getSplitSynchronizationTask());

        this._streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
        this._shuttedDown = new AtomicBoolean(false);
        this._incomingPushStatus = pushMessages;
        this._backoff = new Backoff(this._config.authRetryBackoffBase());
        this._startingSyncCallBackoffBaseMs = this._config.startingSyncCallBackoffBaseMs();
        this._uniqueKeysTracker = uniqueKeysTracker;
        this._splitAPI = splitAPI;

        this._pushMonitorExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SPLIT-PushStatusMonitor-%d").setDaemon(true).build());
        this._initializationtExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SPLIT-Initialization-%d").setDaemon(true).build());
    }

    /**
     * Builds a manager together with its {@link SynchronizerImp}, {@link PushManagerImp} and the
     * status queue that connects the push manager to this class.
     *
     * @return a new, not yet started manager
     */
    public static SyncManagerImp build(SplitTasks splitTasks, SplitFetcher splitFetcher, SplitCacheProducer splitCacheProducer, SplitAPI splitAPI, SegmentCacheProducer segmentCacheProducer, SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, TelemetrySynchronizer telemetrySynchronizer, SplitClientConfig config, UniqueKeysTracker uniqueKeysTracker) {
        LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<PushManager.Status>();
        SynchronizerImp synchronizer = new SynchronizerImp(splitTasks.getSplitSynchronizationTask(), splitFetcher, splitTasks.getSegmentSynchronizationTask(), splitCacheProducer, segmentCacheProducer, config.streamingRetryDelay(), config.streamingFetchMaxRetries(), config.failedAttemptsBeforeLogging(), config.cdnDebugLogging(), gates);
        PushManagerImp pushManager = PushManagerImp.build(synchronizer, config.streamingServiceURL(), config.authServiceURL(), splitAPI, pushMessages, telemetryRuntimeProducer);
        return new SyncManagerImp(splitTasks, config.streamingEnabled(), synchronizer, pushManager, pushMessages, gates, telemetryRuntimeProducer, telemetrySynchronizer, config, uniqueKeysTracker, splitAPI);
    }

    /**
     * Starts initialization asynchronously on the initialization executor.
     *
     * <p>The task retries {@code syncAll()} with a backoff until it succeeds or the thread is
     * interrupted. Unless the manager has been shut down in the meantime, it then signals
     * readiness, starts the impressions manager, the optional unique keys tracker and the events
     * task, enters streaming or polling mode, reports the configuration to telemetry and starts
     * the telemetry task. A failure to start any individual task is logged and does not prevent
     * the remaining steps.
     */
    @Override
    public void start() {
        this._initializationtExecutorService.submit(() -> {
            // Use a backoff local to this task. The original overwrote the shared _backoff field
            // here; that instance is slept on directly (milliseconds), whereas the push status
            // handler multiplies the shared backoff's interval by 1000 (seconds), so reusing it
            // inflated streaming retry delays and discarded the authRetryBackoffBase setting.
            Backoff startingSyncBackoff = new Backoff(this._startingSyncCallBackoffBaseMs, STARTING_SYNC_ALL_BACKOFF_MAX_WAIT_MS);
            while (!this._synchronizer.syncAll()) {
                try {
                    Thread.sleep(startingSyncBackoff.interval());
                } catch (InterruptedException e) {
                    // Restore the flag and stop retrying; shutdown is checked right below.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (this._shuttedDown.get()) {
                return;
            }
            this._gates.sdkInternalReady();
            try {
                this._impressionManager.start();
            } catch (Exception e) {
                _log.error("Error trying to init Impression Manager synchronizer task.", e);
            }
            if (this._uniqueKeysTracker != null) {
                try {
                    this._uniqueKeysTracker.start();
                } catch (Exception e) {
                    _log.error("Error trying to init Unique Keys Tracker synchronizer task.", e);
                }
            }
            try {
                this._eventsTask.start();
            } catch (Exception e) {
                _log.error("Error trying to init Events synchronizer task.", e);
            }
            if (this._streamingEnabledConfig.get()) {
                this.startStreamingMode();
            } else {
                this.startPollingMode();
            }
            this._telemetrySynchronizer.synchronizeConfig(this._config, System.currentTimeMillis(), ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<String>());
            try {
                this._telemetrySyncTask.startScheduledTask();
            } catch (Exception e) {
                _log.error("Error trying to Telemetry synchronizer task.", e);
            }
        });
    }

    /**
     * Stops synchronization and closes the owned tasks. Only the first call has any effect.
     *
     * @param splitCount forwarded to the telemetry task's {@code stopScheduledTask}
     * @param segmentCount forwarded to the telemetry task's {@code stopScheduledTask}
     * @param segmentKeyCount forwarded to the telemetry task's {@code stopScheduledTask}
     * @throws IOException declared by the {@link SyncManager} contract; presumably raised by one
     *         of the components closed here (not verifiable from this file)
     */
    @Override
    public void shutdown(long splitCount, long segmentCount, long segmentKeyCount) throws IOException {
        // Atomic test-and-set so that concurrent callers cannot both run the teardown sequence.
        if (!this._shuttedDown.compareAndSet(false, true)) {
            return;
        }
        this._initializationtExecutorService.shutdownNow();
        this._synchronizer.stopPeriodicFetching();
        this._pushManager.stop();
        this._pushMonitorExecutorService.shutdownNow();
        this._impressionManager.close();
        _log.info("Successful shutdown of impressions manager");
        if (this._uniqueKeysTracker != null) {
            this._uniqueKeysTracker.stop();
            _log.info("Successful stop of UniqueKeysTracker");
        }
        this._eventsTask.close();
        _log.info("Successful shutdown of eventsTask");
        this._segmentSynchronizationTaskImp.close();
        _log.info("Successful shutdown of segment fetchers");
        this._splitSynchronizationTask.close();
        _log.info("Successful shutdown of splits");
        this._telemetrySyncTask.stopScheduledTask(splitCount, segmentCount, segmentKeyCount);
        _log.info("Successful shutdown of telemetry sync task");
        this._splitAPI.close();
    }

    /**
     * Submits the push status monitor (once), starts the push manager and records the
     * sync-mode telemetry event for streaming.
     */
    private void startStreamingMode() {
        _log.debug("Starting in streaming mode ...");
        if (null == this._pushStatusMonitorTask) {
            this._pushStatusMonitorTask = this._pushMonitorExecutorService.submit(this::incomingPushStatusHandler);
        }
        this._pushManager.start();
        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.SyncModeUpdateValues.STREAMING_EVENT.getValue(), System.currentTimeMillis()));
    }

    /** Starts periodic fetching and records the sync-mode telemetry event for polling. */
    private void startPollingMode() {
        _log.debug("Starting in polling mode ...");
        this._synchronizer.startPeriodicFetching();
        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.SyncModeUpdateValues.POLLING_EVENT.getValue(), System.currentTimeMillis()));
    }

    /**
     * Consumes push status notifications until the thread is interrupted or an unrecoverable
     * {@code STREAMING_OFF} status is received.
     *
     * <ul>
     *   <li>{@code STREAMING_READY}: stops polling, runs a full sync, starts the push workers,
     *       schedules a connection reset and resets the reconnection backoff.</li>
     *   <li>{@code STREAMING_DOWN}: stops the push workers and falls back to polling.</li>
     *   <li>{@code STREAMING_BACKOFF}: falls back to polling, stops streaming, sleeps for the
     *       next backoff interval, discards queued statuses and restarts the push manager.</li>
     *   <li>{@code STREAMING_OFF}: stops streaming for good, falls back to polling and exits.</li>
     * </ul>
     */
    @VisibleForTesting
    void incomingPushStatusHandler() {
        while (!Thread.interrupted()) {
            try {
                PushManager.Status status = this._incomingPushStatus.take();
                _log.debug("Streaming status received: {}", status);
                switch (status) {
                    case STREAMING_READY: {
                        this._synchronizer.stopPeriodicFetching();
                        this._synchronizer.syncAll();
                        this._pushManager.startWorkers();
                        this._pushManager.scheduleConnectionReset();
                        this._backoff.reset();
                        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_ENABLED.getValue(), System.currentTimeMillis()));
                        _log.info("Streaming up and running.");
                        break;
                    }
                    case STREAMING_DOWN: {
                        _log.info("Streaming service temporarily unavailable, working in polling mode.");
                        this._pushManager.stopWorkers();
                        this._synchronizer.startPeriodicFetching();
                        break;
                    }
                    case STREAMING_BACKOFF: {
                        // The interval is converted to milliseconds for the sleep below.
                        long howLong = this._backoff.interval() * 1000L;
                        _log.info("Retryable error in streaming subsystem. Switching to polling and retrying in {} seconds", howLong / 1000L);
                        this._synchronizer.startPeriodicFetching();
                        this._pushManager.stopWorkers();
                        this._pushManager.stop();
                        Thread.sleep(howLong);
                        // Drop statuses queued while waiting so the restart begins from a clean queue.
                        this._incomingPushStatus.clear();
                        this._pushManager.start();
                        break;
                    }
                    case STREAMING_OFF: {
                        _log.info("Unrecoverable error in streaming subsystem. SDK will work in polling-mode and will not retry an SSE connection.");
                        this._pushManager.stop();
                        this._synchronizer.startPeriodicFetching();
                        if (null != this._pushStatusMonitorTask) {
                            this._pushStatusMonitorTask.cancel(false);
                        }
                        return;
                    }
                    default: {
                        // Statuses not listed above require no action.
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

