/*
 * Decompiled with CFR 0.152.
 */
package io.featureflow.client.core;

import io.featureflow.client.core.ConnectionHandler;
import io.featureflow.client.core.EventSourceHandler;
import io.featureflow.client.core.EventSourceParser;
import io.featureflow.client.core.FailedResponseException;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okio.BufferedSource;
import okio.Okio;
import okio.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server-Sent Events client that keeps a streaming HTTP connection open to a
 * single URI and feeds every received line to an {@link EventSourceParser}.
 *
 * <p>Call {@link #init()} once to start the background connection loop and
 * {@link #close()} to stop it. While running, the loop reconnects after every
 * dropped or failed connection, waiting {@code reconnectTimeMillis} between
 * attempts when that value is positive.
 */
public class EventSource
implements ConnectionHandler,
Closeable {
    public static final Logger log = LoggerFactory.getLogger(EventSource.class);

    /** Lifecycle state; once it reaches {@link State#SHUTDOWN} it never leaves it. */
    private final AtomicReference<State> state = new AtomicReference<State>(State.UNINITIALISED);
    private final URI uri;
    private final ExecutorService executor;
    private final Headers headers;
    private final EventSourceHandler eventSourceHandler;
    private final OkHttpClient client;
    /** Delay between reconnect attempts; values {@code <= 0} mean "reconnect immediately". */
    private volatile long reconnectTimeMillis = 0L;
    /** The most recently created call, kept so that {@link #close()} can cancel it. */
    private volatile Call call;
    /** Sent as the {@code Last-Event-ID} request header on reconnect; may be set from another thread. */
    private volatile String lastEventId;

    /**
     * Creates a client; no connection is made until {@link #init()} is called.
     *
     * @param uri                 stream endpoint
     * @param reconnectTimeMillis delay between reconnect attempts, in milliseconds
     * @param headers             headers sent with every connection request
     * @param eventSourceHandler  receives parsed events and connection errors
     */
    public EventSource(URI uri, long reconnectTimeMillis, Headers headers, EventSourceHandler eventSourceHandler) {
        this.uri = uri;
        this.executor = Executors.newCachedThreadPool();
        this.reconnectTimeMillis = reconnectTimeMillis;
        this.headers = headers;
        this.eventSourceHandler = eventSourceHandler;
        // All timeouts are 0, which OkHttp treats as "no timeout": the stream is
        // expected to stay open indefinitely between events.
        this.client = new OkHttpClient().newBuilder()
                .readTimeout(0L, TimeUnit.SECONDS)
                .writeTimeout(0L, TimeUnit.SECONDS)
                .connectTimeout(0L, TimeUnit.SECONDS)
                .retryOnConnectionFailure(true)
                .build();
    }

    /**
     * Starts the background connection loop. Only the first call on an
     * uninitialised instance has any effect; later calls (including calls made
     * after {@link #close()}) just log and return.
     */
    public void init() {
        if (!this.state.compareAndSet(State.UNINITIALISED, State.CONNECTING)) {
            log.info("Already starting.");
            return;
        }
        log.debug("state change: {} to {}", State.UNINITIALISED, State.CONNECTING);
        log.info("Starting EventSource client using URI: {}", this.uri);
        try {
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    EventSource.this.doConnect();
                }
            });
        }
        catch (RejectedExecutionException rejected) {
            // close() shut the executor down between the state change above and the submit.
            log.debug("EventSource was closed before the connection loop could start.", (Throwable)rejected);
        }
    }

    /**
     * Connection loop run on the executor thread: connect, stream until the
     * connection ends, wait the reconnect delay, repeat. The loop ends when the
     * thread is interrupted or the state becomes {@link State#SHUTDOWN}.
     */
    private void doConnect() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                State previous = this.transitionUnlessShutdown(State.CONNECTING);
                if (previous == State.SHUTDOWN) {
                    break;
                }
                log.debug("state change: {} to {}", previous, State.CONNECTING);
                this.connectAndStream();
                if (!this.awaitReconnectDelay()) {
                    break;
                }
            }
        }
        catch (RejectedExecutionException rejected) {
            // Kept from the original code, which ended the loop on this exception.
            log.debug("Connection loop stopped.", (Throwable)rejected);
        }
    }

    /**
     * Performs one connection attempt and, on a successful response, blocks
     * feeding lines to the parser until the stream ends, fails, or the thread
     * is interrupted. Always leaves the response body closed and the call
     * cancelled on return.
     */
    private void connectAndStream() {
        // Scoped per attempt so a failed attempt never touches an older response.
        Response response = null;
        try {
            Request.Builder requestBuilder = new Request.Builder().headers(this.headers).url(this.uri.toASCIIString()).get();
            String eventId = this.lastEventId;
            if (eventId != null && !eventId.isEmpty()) {
                requestBuilder.addHeader("Last-Event-ID", eventId);
            }
            Call currentCall = this.client.newCall(requestBuilder.build());
            // Publish the call BEFORE re-checking the state: close() sets SHUTDOWN and
            // then reads this field, so either we see SHUTDOWN here or close() sees
            // (and cancels) this call.
            this.call = currentCall;
            if (this.state.get() == State.SHUTDOWN) {
                return;
            }
            response = currentCall.execute();
            if (response.isSuccessful()) {
                State previous = this.transitionUnlessShutdown(State.OPEN);
                if (previous == State.SHUTDOWN) {
                    return;
                }
                if (previous != State.CONNECTING) {
                    log.warn("Unexpected state change: {} to {}", previous, State.OPEN);
                } else {
                    log.debug("state change: {} to {}", previous, State.OPEN);
                }
                log.info("Connected to Feature Control SSE Stream");
                BufferedSource lines = Okio.buffer(response.body().source());
                EventSourceParser parser = new EventSourceParser(this.uri, this.eventSourceHandler, this);
                // readUtf8LineStrict() signals end-of-stream by throwing EOFException,
                // which is what terminates this loop on a server-side close.
                while (!Thread.currentThread().isInterrupted()) {
                    parser.line(lines.readUtf8LineStrict());
                }
            } else {
                log.debug("Failed Response: {}", response);
                this.eventSourceHandler.onError(new FailedResponseException(response.message(), response.code(), response.request().url().toString()));
            }
        }
        catch (EOFException eof) {
            log.warn("Connection unexpectedly closed due to {}.", (Object)eof.getMessage());
        }
        catch (IOException ioe) {
            if (this.state.get() == State.SHUTDOWN) {
                // Expected: close() cancelled the call. Not an error for the handler.
                log.debug("Connection ended by shutdown.", (Throwable)ioe);
            } else {
                log.debug("Connection problem.", (Throwable)ioe);
                this.eventSourceHandler.onError(ioe);
            }
        }
        finally {
            State previous = this.transitionUnlessShutdown(State.CLOSED);
            if (previous != State.SHUTDOWN) {
                log.debug("state change: {} to {}", previous, State.CLOSED);
            }
            if (response != null && response.body() != null) {
                response.body().close();
            }
            Call finishedCall = this.call;
            if (finishedCall != null) {
                finishedCall.cancel();
            }
        }
    }

    /**
     * Waits the configured reconnect delay, if any.
     *
     * @return {@code true} if the loop should attempt another connection,
     *         {@code false} if the client was shut down or the thread was
     *         interrupted while waiting
     */
    private boolean awaitReconnectDelay() {
        // Read once so the check and the sleep use the same value.
        long delayMillis = this.reconnectTimeMillis;
        if (delayMillis <= 0L) {
            return true;
        }
        if (this.state.get() == State.SHUTDOWN) {
            return false;
        }
        log.info("Waiting to reconnect..{}", delayMillis);
        try {
            Thread.sleep(delayMillis);
            return true;
        }
        catch (InterruptedException interrupted) {
            // Restore the flag so the caller's loop condition observes the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Moves to {@code next} unless the current state is {@link State#SHUTDOWN}.
     *
     * @return the state observed before the transition; {@link State#SHUTDOWN}
     *         means no transition was made
     */
    private State transitionUnlessShutdown(State next) {
        State current;
        do {
            current = this.state.get();
            if (current == State.SHUTDOWN) {
                return current;
            }
        } while (!this.state.compareAndSet(current, next));
        return current;
    }

    @Override
    public void setReconnectionTimeMillis(long reconnectionTimeMillis) {
        this.reconnectTimeMillis = reconnectionTimeMillis;
    }

    @Override
    public void setLastEventId(String lastEventId) {
        this.lastEventId = lastEventId;
    }

    /**
     * Stops the client: marks it {@link State#SHUTDOWN}, cancels the in-flight
     * call (if any) and shuts the executor down, interrupting the connection
     * loop. Calling it more than once is harmless.
     */
    @Override
    public void close() throws IOException {
        State previous = this.state.getAndSet(State.SHUTDOWN);
        if (previous == State.SHUTDOWN) {
            return;
        }
        log.debug("state change: {} to {}", previous, State.SHUTDOWN);
        Call activeCall = this.call;
        if (activeCall != null) {
            activeCall.cancel();
        }
        this.executor.shutdownNow();
    }

    /** Lifecycle states of the client. */
    static enum State {
        UNINITIALISED,
        CONNECTING,
        OPEN,
        CLOSED,
        SHUTDOWN;

    }
}

