/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java.test;

import io.vertx.core.Verticle;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServerOptions;
import io.vertx.lang.rx.test.SimplePojo;
import io.vertx.rx.java.ObservableFuture;
import io.vertx.rx.java.RxHelper;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Context;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.eventbus.EventBus;
import io.vertx.rxjava.core.eventbus.MessageConsumer;
import io.vertx.rxjava.core.http.HttpClient;
import io.vertx.rxjava.core.http.HttpClientRequest;
import io.vertx.rxjava.core.http.HttpClientResponse;
import io.vertx.rxjava.core.http.HttpServer;
import io.vertx.rxjava.core.http.HttpServerRequest;
import io.vertx.rxjava.core.http.ServerWebSocket;
import io.vertx.rxjava.core.http.WebSocket;
import io.vertx.rxjava.core.net.NetServer;
import io.vertx.rxjava.core.net.NetSocket;
import io.vertx.rxjava.core.parsetools.RecordParser;
import io.vertx.rxjava.core.streams.ReadStream;
import io.vertx.test.core.VertxTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Single;
import rx.Subscriber;

public class CoreApiTest
extends VertxTestBase {
    private Vertx vertx;

    public void setUp() throws Exception {
        super.setUp();
        this.vertx = new Vertx(((VertxTestBase)this).vertx);
    }

    @Test
    public void testConsumeBodyStream() {
        EventBus eb = this.vertx.eventBus();
        final MessageConsumer consumer = eb.consumer("the-address");
        Observable obs = consumer.bodyStream().toObservable();
        final ArrayList items = new ArrayList();
        obs.subscribe((Subscriber)new Subscriber<String>(){

            public void onNext(String s) {
                items.add(s);
                if (items.size() == 3) {
                    this.unsubscribe();
                    CoreApiTest.this.assertEquals(Arrays.asList("msg1", "msg2", "msg3"), items);
                    CoreApiTest.this.assertFalse(consumer.isRegistered());
                    CoreApiTest.this.testComplete();
                }
            }

            public void onError(Throwable throwable) {
                CoreApiTest.this.fail(throwable.getMessage());
            }

            public void onCompleted() {
                CoreApiTest.this.fail();
            }
        });
        eb.send("the-address", (Object)"msg1");
        eb.send("the-address", (Object)"msg2");
        eb.send("the-address", (Object)"msg3");
        this.await();
    }

    @Test
    public void testRegisterAgain() {
        final EventBus eb = this.vertx.eventBus();
        final MessageConsumer consumer = eb.consumer("the-address");
        final Observable obs = consumer.bodyStream().toObservable();
        obs.subscribe((Subscriber)new Subscriber<String>(){

            public void onNext(String s) {
                CoreApiTest.this.assertEquals("msg1", s);
                this.unsubscribe();
                CoreApiTest.this.assertFalse(consumer.isRegistered());
                obs.subscribe((Subscriber)new Subscriber<String>(){

                    public void onNext(String s) {
                        CoreApiTest.this.assertEquals("msg2", s);
                        this.unsubscribe();
                        CoreApiTest.this.assertFalse(consumer.isRegistered());
                        CoreApiTest.this.testComplete();
                    }

                    public void onError(Throwable throwable) {
                        CoreApiTest.this.fail("Was not esxpecting error " + throwable.getMessage());
                    }

                    public void onCompleted() {
                        CoreApiTest.this.fail();
                    }
                });
                eb.send("the-address", (Object)"msg2");
            }

            public void onError(Throwable throwable) {
                CoreApiTest.this.fail("Was not esxpecting error " + throwable.getMessage());
            }

            public void onCompleted() {
                CoreApiTest.this.fail();
            }
        });
        eb.send("the-address", (Object)"msg1");
        this.await();
    }

    @Test
    public void testObservableUnsubscribeDuringObservation() {
        EventBus eb = this.vertx.eventBus();
        MessageConsumer consumer = eb.consumer("the-address");
        Observable obs = consumer.bodyStream().toObservable();
        Observable a = obs.take(4);
        final ArrayList obtained = new ArrayList();
        a.subscribe((Subscriber)new Subscriber<String>(){

            public void onCompleted() {
                CoreApiTest.this.assertEquals(Arrays.asList("msg0", "msg1", "msg2", "msg3"), obtained);
                CoreApiTest.this.testComplete();
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onNext(String str) {
                obtained.add(str);
            }
        });
        for (int i = 0; i < 7; ++i) {
            eb.send("the-address", (Object)("msg" + i));
        }
        this.await();
    }

    @Test
    public void testUnregisterConsumer() {
        EventBus eb = this.vertx.eventBus();
        MessageConsumer consumer = eb.consumer("the-address");
        Observable obs = consumer.bodyStream().toObservable();
        obs.subscribe((Subscriber)new Subscriber<String>(){

            public void onCompleted() {
                CoreApiTest.this.testComplete();
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onNext(String str) {
                CoreApiTest.this.fail();
            }
        });
        consumer.unregister();
        this.await();
    }

    @Test
    public void testConcatReplies() {
        EventBus eb = this.vertx.eventBus();
        eb.consumer("the-address", msg -> msg.reply(msg.body()));
        Single obs1 = eb.rxRequest("the-address", (Object)"msg1");
        Single obs2 = eb.rxRequest("the-address", (Object)"msg2");
        eb.request("the-address", (Object)"done", reply -> {
            Observable all = Single.concat((Single)obs1, (Single)obs2);
            LinkedList values = new LinkedList();
            all.subscribe(next -> values.add((String)next.body()), err -> this.fail(), () -> {
                this.assertEquals(Arrays.asList("msg1", "msg2"), values);
                this.testComplete();
            });
        });
        this.await();
    }

    @Test
    public void testObservableNetSocket() {
        ObservableFuture onListen = RxHelper.observableFuture();
        onListen.subscribe(server -> this.vertx.createNetClient(new NetClientOptions()).connect(1234, "localhost", ar -> {
            this.assertTrue(ar.succeeded());
            NetSocket so = (NetSocket)ar.result();
            so.write("foo");
            so.close();
        }), error -> this.fail(error.getMessage()));
        final NetServer server2 = this.vertx.createNetServer(new NetServerOptions().setPort(1234).setHost("localhost"));
        Observable socketObs = server2.connectStream().toObservable();
        socketObs.subscribe((Subscriber)new Subscriber<NetSocket>(){

            public void onNext(NetSocket o) {
                Observable dataObs = o.toObservable();
                dataObs.subscribe((Observer)new Observer<Buffer>(){
                    LinkedList<Buffer> buffers = new LinkedList();

                    public void onNext(Buffer buffer) {
                        this.buffers.add(buffer);
                    }

                    public void onError(Throwable e) {
                        CoreApiTest.this.fail(e.getMessage());
                    }

                    public void onCompleted() {
                        CoreApiTest.this.assertEquals(1L, this.buffers.size());
                        CoreApiTest.this.assertEquals("foo", this.buffers.get(0).toString("UTF-8"));
                        server2.close();
                    }
                });
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onCompleted() {
                CoreApiTest.this.testComplete();
            }
        });
        server2.listen(onListen.toHandler());
        this.await();
    }

    @Test
    public void testObservableWebSocket() {
        ObservableFuture onListen = RxHelper.observableFuture();
        onListen.subscribe(server -> this.vertx.createHttpClient(new HttpClientOptions()).webSocket(8080, "localhost", "/some/path", ar -> {
            if (ar.succeeded()) {
                WebSocket ws = (WebSocket)ar.result();
                ws.write(Buffer.buffer((String)"foo"));
                ws.close();
            } else {
                this.fail(ar.cause().getMessage());
            }
        }));
        final HttpServer server2 = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080).setHost("localhost"));
        Observable socketObs = server2.webSocketStream().toObservable();
        socketObs.subscribe((Subscriber)new Subscriber<ServerWebSocket>(){

            public void onNext(ServerWebSocket o) {
                Observable dataObs = o.toObservable();
                dataObs.subscribe((Observer)new Observer<Buffer>(){
                    LinkedList<Buffer> buffers = new LinkedList();

                    public void onNext(Buffer buffer) {
                        this.buffers.add(buffer);
                    }

                    public void onError(Throwable e) {
                        CoreApiTest.this.fail(e.getMessage());
                    }

                    public void onCompleted() {
                        CoreApiTest.this.assertEquals(1L, this.buffers.size());
                        CoreApiTest.this.assertEquals("foo", this.buffers.get(0).toString("UTF-8"));
                        server2.close();
                    }
                });
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onCompleted() {
                CoreApiTest.this.testComplete();
            }
        });
        server2.listen(onListen.toHandler());
        this.await();
    }

    @Test
    public void testObservableHttpRequest() {
        final HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080).setHost("localhost"));
        Observable socketObs = server.requestStream().toObservable();
        socketObs.subscribe((Subscriber)new Subscriber<HttpServerRequest>(){

            public void onNext(HttpServerRequest o) {
                Observable dataObs = o.toObservable();
                dataObs.subscribe((Observer)new Observer<Buffer>(){
                    LinkedList<Buffer> buffers = new LinkedList();

                    public void onNext(Buffer buffer) {
                        this.buffers.add(buffer);
                    }

                    public void onError(Throwable e) {
                        CoreApiTest.this.fail(e.getMessage());
                    }

                    public void onCompleted() {
                        CoreApiTest.this.assertEquals(1L, this.buffers.size());
                        CoreApiTest.this.assertEquals("foo", this.buffers.get(0).toString("UTF-8"));
                        server.close();
                    }
                });
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onCompleted() {
                CoreApiTest.this.testComplete();
            }
        });
        Single onListen = server.rxListen();
        onListen.subscribe(s -> this.vertx.createHttpClient(new HttpClientOptions()).rxRequest(HttpMethod.PUT, 8080, "localhost", "/some/path").subscribe(req -> {
            req.putHeader("Content-Length", "3");
            req.write("foo");
        }), error -> this.fail(error.getMessage()));
        this.await();
    }

    @Test
    public void testConcatOperator() {
        Observable o1 = this.vertx.timerStream(100L).toObservable();
        Observable o2 = this.vertx.timerStream(100L).toObservable();
        Observable obs = Observable.concat((Observable)o1, (Observable)o2);
        AtomicInteger count = new AtomicInteger();
        obs.subscribe(msg -> count.incrementAndGet(), err -> this.fail(), () -> {
            this.assertEquals(2L, count.get());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testScheduledTimer() {
        this.vertx.runOnContext(v -> {
            final long startTime = System.currentTimeMillis();
            final Context initCtx = Vertx.currentContext();
            Observable.timer((long)100L, (long)100L, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)io.vertx.rxjava.core.RxHelper.scheduler((Vertx)this.vertx)).take(10).subscribe((Observer)new Observer<Long>(){

                public void onNext(Long value) {
                    CoreApiTest.this.assertEquals(initCtx.getDelegate(), Vertx.currentContext().getDelegate());
                }

                public void onError(Throwable e) {
                    CoreApiTest.this.fail("unexpected failure");
                }

                public void onCompleted() {
                    long timeTaken = System.currentTimeMillis() - startTime;
                    CoreApiTest.this.assertTrue("Was expecting to have time taken | " + timeTaken + " -  1000 | < 200", Math.abs(timeTaken - 1000L) < 1000L);
                    CoreApiTest.this.testComplete();
                }
            });
        });
        this.await();
    }

    @Test
    public void testScheduledBuffer() {
        this.vertx.runOnContext(v -> {
            final long startTime = System.currentTimeMillis();
            final Context initCtx = Vertx.currentContext();
            Observable.timer((long)10L, (long)10L, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)io.vertx.rxjava.core.RxHelper.scheduler((Vertx)this.vertx)).buffer(100L, TimeUnit.MILLISECONDS, io.vertx.rxjava.core.RxHelper.scheduler((Vertx)this.vertx)).take(10).subscribe((Observer)new Observer<List<Long>>(){
                private int eventCount = 0;

                public void onNext(List<Long> value) {
                    ++this.eventCount;
                    CoreApiTest.this.assertEquals(initCtx.getDelegate(), Vertx.currentContext().getDelegate());
                }

                public void onError(Throwable e) {
                    CoreApiTest.this.fail("unexpected failure");
                }

                public void onCompleted() {
                    long timeTaken = System.currentTimeMillis() - startTime;
                    CoreApiTest.this.assertEquals(10L, this.eventCount);
                    CoreApiTest.this.assertTrue("Was expecting to have time taken | " + timeTaken + " -  1000 | < 200", Math.abs(timeTaken - 1000L) < 1000L);
                    CoreApiTest.this.testComplete();
                }
            });
        });
        this.await();
    }

    @Test
    public void testTimeMap() {
        this.vertx.runOnContext(v -> {
            final Context initCtx = Vertx.currentContext();
            EventBus eb = this.vertx.eventBus();
            ReadStream consumer = eb.localConsumer("the-address").bodyStream();
            Subscriber<String> observer = new Subscriber<String>(){
                boolean first = true;

                public void onNext(String s) {
                    if (this.first) {
                        this.first = false;
                        CoreApiTest.this.assertEquals(initCtx.getDelegate(), Vertx.currentContext().getDelegate());
                        CoreApiTest.this.assertEquals("msg1msg2msg3", s);
                        CoreApiTest.this.testComplete();
                    }
                }

                public void onError(Throwable e) {
                    CoreApiTest.this.fail(e.getMessage());
                }

                public void onCompleted() {
                }
            };
            Observable observable = consumer.toObservable();
            observable.buffer(500L, TimeUnit.MILLISECONDS, io.vertx.rxjava.core.RxHelper.scheduler((Vertx)this.vertx)).map(samples -> samples.stream().reduce("", (a, b) -> a + b)).subscribe((Observer)observer);
            eb.send("the-address", (Object)"msg1");
            eb.send("the-address", (Object)"msg2");
            eb.send("the-address", (Object)"msg3");
        });
        this.await();
    }

    @Test
    public void testObserverToFuture() {
        final HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080)).requestHandler(req -> {});
        final AtomicInteger count = new AtomicInteger();
        Observer<HttpServer> observer = new Observer<HttpServer>(){

            public void onCompleted() {
                server.close();
                CoreApiTest.this.assertEquals(1L, count.get());
                CoreApiTest.this.testComplete();
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onNext(HttpServer httpServer) {
                count.incrementAndGet();
            }
        };
        Single onListen = server.rxListen();
        onListen.subscribe((Observer)observer);
        this.await();
    }

    @Test
    public void testObserverToHandler() throws Exception {
        final AtomicInteger count = new AtomicInteger();
        Observer<Long> observer = new Observer<Long>(){

            public void onCompleted() {
                CoreApiTest.this.assertEquals(1L, count.get());
                CoreApiTest.this.testComplete();
            }

            public void onError(Throwable e) {
                CoreApiTest.this.fail(e.getMessage());
            }

            public void onNext(Long l) {
                count.incrementAndGet();
            }
        };
        this.vertx.setTimer(1L, RxHelper.toHandler((Observer)observer));
        this.await();
    }

    @Test
    public void testHttpClient() {
        HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.requestStream().handler(req -> req.response().setChunked(true).end("some_content"));
        try {
            server.listen(ar -> {
                HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
                client.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri").flatMap(HttpClientRequest::rxSend).subscribe(resp -> {
                    Buffer content = Buffer.buffer();
                    Observable observable = resp.toObservable();
                    observable.forEach(arg_0 -> ((Buffer)content).appendBuffer(arg_0), err -> this.fail(), () -> {
                        this.assertEquals("some_content", content.toString("UTF-8"));
                        this.testComplete();
                    });
                });
            });
            this.await();
        }
        finally {
            server.close();
        }
    }

    @Test
    public void testHttpClientFlatMap() {
        HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.requestStream().handler(req -> req.response().setChunked(true).end("some_content"));
        server.listen(ar -> {
            HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
            Single req = client.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri").flatMap(HttpClientRequest::rxSend);
            Buffer content = Buffer.buffer();
            req.flatMapObservable(HttpClientResponse::toObservable).forEach(arg_0 -> ((Buffer)content).appendBuffer(arg_0), err -> this.fail(), () -> {
                server.close();
                this.assertEquals("some_content", content.toString("UTF-8"));
                this.testComplete();
            });
        });
        this.await();
    }

    @Test
    public void testHttpClientFlatMapUnmarshallPojo() {
        HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.requestStream().handler(req -> req.response().setChunked(true).end("{\"foo\":\"bar\"}"));
        server.listen(ar -> {
            HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
            Single req = client.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri").flatMap(HttpClientRequest::rxSend);
            ArrayList objects = new ArrayList();
            req.flatMapObservable(HttpClientResponse::toObservable).lift(io.vertx.rxjava.core.RxHelper.unmarshaller(SimplePojo.class)).forEach(objects::add, err -> this.fail(), () -> {
                server.close();
                this.assertEquals(Arrays.asList(new SimplePojo("bar")), objects);
                this.testComplete();
            });
        });
        this.await();
    }

    @Test
    public void testHttpClientConnectionFailure() {
        HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
        Single req = client.rxRequest(HttpMethod.GET, 9998, "255.255.255.255", "/the_uri").flatMap(HttpClientRequest::rxSend);
        req.subscribe(resp -> this.fail(), err -> this.testComplete());
        this.await();
    }

    @Test
    public void testHttpClientConnectionFailureFlatMap() {
        HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
        Single req = client.rxRequest(HttpMethod.GET, 9998, "255.255.255.255", "/the_uri").flatMap(HttpClientRequest::rxSend);
        req.flatMapObservable(HttpClientResponse::toObservable).forEach(buffer -> this.fail(), err -> this.testComplete(), () -> ((CoreApiTest)this).fail());
        this.await();
    }

    @Test
    public void testWebsocketClient() {
        HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.webSocketStream().handler(ws -> {
            ws.write(Buffer.buffer((String)"some_content"));
            ws.close();
        });
        server.listen(ar -> {
            HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
            client.webSocket(8080, "localhost", "/the_uri", ar2 -> {
                if (ar2.succeeded()) {
                    WebSocket ws = (WebSocket)ar2.result();
                    Buffer content = Buffer.buffer();
                    Observable observable = ws.toObservable();
                    observable.forEach(arg_0 -> ((Buffer)content).appendBuffer(arg_0), err -> this.fail(), () -> {
                        server.close();
                        this.assertEquals("some_content", content.toString("UTF-8"));
                        this.testComplete();
                    });
                }
            });
        });
        this.await();
    }

    @Test
    public void testWebsocketClientFlatMap() {
        HttpServer server = this.vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.webSocketStream().handler(ws -> {
            ws.write(Buffer.buffer((String)"some_content"));
            ws.close();
        });
        server.listen(ar -> {
            HttpClient client = this.vertx.createHttpClient(new HttpClientOptions());
            Buffer content = Buffer.buffer();
            client.rxWebSocket(8080, "localhost", "/the_uri").flatMapObservable(WebSocket::toObservable).forEach(arg_0 -> ((Buffer)content).appendBuffer(arg_0), err -> this.fail(), () -> {
                server.close();
                this.assertEquals("some_content", content.toString("UTF-8"));
                this.testComplete();
            });
        });
        this.await();
    }

    @Test
    public void testDeployVerticle() throws Exception {
        final CountDownLatch deployLatch = new CountDownLatch(2);
        io.vertx.rxjava.core.RxHelper.deployVerticle((Vertx)this.vertx, (Verticle)new AbstractVerticle(){

            public void start() {
                deployLatch.countDown();
            }
        }).subscribe(resp -> deployLatch.countDown());
        this.awaitLatch(deployLatch);
    }

    @Test
    public void testRecordParser() {
        Single source = this.vertx.fileSystem().rxOpen("src/test/resources/test.txt", new OpenOptions());
        this.waitFor(5);
        source.map(file -> RecordParser.newDelimited((String)"\n", (Observable)file.toObservable())).flatMapObservable(RecordParser::toObservable).doOnNext(v -> this.complete()).doOnCompleted(() -> this.complete()).ignoreElements().toCompletable().subscribe(() -> this.complete(), arg_0 -> ((CoreApiTest)this).fail(arg_0));
        this.await();
    }
}

