/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.client.filter;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.Scheduler;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.util.ReactiveSocketDecorator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public final class ReactiveSockets {
    private ReactiveSockets() {
    }

    public static Function<ReactiveSocket, ReactiveSocket> timeout(long timeout, TimeUnit unit, Scheduler scheduler) {
        return src -> ReactiveSocketDecorator.wrap((ReactiveSocket)src).decorateAllResponses(ReactiveSockets._timeout(timeout, unit, scheduler)).decorateAllVoidResponses(ReactiveSockets._timeout(timeout, unit, scheduler)).finish();
    }

    public static Function<ReactiveSocket, ReactiveSocket> safeClose() {
        return src -> {
            AtomicInteger count = new AtomicInteger();
            AtomicBoolean closed = new AtomicBoolean();
            return ReactiveSocketDecorator.wrap((ReactiveSocket)src).close(reactiveSocket -> Px.defer(() -> {
                if (closed.compareAndSet(false, true)) {
                    if (count.get() == 0) {
                        return src.close();
                    }
                    return src.onClose();
                }
                return src.onClose();
            })).decorateAllResponses(ReactiveSockets._safeClose(src, closed, count)).decorateAllVoidResponses(ReactiveSockets._safeClose(src, closed, count)).finish();
        };
    }

    private static <T> Function<Publisher<T>, Publisher<T>> _timeout(long timeout, TimeUnit unit, Scheduler scheduler) {
        return t -> Px.from((Publisher)t).timeout(timeout, unit, scheduler);
    }

    private static <T> Function<Publisher<T>, Publisher<T>> _safeClose(ReactiveSocket src, AtomicBoolean closed, AtomicInteger count) {
        return t -> Px.from((Publisher)t).doOnSubscribe(s -> count.incrementAndGet()).doOnTerminate(() -> {
            if (count.decrementAndGet() == 0 && closed.get()) {
                src.close().subscribe((Subscriber)Subscribers.empty());
            }
        });
    }
}

