/*
 * Decompiled with CFR 0.152.
 */
package org.xlightweb;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xlightweb.Event;
import org.xlightweb.EventStreamHandlerAdapter;
import org.xlightweb.GetRequest;
import org.xlightweb.HttpUtils;
import org.xlightweb.IBodyCompleteListener;
import org.xlightweb.IBodyDataHandler;
import org.xlightweb.IBodyDestroyListener;
import org.xlightweb.IEventDataSource;
import org.xlightweb.IEventHandler;
import org.xlightweb.IHttpRequest;
import org.xlightweb.IHttpResponse;
import org.xlightweb.NonBlockingBodyDataSource;
import org.xlightweb.client.XHttpClient;
import org.xsocket.DataConverter;
import org.xsocket.Execution;

/**
 * Client-side consumer of a {@code text/event-stream} resource.
 *
 * <p>On construction a GET request is sent to the given URI. Each event parsed
 * from the response body is appended to an internal queue (readable through
 * {@link #readMessage()}) and the registered {@link IEventHandler} is notified
 * through an {@link EventStreamHandlerAdapter}. When the body completes or is
 * destroyed while this source is still open, a reconnect is attempted after the
 * current reconnection time has elapsed since the last connect attempt.
 *
 * <p>State is touched from the caller's thread, the body data handler and the
 * reconnect task; the event queue is guarded by its own monitor and the body
 * reference by {@code bodyDataSourceGuard}.
 */
public final class EventDataSource
implements IEventDataSource {
    private static final Logger LOG = Logger.getLogger(EventDataSource.class.getName());
    private static final int DEFAULT_MAX_RECONNECT_TRIALS = Integer.parseInt(System.getProperty("org.xlightweb.eventdatasource.maxreconnectrials", "5"));
    private static final int DEFAULT_RECONNECT_TIME_MILLIS = 10000;
    // written by the data handler (retry field) and the setter, read by the reconnect logic
    private volatile int reconnectionTimeMillis = DEFAULT_RECONNECT_TIME_MILLIS;
    private final AtomicInteger reconnectTrials = new AtomicInteger(0);
    private final AtomicBoolean isReconnecting = new AtomicBoolean(false);
    private final AtomicInteger numReconnects = new AtomicInteger();
    // written by whichever thread runs connect(), read when the next reconnect is scheduled
    private volatile long timeLastConnectTrial = 0L;
    public static final int DEFAULT_READ_TIMEOUT_MILLIS = Integer.MAX_VALUE;
    private final XHttpClient httpClient;
    private final String uriString;
    private final Object bodyDataSourceGuard = new Object();
    // replaced under bodyDataSourceGuard on every (re)connect; volatile for the unguarded readers
    private volatile NonBlockingBodyDataSource bodyDataSource;
    private final AtomicBoolean isOpen = new AtomicBoolean(true);
    private volatile boolean isIgnoreCommentMessages = true;
    // guarded by its own monitor, together with inQueueVersion
    private final List<Event> inQueue = new ArrayList<Event>();
    private int inQueueVersion = 0;
    private final AtomicReference<String> lastEventIdRef = new AtomicReference<String>();
    private final String[] headerlines;
    private final EventStreamHandlerAdapter webEventHandlerAdapter;

    /**
     * Opens the event stream, ignoring comment messages and sending no extra header lines.
     *
     * @param httpClient      the client used to issue the request
     * @param uriString       the URI of the event stream
     * @param webEventHandler the handler to notify (wrapped in an {@link EventStreamHandlerAdapter})
     * @throws MalformedURLException declared by the underlying connect
     * @throws IOException if the connect fails, the status is not 2xx or the content type is not
     *                     {@code text/event-stream}
     */
    public EventDataSource(XHttpClient httpClient, String uriString, IEventHandler webEventHandler) throws MalformedURLException, IOException {
        this(httpClient, uriString, true, webEventHandler, new String[0]);
    }

    /**
     * Opens the event stream using the default maximum number of reconnect trials.
     *
     * @param httpClient             the client used to issue the request
     * @param uriString              the URI of the event stream
     * @param isIgnoreCommentMessage {@code true} to drop comment messages instead of queueing them
     * @param webEventHandler        the handler to notify
     * @param headerlines            additional raw header lines added to every (re)connect request
     * @throws MalformedURLException declared by the underlying connect
     * @throws IOException if the connect fails, the status is not 2xx or the content type is not
     *                     {@code text/event-stream}
     */
    public EventDataSource(XHttpClient httpClient, String uriString, boolean isIgnoreCommentMessage, IEventHandler webEventHandler, String ... headerlines) throws MalformedURLException, IOException {
        this(httpClient, uriString, isIgnoreCommentMessage, webEventHandler, DEFAULT_MAX_RECONNECT_TRIALS, headerlines);
    }

    /**
     * Opens the event stream.
     *
     * <p>NOTE(review): {@code maxReconnectTrials} is accepted but not enforced anywhere in this
     * class - failed reconnects are retried without limit. Confirm the intended give-up policy
     * before wiring it in.
     *
     * @param httpClient             the client used to issue the request
     * @param uriString              the URI of the event stream
     * @param isIgnoreCommentMessage {@code true} to drop comment messages instead of queueing them
     * @param webEventHandler        the handler to notify
     * @param maxReconnectTrials     currently unused (see note above)
     * @param headerlines            additional raw header lines; {@code null} is treated as none
     * @throws MalformedURLException declared by the underlying connect
     * @throws IOException if the connect fails, the status is not 2xx or the content type is not
     *                     {@code text/event-stream}
     */
    EventDataSource(XHttpClient httpClient, String uriString, boolean isIgnoreCommentMessage, IEventHandler webEventHandler, int maxReconnectTrials, String ... headerlines) throws MalformedURLException, IOException {
        this.httpClient = httpClient;
        this.uriString = uriString;
        this.isIgnoreCommentMessages = isIgnoreCommentMessage;
        // a null varargs array would otherwise fail on every connect; copy so later caller edits cannot leak in
        this.headerlines = headerlines == null ? new String[0] : (String[])headerlines.clone();
        this.webEventHandlerAdapter = new EventStreamHandlerAdapter(webEventHandler);
        this.connect();
    }

    /**
     * @return {@code true} if comment messages are dropped instead of being queued
     */
    public boolean isIgnoreCommentMessages() {
        return this.isIgnoreCommentMessages;
    }

    /**
     * @param isIgnoreCommentMessages {@code true} to drop comment messages instead of queueing them
     */
    public void setIgnoreCommentMessages(boolean isIgnoreCommentMessages) {
        this.isIgnoreCommentMessages = isIgnoreCommentMessages;
    }

    /**
     * Issues the GET request, validates the response and installs a fresh body handler.
     *
     * @throws ClosedChannelException if this source was closed while the request was in flight
     * @throws IOException if the status is not 2xx or the content type is not {@code text/event-stream}
     */
    private void connect() throws MalformedURLException, IOException {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("open data event stream " + this.uriString);
        }
        GetRequest request = new GetRequest(this.uriString);
        request.setHeader("Accept", "text/event-stream");
        request.setHeader("Cache-Control", "no-cache");
        for (String headerline : this.headerlines) {
            request.addHeaderLine(headerline);
        }
        // lets the server resume after the last event this client has seen
        String lastEventId = this.lastEventIdRef.get();
        if (lastEventId != null) {
            request.setHeader("Last-Event-ID", lastEventId);
        }
        this.timeLastConnectTrial = System.currentTimeMillis();
        IHttpResponse response = this.httpClient.call((IHttpRequest)request);
        int status = response.getStatus();
        if (status < 200 || status > 299) {
            throw new IOException("got " + status + " " + response.getReason());
        }
        // a response without Content-Type must be rejected, not crash with a NullPointerException;
        // Locale.ROOT keeps the comparison independent of the default locale's case rules
        String contentType = response.getContentType();
        if (contentType == null || !contentType.toLowerCase(Locale.ROOT).startsWith("text/event-stream")) {
            throw new IOException("got content type " + contentType + " instead text/event-stream");
        }
        NonBlockingBodyDataSource body = response.getNonBlockingBody();
        synchronized (this.bodyDataSourceGuard) {
            if (!this.isOpen.get()) {
                // closed while the request was in flight: do not leave a live stream behind
                body.destroy();
                throw new ClosedChannelException();
            }
            this.bodyDataSource = body;
            EventHandler eh = new EventHandler();
            body.setDataHandler((IBodyDataHandler)eh);
            body.addDestroyListener((IBodyDestroyListener)eh);
            body.addCompleteListener((IBodyCompleteListener)eh);
        }
        this.webEventHandlerAdapter.onConnect(this);
    }

    /**
     * Marks this source as closed and destroys the current body. The handler's disconnect
     * callback fires only on the call that actually performs the open-to-closed transition.
     */
    public void destroy() {
        boolean wasOpen;
        synchronized (this.bodyDataSourceGuard) {
            wasOpen = this.isOpen.getAndSet(false);
            if (wasOpen) {
                this.bodyDataSource.destroy();
            }
        }
        if (wasOpen) {
            this.webEventHandlerAdapter.onDisconnect(this);
        }
    }

    /**
     * Marks this source as closed and closes the current body. The handler's disconnect
     * callback fires only on the call that actually performs the open-to-closed transition.
     *
     * @throws IOException if closing the body fails (the disconnect callback is then skipped)
     */
    public void close() throws IOException {
        boolean wasOpen;
        synchronized (this.bodyDataSourceGuard) {
            wasOpen = this.isOpen.getAndSet(false);
            if (wasOpen) {
                this.bodyDataSource.close();
            }
        }
        if (wasOpen) {
            this.webEventHandlerAdapter.onDisconnect(this);
        }
    }

    /**
     * Closes this source, suppressing any {@link IOException}; the failure is only logged at FINE.
     */
    public void closeQuitly() {
        try {
            this.close();
        }
        catch (IOException ioe) {
            // best-effort close: the caller asked not to be bothered with the failure
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("error occured by closing " + this.uriString + " " + ioe.toString());
            }
        }
    }

    /**
     * @return the id reported by the current body data source
     */
    public String getId() {
        return this.bodyDataSource.getId();
    }

    /**
     * @return the id of the most recent queued event that carried one, or {@code null} if none did
     */
    public String getLastEventId() {
        return this.lastEventIdRef.get();
    }

    /**
     * @return the current reconnection time in milliseconds, as set through
     *         {@link #setReconnectionTimeMillis(int)} or by the retry value of a received event
     */
    public int getReconnectionTimeMillis() {
        return this.reconnectionTimeMillis;
    }

    /**
     * @return the number of events currently waiting in the queue
     */
    public int availableMessages() {
        synchronized (this.inQueue) {
            return this.inQueue.size();
        }
    }

    /**
     * Reads the next event using {@link #DEFAULT_READ_TIMEOUT_MILLIS} as the timeout.
     *
     * @return the oldest queued event
     * @throws SocketTimeoutException if no event arrived within the timeout
     * @throws IOException if the waiting thread is interrupted
     */
    public Event readMessage() throws IOException, SocketTimeoutException, ClosedChannelException {
        return this.readMessage(DEFAULT_READ_TIMEOUT_MILLIS);
    }

    /**
     * Removes and returns the oldest queued event, waiting up to the given timeout for one to arrive.
     * A timeout of zero or less polls the queue once without blocking.
     *
     * @param readTimeoutMillis the maximum time to wait in milliseconds
     * @return the oldest queued event
     * @throws SocketTimeoutException if no event arrived within the timeout
     * @throws IOException an {@link InterruptedIOException} if the waiting thread is interrupted;
     *                     the thread's interrupt status is left set
     */
    public Event readMessage(int readTimeoutMillis) throws IOException, SocketTimeoutException, ClosedChannelException {
        long start = System.currentTimeMillis();
        long remainingTime = readTimeoutMillis;
        while (true) {
            synchronized (this.inQueue) {
                if (!this.inQueue.isEmpty()) {
                    ++this.inQueueVersion;
                    return this.inQueue.remove(0);
                }
                // never pass a non-positive value to wait(): 0 blocks forever, negative throws
                if (remainingTime <= 0L) {
                    break;
                }
                try {
                    this.inQueue.wait(remainingTime);
                }
                catch (InterruptedException ie) {
                    // keep the flag for the caller and stop waiting; looping on would make every
                    // following wait() fail at once and spin until the timeout
                    Thread.currentThread().interrupt();
                    InterruptedIOException iioe = new InterruptedIOException("interrupted while waiting for an event");
                    iioe.initCause(ie);
                    throw iioe;
                }
            }
            remainingTime = HttpUtils.computeRemainingTime(start, readTimeoutMillis);
        }
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("receive timeout " + DataConverter.toFormatedDuration((long)readTimeoutMillis) + " reached. throwing timeout exception");
        }
        throw new SocketTimeoutException("timeout " + DataConverter.toFormatedDuration((long)readTimeoutMillis) + " reached");
    }

    /**
     * @param reconnectionTimeMillis the minimum delay in milliseconds between two connect attempts
     */
    public void setReconnectionTimeMillis(int reconnectionTimeMillis) {
        this.reconnectionTimeMillis = reconnectionTimeMillis;
    }

    /**
     * @return how many reconnects have succeeded so far
     */
    public int getNumReconnects() {
        return this.numReconnects.get();
    }

    /** Hands the task to the current body's executor via {@code processMultithreaded}. */
    void processMultithreaded(Runnable task) {
        this.bodyDataSource.getExecutor().processMultithreaded(task);
    }

    /** Hands the task to the current body's executor via {@code processNonthreaded}. */
    void processNonthreaded(Runnable task) {
        this.bodyDataSource.getExecutor().processNonthreaded(task);
    }

    /**
     * @return a counter that is incremented every time an event is added to or removed from the queue
     */
    int getInQueueVersion() {
        synchronized (this.inQueue) {
            return this.inQueueVersion;
        }
    }

    /**
     * Starts a reconnect unless this source is closed or a reconnect is already in progress.
     * The attempt is delayed so that at least the reconnection time lies between two connect
     * attempts; a failed attempt triggers the next one.
     */
    private void performReconnect() {
        if (!this.isOpen.get()) {
            return;
        }
        if (this.isReconnecting.getAndSet(true)) {
            // another reconnect is already pending
            return;
        }
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("connection is terminated. Try to reconnect to " + this.uriString);
        }
        TimerTask reconnectTask = new TimerTask(){

            @Override
            public void run() {
                if (!EventDataSource.this.isOpen.get()) {
                    // closed while the reconnect was pending
                    EventDataSource.this.isReconnecting.set(false);
                    return;
                }
                boolean connected = false;
                try {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("try reconnect " + EventDataSource.this.uriString);
                    }
                    EventDataSource.this.connect();
                    connected = true;
                    EventDataSource.this.numReconnects.incrementAndGet();
                    EventDataSource.this.reconnectTrials.set(0);
                }
                catch (IOException ioe) {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("reconnecting " + EventDataSource.this.uriString + " failed " + ioe.toString());
                    }
                }
                finally {
                    // always release the flag, otherwise an unexpected runtime failure would
                    // block every later reconnect
                    EventDataSource.this.isReconnecting.set(false);
                }
                if (connected) {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("reconnected to " + EventDataSource.this.uriString);
                    }
                } else {
                    EventDataSource.this.performReconnect();
                }
            }
        };
        long waitTime = (long)this.reconnectionTimeMillis + this.timeLastConnectTrial - System.currentTimeMillis();
        if (waitTime > 0L) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("reconnecting " + this.uriString + " in " + DataConverter.toFormatedDuration((long)waitTime));
            }
            HttpUtils.schedule((TimerTask)reconnectTask, (long)waitTime);
        } else {
            this.processMultithreaded(reconnectTask);
        }
    }

    /**
     * Body callback: parses events out of the stream, queues them, and triggers a reconnect
     * when the body completes or is destroyed.
     */
    // NOTE(review): 0 is presumably Execution.NONTHREADED - confirm against org.xsocket.Execution
    @Execution(value=0)
    private final class EventHandler
    implements IBodyDataHandler,
    IBodyDestroyListener,
    IBodyCompleteListener {
        private EventHandler() {
        }

        /**
         * Parses at most one event terminated by a blank line ({@code \n\n}, else {@code \r\n\r\n}),
         * queues it and notifies the handler adapter. Comment messages are skipped when configured.
         * A parse or read failure destroys the body.
         *
         * @param bodyDataSource the body that has data available
         * @return always {@code true}
         */
        public boolean onData(NonBlockingBodyDataSource bodyDataSource) throws BufferUnderflowException {
            try {
                Event webEvent = null;
                int idx = bodyDataSource.indexOf("\n\n");
                if (idx != -1) {
                    webEvent = Event.parse(bodyDataSource.readStringByLength(idx + 2));
                } else {
                    idx = bodyDataSource.indexOf("\r\n\r\n");
                    if (idx != -1) {
                        webEvent = Event.parse(bodyDataSource.readStringByLength(idx + 4));
                    }
                }
                if (webEvent == null) {
                    // no complete event buffered yet
                    return true;
                }
                if (EventDataSource.this.isIgnoreCommentMessages && webEvent.isCommentMessage()) {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("comment message received. ignoring it (property isIgnoreCommentMessages=true)");
                    }
                    return true;
                }
                synchronized (EventDataSource.this.inQueue) {
                    // an event without an id must not erase the resume point sent as Last-Event-ID
                    String eventId = webEvent.getId();
                    if (eventId != null) {
                        EventDataSource.this.lastEventIdRef.set(eventId);
                    }
                    Integer retryMillis = webEvent.getRetryMillis();
                    if (retryMillis != null) {
                        EventDataSource.this.reconnectionTimeMillis = retryMillis;
                    }
                    EventDataSource.this.inQueueVersion++;
                    EventDataSource.this.inQueue.add(webEvent);
                    EventDataSource.this.inQueue.notifyAll();
                }
                EventDataSource.this.webEventHandlerAdapter.onMessage(EventDataSource.this);
            }
            catch (IOException ioe) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("[" + bodyDataSource.getId() + "] error occured by parsing event " + ioe.toString());
                }
                bodyDataSource.destroy();
            }
            return true;
        }

        /** Called when the body is destroyed; triggers a reconnect if the source is still open. */
        public void onDestroyed() throws IOException {
            EventDataSource.this.performReconnect();
        }

        /** Called when the body is complete; triggers a reconnect if the source is still open. */
        public void onComplete() {
            EventDataSource.this.performReconnect();
        }
    }
}

