/*
 * Decompiled with CFR 0.152.
 */
package io.split.engine.sse.client;

import io.split.client.RequestDecorator;
import io.split.client.utils.SplitExecutorFactory;
import io.split.engine.sse.client.RawEvent;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.base.Preconditions;
import split.com.google.common.base.Strings;
import split.org.apache.hc.client5.http.classic.methods.HttpGet;
import split.org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import split.org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;

public class SSEClient {
    private static final String SOCKET_CLOSED_MESSAGE = "Socket closed";
    private static final String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
    private static final long CONNECT_TIMEOUT = 30000L;
    private static final Logger _log = LoggerFactory.getLogger(SSEClient.class);
    private final ExecutorService _connectionExecutor;
    private final CloseableHttpClient _client;
    private final Function<RawEvent, Void> _eventCallback;
    private final Function<StatusMessage, Void> _statusCallback;
    private final AtomicReference<ConnectionState> _state = new AtomicReference<ConnectionState>(ConnectionState.CLOSED);
    private final AtomicReference<CloseableHttpResponse> _ongoingResponse = new AtomicReference();
    private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference();
    private AtomicBoolean _forcedStop;
    private final RequestDecorator _requestDecorator;
    private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

    public SSEClient(Function<RawEvent, Void> eventCallback, Function<StatusMessage, Void> statusCallback, CloseableHttpClient client, TelemetryRuntimeProducer telemetryRuntimeProducer, ThreadFactory threadFactory, RequestDecorator requestDecorator) {
        this._eventCallback = eventCallback;
        this._statusCallback = statusCallback;
        this._client = client;
        this._forcedStop = new AtomicBoolean();
        this._telemetryRuntimeProducer = Preconditions.checkNotNull(telemetryRuntimeProducer);
        this._connectionExecutor = SplitExecutorFactory.buildExecutorService(threadFactory, "SPLIT-SSEConnection-%d");
        this._requestDecorator = requestDecorator;
    }

    public synchronized boolean open(URI uri) {
        if (this.isOpen()) {
            _log.info("SSEClient already open.");
            return false;
        }
        this._statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
        CountDownLatch signal = new CountDownLatch(1);
        this._connectionExecutor.submit(() -> this.connectAndLoop(uri, signal));
        try {
            if (!signal.await(30000L, TimeUnit.SECONDS)) {
                return false;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (e.getMessage() == null) {
                _log.info("The thread was interrupted while opening SSEClient");
                return false;
            }
            _log.info(e.getMessage());
            return false;
        }
        return this.isOpen();
    }

    public boolean isOpen() {
        return ConnectionState.OPEN.equals((Object)this._state.get());
    }

    public synchronized void close() {
        this._forcedStop.set(true);
        if (this._state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED) && this._ongoingResponse.get() != null) {
            try {
                this._ongoingRequest.get().abort();
                this._ongoingResponse.get().close();
            }
            catch (IOException e) {
                _log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void connectAndLoop(URI uri, CountDownLatch signal) {
        Preconditions.checkNotNull(uri);
        Preconditions.checkNotNull(signal);
        try {
            if (!this.establishConnection(uri, signal)) {
                this._statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
                return;
            }
            InputStream stream = this._ongoingResponse.get().getEntity().getContent();
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
            while (this.isOpen() && !Thread.currentThread().isInterrupted()) {
                try {
                    this.handleMessage(SSEClient.readMessageAsString(reader));
                }
                catch (SocketException exc) {
                    _log.debug(exc.getMessage());
                    if (SOCKET_CLOSED_MESSAGE.equals(exc.getMessage())) {
                        this._statusCallback.apply(StatusMessage.FORCED_STOP);
                        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                        try {
                            this._ongoingResponse.get().close();
                        }
                        catch (IOException e) {
                            _log.debug(e.getMessage());
                        }
                        this._state.set(ConnectionState.CLOSED);
                        _log.debug("SSEClient finished.");
                        this._forcedStop.set(false);
                        return;
                    }
                    this._statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
                    this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                    try {
                        this._ongoingResponse.get().close();
                    }
                    catch (IOException e) {
                        _log.debug(e.getMessage());
                    }
                    this._state.set(ConnectionState.CLOSED);
                    _log.debug("SSEClient finished.");
                    this._forcedStop.set(false);
                    return;
                }
                catch (IOException exc) {
                    block29: {
                        if (this._forcedStop.get()) break block29;
                        _log.debug(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
                        this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                        this._statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
                        try {
                            this._ongoingResponse.get().close();
                        }
                        catch (IOException e) {
                            _log.debug(e.getMessage());
                        }
                        this._state.set(ConnectionState.CLOSED);
                        _log.debug("SSEClient finished.");
                        this._forcedStop.set(false);
                        return;
                    }
                    this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                    continue;
                    {
                        catch (Exception e) {
                            this._telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
                            _log.warn(e.getMessage(), (Throwable)e);
                            this._statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
                            return;
                        }
                        catch (Throwable throwable) {
                            throw throwable;
                            return;
                        }
                    }
                }
            }
        }
        finally {
            try {
                this._ongoingResponse.get().close();
            }
            catch (IOException e) {
                _log.debug(e.getMessage());
            }
            this._state.set(ConnectionState.CLOSED);
            _log.debug("SSEClient finished.");
            this._forcedStop.set(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean establishConnection(URI uri, CountDownLatch signal) {
        HttpGet request = new HttpGet(uri);
        request = (HttpGet)this._requestDecorator.decorateHeaders(request);
        this._ongoingRequest.set(request);
        try {
            this._ongoingResponse.set(this._client.execute(this._ongoingRequest.get()));
            if (this._ongoingResponse.get().getCode() != 200) {
                _log.error(String.format("Establishing connection, code error: %s. The url is %s", this._ongoingResponse.get().getCode(), uri.toURL()));
                boolean bl = false;
                return bl;
            }
            this._state.set(ConnectionState.OPEN);
            this._statusCallback.apply(StatusMessage.CONNECTED);
        }
        catch (IOException exc) {
            _log.error(String.format("Error establishConnection to %s", uri), (Throwable)exc);
            boolean bl = false;
            return bl;
        }
        finally {
            signal.countDown();
        }
        return true;
    }

    private static String readMessageAsString(BufferedReader reader) throws IOException {
        StringBuilder lines = new StringBuilder();
        while (true) {
            String line;
            if (null == (line = reader.readLine())) {
                throw new EOFException("connection closed by remote host");
            }
            if (line.isEmpty()) {
                return lines.toString();
            }
            lines.append(line).append("\n");
        }
    }

    private void handleMessage(String message) {
        if (Strings.isNullOrEmpty(message) || KEEP_ALIVE_PAYLOAD.equals(message)) {
            _log.debug("Keep Alive event");
            return;
        }
        RawEvent e = RawEvent.fromString(message);
        this._eventCallback.apply(e);
    }

    private static enum ConnectionState {
        OPEN,
        CLOSED;

    }

    public static enum StatusMessage {
        CONNECTED,
        RETRYABLE_ERROR,
        NONRETRYABLE_ERROR,
        INITIALIZATION_IN_PROGRESS,
        FORCED_STOP,
        FIRST_EVENT;

    }
}

