/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.reactive;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
import org.neo4j.driver.internal.reactive.InternalRxResult;
import org.neo4j.driver.internal.reactive.InternalRxSession;
import org.neo4j.driver.internal.util.FixedRetryLogic;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.value.IntegerValue;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/**
 * Unit tests for {@link InternalRxSession}.
 *
 * <p>Each test wraps a Mockito mock of {@link NetworkSession} in an {@link InternalRxSession},
 * drives one reactive session operation and then checks which calls reached the mock.
 */
class InternalRxSessionTest {

    /**
     * Supplies every {@code RxSession.run(...)} overload exercised by the parameterized run tests.
     *
     * @return one function per overload, each starting a run on the session it is given
     */
    private static Stream<Function<RxSession, RxResult>> allSessionRunMethods() {
        return Stream.of(
                rxSession -> rxSession.run("RETURN 1"),
                rxSession -> rxSession.run("RETURN $x", Values.parameters("x", 1)),
                rxSession -> rxSession.run("RETURN $x", Collections.singletonMap("x", 1)),
                rxSession -> rxSession.run(
                        "RETURN $x",
                        new InternalRecord(Collections.singletonList("x"), new Value[] {new IntegerValue(1L)})),
                rxSession -> rxSession.run(new Query("RETURN $x", Values.parameters("x", 1))),
                rxSession -> rxSession.run(
                        new Query("RETURN $x", Values.parameters("x", 1)), TransactionConfig.empty()),
                rxSession -> rxSession.run(
                        "RETURN $x", Collections.singletonMap("x", 1), TransactionConfig.empty()),
                rxSession -> rxSession.run("RETURN 1", TransactionConfig.empty()));
    }

    /**
     * Supplies both {@code RxSession.beginTransaction(...)} overloads.
     *
     * @return one function per overload, each returning the transaction publisher
     */
    private static Stream<Function<RxSession, Publisher<RxTransaction>>> allBeginTxMethods() {
        return Stream.of(
                rxSession -> rxSession.beginTransaction(),
                rxSession -> rxSession.beginTransaction(TransactionConfig.empty()));
    }

    /**
     * Supplies the read/write transaction-function overloads, each emitting the single element {@code "a"}.
     *
     * @return one function per overload, each returning the publisher produced by the transaction function
     */
    private static Stream<Function<RxSession, Publisher<String>>> allRunTxMethods() {
        // The elements must stay typed as String: widening them to Object would make the
        // resulting publisher incompatible with the declared Publisher<String>.
        return Stream.of(
                rxSession -> rxSession.readTransaction(tx -> Flux.just("a")),
                rxSession -> rxSession.writeTransaction(tx -> Flux.just("a")),
                rxSession -> rxSession.readTransaction(tx -> Flux.just("a"), TransactionConfig.empty()),
                rxSession -> rxSession.writeTransaction(tx -> Flux.just("a"), TransactionConfig.empty()));
    }

    /**
     * A run on the rx session must call {@code NetworkSession.runRx} and expose the cursor it returns.
     *
     * @param runReturnOne the {@code run} overload under test
     */
    @ParameterizedTest
    @MethodSource("allSessionRunMethods")
    void shouldDelegateRun(Function<RxSession, RxResult> runReturnOne) throws Throwable {
        // Given: a session whose runRx completes successfully with a cursor.
        NetworkSession session = Mockito.mock(NetworkSession.class);
        RxResultCursor cursor = Mockito.mock(RxResultCursorImpl.class);
        Mockito.when(session.runRx(ArgumentMatchers.any(Query.class), ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(CompletableFuture.completedFuture(cursor));
        InternalRxSession rxSession = new InternalRxSession(session);

        // When: the run is started and the cursor future is obtained from the result's supplier.
        RxResult result = runReturnOne.apply(rxSession);
        CompletionStage<RxResultCursor> cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get();

        // Then
        Mockito.verify(session)
                .runRx(ArgumentMatchers.any(Query.class), ArgumentMatchers.any(TransactionConfig.class));
        Assertions.assertEquals(cursor, Futures.getNow(cursorFuture));
    }

    /**
     * When {@code runRx} fails, the error must surface through the cursor future and the
     * session must be asked to release its connection.
     *
     * @param runReturnOne the {@code run} overload under test
     */
    @ParameterizedTest
    @MethodSource("allSessionRunMethods")
    void shouldReleaseConnectionIfFailedToRun(Function<RxSession, RxResult> runReturnOne) throws Throwable {
        // Given: a session whose runRx fails.
        Throwable error = new RuntimeException("Hi there");
        NetworkSession session = Mockito.mock(NetworkSession.class);
        Mockito.when(session.runRx(ArgumentMatchers.any(Query.class), ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(Futures.failedFuture(error));
        Mockito.when(session.releaseConnectionAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(session);

        // When
        RxResult result = runReturnOne.apply(rxSession);
        CompletionStage<RxResultCursor> cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get();

        // Then: the original error is wrapped in a CompletionException and the connection is released.
        Mockito.verify(session)
                .runRx(ArgumentMatchers.any(Query.class), ArgumentMatchers.any(TransactionConfig.class));
        CompletionException thrown =
                Assertions.assertThrows(CompletionException.class, () -> Futures.getNow(cursorFuture));
        Assertions.assertEquals(error, thrown.getCause());
        Mockito.verify(session).releaseConnectionAsync();
    }

    /**
     * Beginning a transaction must call {@code NetworkSession.beginTransactionAsync} and emit one transaction.
     *
     * @param beginTx the {@code beginTransaction} overload under test
     */
    @ParameterizedTest
    @MethodSource("allBeginTxMethods")
    void shouldDelegateBeginTx(Function<RxSession, Publisher<RxTransaction>> beginTx) throws Throwable {
        // Given: a session that successfully begins a transaction.
        NetworkSession session = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        Mockito.when(session.beginTransactionAsync(ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        InternalRxSession rxSession = new InternalRxSession(session);

        // When
        Publisher<RxTransaction> rxTx = beginTx.apply(rxSession);
        StepVerifier.create(Mono.from(rxTx)).expectNextCount(1L).verifyComplete();

        // Then
        Mockito.verify(session).beginTransactionAsync(ArgumentMatchers.any(TransactionConfig.class));
    }

    /**
     * When beginning a transaction fails, the error must surface to the subscriber and the
     * session must be asked to release its connection.
     *
     * @param beginTx the {@code beginTransaction} overload under test
     */
    @ParameterizedTest
    @MethodSource("allBeginTxMethods")
    void shouldReleaseConnectionIfFailedToBeginTx(Function<RxSession, Publisher<RxTransaction>> beginTx)
            throws Throwable {
        // Given: a session that fails to begin a transaction.
        Throwable error = new RuntimeException("Hi there");
        NetworkSession session = Mockito.mock(NetworkSession.class);
        Mockito.when(session.beginTransactionAsync(ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(Futures.failedFuture(error));
        Mockito.when(session.releaseConnectionAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(session);

        // When: converting to a future subscribes to the publisher.
        Publisher<RxTransaction> rxTx = beginTx.apply(rxSession);
        CompletableFuture<RxTransaction> txFuture = Mono.from(rxTx).toFuture();

        // Then
        Mockito.verify(session).beginTransactionAsync(ArgumentMatchers.any(TransactionConfig.class));
        CompletionException thrown =
                Assertions.assertThrows(CompletionException.class, () -> Futures.getNow(txFuture));
        Assertions.assertEquals(error, thrown.getCause());
        Mockito.verify(session).releaseConnectionAsync();
    }

    /**
     * A transaction function must begin a transaction on the session, emit the work's element
     * and close the transaction with {@code closeAsync(true)}.
     *
     * @param runTx the read/write transaction overload under test
     */
    @ParameterizedTest
    @MethodSource("allRunTxMethods")
    void shouldDelegateRunTx(Function<RxSession, Publisher<String>> runTx) throws Throwable {
        // Given
        NetworkSession session = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        Mockito.when(tx.closeAsync(true)).thenReturn(Futures.completedWithNull());
        Mockito.when(session.beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class), ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        Mockito.when(session.retryLogic()).thenReturn(new FixedRetryLogic(1));
        InternalRxSession rxSession = new InternalRxSession(session);

        // When
        Publisher<String> strings = runTx.apply(rxSession);
        StepVerifier.create(Flux.from(strings)).expectNext("a").verifyComplete();

        // Then
        Mockito.verify(session)
                .beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class), ArgumentMatchers.any(TransactionConfig.class));
        Mockito.verify(tx).closeAsync(true);
    }

    /**
     * Work that always fails must be attempted {@code retryCount + 1} times, each attempt
     * closing its transaction with {@code closeAsync(false)}, before the error reaches the subscriber.
     */
    @Test
    void shouldRetryOnError() throws Throwable {
        // Given
        final int retryCount = 2;
        NetworkSession session = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        Mockito.when(tx.closeAsync(false)).thenReturn(Futures.completedWithNull());
        Mockito.when(session.beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class), ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        Mockito.when(session.retryLogic()).thenReturn(new FixedRetryLogic(retryCount));
        InternalRxSession rxSession = new InternalRxSession(session);

        // When: every attempt ends in an error.
        Publisher<String> strings = rxSession.readTransaction(
                t -> Flux.just("a").then(Mono.error(new RuntimeException("Errored"))));
        StepVerifier.create(Flux.from(strings)).expectError(RuntimeException.class).verify();

        // Then: the initial attempt plus retryCount retries.
        Mockito.verify(session, Mockito.times(retryCount + 1))
                .beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class), ArgumentMatchers.any(TransactionConfig.class));
        Mockito.verify(tx, Mockito.times(retryCount + 1)).closeAsync(false);
    }

    /**
     * Work that fails {@code retryCount} times and then succeeds must deliver the successful
     * result; failed attempts close with {@code closeAsync(false)}, the successful one with
     * {@code closeAsync(true)}.
     */
    @Test
    void shouldObtainResultIfRetrySucceed() throws Throwable {
        // Given
        final int retryCount = 2;
        NetworkSession session = Mockito.mock(NetworkSession.class);
        UnmanagedTransaction tx = Mockito.mock(UnmanagedTransaction.class);
        Mockito.when(tx.closeAsync(false)).thenReturn(Futures.completedWithNull());
        Mockito.when(tx.closeAsync(true)).thenReturn(Futures.completedWithNull());
        Mockito.when(session.beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class), ArgumentMatchers.any(TransactionConfig.class)))
                .thenReturn(CompletableFuture.completedFuture(tx));
        Mockito.when(session.retryLogic()).thenReturn(new FixedRetryLogic(retryCount));
        InternalRxSession rxSession = new InternalRxSession(session);

        // When: the first retryCount invocations fail and the next one succeeds.
        AtomicInteger attempts = new AtomicInteger();
        Publisher<String> strings = rxSession.readTransaction(t -> {
            if (attempts.getAndIncrement() == retryCount) {
                return Flux.just("a");
            }
            return Flux.just("a").then(Mono.error(new RuntimeException("Errored")));
        });
        StepVerifier.create(Flux.from(strings)).expectNext("a").verifyComplete();

        // Then
        Mockito.verify(session, Mockito.times(retryCount + 1))
                .beginTransactionAsync(
                        ArgumentMatchers.any(AccessMode.class), ArgumentMatchers.any(TransactionConfig.class));
        Mockito.verify(tx, Mockito.times(retryCount)).closeAsync(false);
        Mockito.verify(tx).closeAsync(true);
    }

    /** {@code lastBookmark()} must call {@code NetworkSession.lastBookmark()} and nothing else. */
    @Test
    void shouldDelegateBookmark() throws Throwable {
        // Given
        NetworkSession session = Mockito.mock(NetworkSession.class);
        InternalRxSession rxSession = new InternalRxSession(session);

        // When
        rxSession.lastBookmark();

        // Then
        Mockito.verify(session).lastBookmark();
        Mockito.verifyNoMoreInteractions(session);
    }

    /** {@code reset()} must call {@code NetworkSession.resetAsync()} and complete without emitting. */
    @Test
    void shouldDelegateReset() throws Throwable {
        // Given
        NetworkSession session = Mockito.mock(NetworkSession.class);
        Mockito.when(session.resetAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(session);

        // When
        Publisher<Void> mono = rxSession.reset();
        StepVerifier.create(mono).verifyComplete();

        // Then
        Mockito.verify(session).resetAsync();
        Mockito.verifyNoMoreInteractions(session);
    }

    /** {@code close()} must call {@code NetworkSession.closeAsync()} and complete without emitting. */
    @Test
    void shouldDelegateClose() throws Throwable {
        // Given
        NetworkSession session = Mockito.mock(NetworkSession.class);
        Mockito.when(session.closeAsync()).thenReturn(Futures.completedWithNull());
        InternalRxSession rxSession = new InternalRxSession(session);

        // When
        Publisher<Void> mono = rxSession.close();
        StepVerifier.create(mono).verifyComplete();

        // Then
        Mockito.verify(session).closeAsync();
        Mockito.verifyNoMoreInteractions(session);
    }
}

