/*
 * Decompiled with CFR 0.152.
 */
package rx.apache.http;

import java.util.concurrent.Future;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.apache.http.ObservableHttpResponse;
import rx.apache.http.consumers.ResponseConsumerDelegate;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

public class ObservableHttp<T> {
    private final Observable.OnSubscribe<T> onSubscribe;

    private ObservableHttp(Observable.OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    private static <T> ObservableHttp<T> create(Observable.OnSubscribe<T> onSubscribe) {
        return new ObservableHttp<T>(onSubscribe);
    }

    public Observable<T> toObservable() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<T>(){

            public void call(Subscriber<? super T> observer) {
                ObservableHttp.this.onSubscribe.call(observer);
            }
        });
    }

    public static ObservableHttp<ObservableHttpResponse> createGet(String uri, HttpAsyncClient client) {
        return ObservableHttp.createRequest(HttpAsyncMethods.createGet((String)uri), client);
    }

    public static ObservableHttp<ObservableHttpResponse> createRequest(HttpAsyncRequestProducer requestProducer, HttpAsyncClient client) {
        return ObservableHttp.createRequest(requestProducer, client, (HttpContext)new BasicHttpContext());
    }

    public static ObservableHttp<ObservableHttpResponse> createRequest(final HttpAsyncRequestProducer requestProducer, final HttpAsyncClient client, final HttpContext context) {
        return ObservableHttp.create(new Observable.OnSubscribe<ObservableHttpResponse>(){

            public void call(final Subscriber<? super ObservableHttpResponse> observer) {
                CompositeSubscription parentSubscription = new CompositeSubscription();
                observer.add((Subscription)parentSubscription);
                parentSubscription.add(Subscriptions.from((Future)client.execute(requestProducer, (HttpAsyncResponseConsumer)new ResponseConsumerDelegate((Observer<? super ObservableHttpResponse>)observer, parentSubscription), context, (FutureCallback)new FutureCallback<HttpResponse>(){

                    public void completed(HttpResponse result) {
                        observer.onCompleted();
                    }

                    public void failed(Exception ex) {
                        observer.onError((Throwable)ex);
                    }

                    public void cancelled() {
                        observer.onCompleted();
                    }
                })));
            }
        });
    }
}

