/*
 * Decompiled with CFR 0.152.
 */
package io.split.engine.sse;

import io.split.engine.sse.EventSourceClient;
import io.split.engine.sse.NotificationParser;
import io.split.engine.sse.NotificationParserImp;
import io.split.engine.sse.NotificationProcessor;
import io.split.engine.sse.NotificationProcessorImp;
import io.split.engine.sse.PushStatusTracker;
import io.split.engine.sse.client.RawEvent;
import io.split.engine.sse.client.SSEClient;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.exceptions.EventParsingException;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.Worker;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.annotations.VisibleForTesting;
import split.com.google.common.base.Preconditions;
import split.com.google.common.base.Strings;
import split.org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import split.org.apache.hc.core5.net.URIBuilder;

public class EventSourceClientImp
implements EventSourceClient {
    private static final Logger _log = LoggerFactory.getLogger(EventSourceClient.class);
    private static final String ERROR = "error";
    private static final String MESSAGE = "message";
    private final String _baseStreamingUrl;
    private final NotificationParser _notificationParser;
    private final NotificationProcessor _notificationProcessor;
    private final SSEClient _sseClient;
    private final PushStatusTracker _pushStatusTracker;
    private final AtomicBoolean _firstEvent;

    @VisibleForTesting
    EventSourceClientImp(String baseStreamingUrl, NotificationParser notificationParser, NotificationProcessor notificationProcessor, PushStatusTracker pushStatusTracker, CloseableHttpClient sseHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer, ThreadFactory threadFactory) {
        this._baseStreamingUrl = Preconditions.checkNotNull(baseStreamingUrl);
        this._notificationParser = Preconditions.checkNotNull(notificationParser);
        this._notificationProcessor = Preconditions.checkNotNull(notificationProcessor);
        this._pushStatusTracker = pushStatusTracker;
        this._sseClient = new SSEClient(inboundEvent -> {
            this.onMessage((RawEvent)inboundEvent);
            return null;
        }, status -> {
            this._pushStatusTracker.handleSseStatus((SSEClient.StatusMessage)((Object)status));
            return null;
        }, sseHttpClient, telemetryRuntimeProducer, threadFactory);
        this._firstEvent = new AtomicBoolean();
    }

    public static EventSourceClientImp build(String baseStreamingUrl, SplitsWorker splitsWorker, Worker<SegmentQueueDto> segmentWorker, PushStatusTracker pushStatusTracker, CloseableHttpClient sseHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer, ThreadFactory threadFactory) {
        return new EventSourceClientImp(baseStreamingUrl, new NotificationParserImp(), NotificationProcessorImp.build(splitsWorker, segmentWorker, pushStatusTracker), pushStatusTracker, sseHttpClient, telemetryRuntimeProducer, threadFactory);
    }

    @Override
    public boolean start(String channelList, String token) {
        if (this._sseClient.isOpen()) {
            this._sseClient.close();
        }
        try {
            this._firstEvent.set(false);
            return this._sseClient.open(this.buildUri(channelList, token));
        }
        catch (URISyntaxException e) {
            _log.error("Error building Streaming URI: " + e.getMessage());
            return false;
        }
    }

    @Override
    public void stop() {
        _log.info("Stopping EventSourceClientImp");
        if (!this._sseClient.isOpen()) {
            _log.info("Event Source Client is closed.");
            return;
        }
        this._sseClient.close();
    }

    private URI buildUri(String channelList, String token) throws URISyntaxException {
        return new URIBuilder(this._baseStreamingUrl).addParameter("channels", channelList).addParameter("v", "1.1").addParameter("accessToken", token).build();
    }

    private void onMessage(RawEvent event) {
        try {
            String type = event.event();
            String payload = event.data();
            if (this._firstEvent.compareAndSet(false, true) && !ERROR.equals(type)) {
                this._pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.FIRST_EVENT);
            }
            if (!Strings.isNullOrEmpty(payload)) {
                _log.debug(String.format("Payload received: %s", payload));
                switch (type) {
                    case "message": {
                        this._notificationProcessor.process(this._notificationParser.parseMessage(payload));
                        break;
                    }
                    case "error": {
                        this._pushStatusTracker.handleIncomingAblyError(this._notificationParser.parseError(payload));
                        break;
                    }
                    default: {
                        throw new EventParsingException("Wrong notification type.", payload);
                    }
                }
            }
        }
        catch (EventParsingException ex) {
            _log.debug(String.format("Error parsing the event: %s. Payload: %s", ex.getMessage(), ex.getPayload()));
        }
        catch (Exception e) {
            _log.debug(String.format("Error parsing the event id: %s. OnMessage: %s", event.id(), e.getMessage()), (Throwable)e);
        }
    }
}

