/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.test;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.transport.TransportClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.subscribers.TestSubscriber;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

public class ClientSetupRule
extends ExternalResource {
    private final Callable<SocketAddress> serverStarter;
    private final Function<SocketAddress, TransportClient> clientFactory;
    private SocketAddress serverAddress;
    private ReactiveSocket reactiveSocket;
    private ReactiveSocketClient reactiveSocketClient;

    public ClientSetupRule(Function<SocketAddress, TransportClient> clientFactory, Callable<SocketAddress> serverStarter) {
        this.clientFactory = clientFactory;
        this.serverStarter = serverStarter;
    }

    public Statement apply(final Statement base, Description description) {
        return new Statement(){

            public void evaluate() throws Throwable {
                ClientSetupRule.this.serverAddress = (SocketAddress)ClientSetupRule.this.serverStarter.call();
                TransportClient client = (TransportClient)ClientSetupRule.this.clientFactory.apply(ClientSetupRule.this.serverAddress);
                SetupProvider setup = SetupProvider.keepAlive((KeepAliveProvider)KeepAliveProvider.never()).disableLease();
                ClientSetupRule.this.reactiveSocketClient = ReactiveSocketClient.create((TransportClient)client, (SetupProvider)setup);
                base.evaluate();
            }
        };
    }

    public ReactiveSocketClient getClient() {
        return this.reactiveSocketClient;
    }

    public SocketAddress getServerAddress() {
        return this.serverAddress;
    }

    public ReactiveSocket getReactiveSocket() {
        if (null == this.reactiveSocket) {
            this.reactiveSocket = (ReactiveSocket)this.reactiveSocketClient.connect().block();
        }
        return this.reactiveSocket;
    }

    public void testFireAndForget(int count) {
        TestSubscriber ts = TestSubscriber.create();
        Flux.range((int)1, (int)count).flatMap(i -> this.getReactiveSocket().fireAndForget((Payload)new PayloadImpl("hello", "metadata"))).doOnError(Throwable::printStackTrace).subscribe((Subscriber)ts);
        ClientSetupRule.await(ts);
        ts.assertTerminated();
        ts.assertNoErrors();
        ts.assertTerminated();
    }

    public void testMetadata(int count) {
        TestSubscriber ts = TestSubscriber.create();
        Flux.range((int)1, (int)count).flatMap(i -> this.getReactiveSocket().metadataPush((Payload)new PayloadImpl("", "metadata"))).doOnError(Throwable::printStackTrace).subscribe((Subscriber)ts);
        ClientSetupRule.await(ts);
        ts.assertTerminated();
        ts.assertNoErrors();
        ts.assertTerminated();
    }

    public void testRequestResponseN(int count) {
        TestSubscriber ts = TestSubscriber.create();
        Flux.range((int)1, (int)count).flatMap(i -> this.getReactiveSocket().requestResponse((Payload)new PayloadImpl("hello", "metadata")).map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())).doOnError(Throwable::printStackTrace).subscribe((Subscriber)ts);
        ClientSetupRule.await(ts);
        ts.assertTerminated();
        ts.assertValueCount(count);
        ts.assertNoErrors();
        ts.assertTerminated();
    }

    public void testRequestStream() {
        this.testStream(socket -> socket.requestStream((Payload)new PayloadImpl("hello", "metadata")));
    }

    public void testRequestStreamWithRequestN() {
        this.testStreamRequestN(socket -> socket.requestStream((Payload)new PayloadImpl("hello", "metadata")));
    }

    private void testStreamRequestN(Function<ReactiveSocket, Flux<Payload>> invoker) {
        int count = 10;
        CountDownLatch latch = new CountDownLatch(count);
        TestSubscriber ts = TestSubscriber.create((long)(count / 2));
        Flux<Payload> publisher = invoker.apply(this.getReactiveSocket());
        publisher.doOnNext(s -> latch.countDown()).subscribe((Subscriber)ts);
        ts.request((long)(count / 2));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Assert.fail((String)e.getMessage());
        }
        ts.assertNoErrors();
        ts.assertValueCount(count);
        ts.assertNotTerminated();
    }

    private void testStream(Function<ReactiveSocket, Flux<Payload>> invoker) {
        TestSubscriber ts = TestSubscriber.create();
        Flux<Payload> publisher = invoker.apply(this.getReactiveSocket());
        publisher.take(5L).subscribe((Subscriber)ts);
        ClientSetupRule.await(ts);
        ts.assertTerminated();
        ts.assertNoErrors();
        ts.assertValueCount(5);
        ts.assertTerminated();
    }

    private static void await(TestSubscriber<?> ts) {
        try {
            ts.await();
        }
        catch (InterruptedException e) {
            Assert.fail((String)"Interrupted while waiting for completion.");
        }
    }
}

