/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx;

import com.github.davidmoten.rx.CloseableObservableWithReset;
import com.github.davidmoten.rx.internal.operators.OperatorCollectWhile;
import com.github.davidmoten.rx.internal.operators.OrderedMerge;
import com.github.davidmoten.rx.observables.CachedObservable;
import com.github.davidmoten.util.Optional;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func2;

/**
 * Static helper methods that build {@link Observable}s: cached sources (optionally
 * reset after a delay), an endlessly repeating value, conditional collection and
 * merges delegated to {@link OrderedMerge}.
 */
public final class Obs {

    /**
     * Wraps {@code source} in a {@link CachedObservable}.
     *
     * @param source the observable to wrap
     * @param <T> the element type
     * @return a new {@code CachedObservable} constructed from {@code source}
     */
    public static <T> CachedObservable<T> cache(Observable<T> source) {
        return new CachedObservable<T>(source);
    }

    /**
     * Wraps {@code source} in a {@link CachedObservable} and, on every subscription to
     * the returned observable, schedules a call to {@link CachedObservable#reset()} on
     * {@code worker} after the given delay.
     *
     * @param source the observable to wrap
     * @param duration the delay before {@code reset()} is invoked
     * @param unit the time unit of {@code duration}
     * @param worker the worker on which the reset is scheduled; this method never
     *        unsubscribes it
     * @param <T> the element type
     * @return the cached observable decorated with the scheduling side effect
     */
    public static <T> Observable<T> cache(Observable<T> source, final long duration, final TimeUnit unit, final Scheduler.Worker worker) {
        final CachedObservable<T> cache = new CachedObservable<T>(source);
        return cache.doOnSubscribe(new Action0() {

            @Override
            public void call() {
                worker.schedule(new Action0() {

                    @Override
                    public void call() {
                        cache.reset();
                    }
                }, duration, unit);
            }
        });
    }

    /**
     * Wraps {@code source} in a {@link CachedObservable} and bundles it with a close
     * action and a reset action into a {@link CloseableObservableWithReset}.
     *
     * <p>The reset action creates a fresh worker from {@code scheduler}, unsubscribes
     * any worker created by an earlier reset action and schedules
     * {@link CachedObservable#reset()} after the given delay. The close action
     * unsubscribes the current worker (if any) and marks the state as closed, after
     * which the reset action does nothing. When the two actions are actually invoked is
     * decided by {@code CloseableObservableWithReset}, which is not visible here.
     *
     * @param source the observable to wrap
     * @param duration the delay before {@code reset()} is invoked
     * @param unit the time unit of {@code duration}
     * @param scheduler the scheduler used to create workers for the delayed reset
     * @param <T> the element type
     * @return the closeable, resettable wrapper around the cached observable
     */
    public static <T> CloseableObservableWithReset<T> cache(Observable<T> source, final long duration, final TimeUnit unit, final Scheduler scheduler) {
        final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
        // State held by workerRef: absent = no worker yet, present = active worker,
        // null = closed.
        final AtomicReference<Optional<Scheduler.Worker>> workerRef =
                new AtomicReference<Optional<Scheduler.Worker>>(Optional.<Scheduler.Worker>absent());
        CachedObservable<T> cache = new CachedObservable<T>(source);
        cacheRef.set(cache);
        Action0 closeAction = new Action0() {

            @Override
            public void call() {
                // Atomically move to the closed state; whoever swaps out a live worker
                // is responsible for unsubscribing it.
                Optional<Scheduler.Worker> current = workerRef.getAndSet(null);
                if (current != null && current.isPresent()) {
                    current.get().unsubscribe();
                }
            }
        };
        Action0 resetAction = new Action0() {

            @Override
            public void call() {
                Obs.startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
            }
        };
        return new CloseableObservableWithReset<T>(cache, closeAction, resetAction);
    }

    /**
     * Replaces the worker held in {@code workerRef} with a newly created one and
     * schedules a reset of the cached observable on it. Returns without doing anything
     * if {@code workerRef} holds {@code null} (the closed state).
     *
     * @param duration the delay before {@code reset()} is invoked
     * @param unit the time unit of {@code duration}
     * @param scheduler the scheduler used to create the new worker
     * @param cacheRef holder of the cached observable to reset
     * @param workerRef holder of the current worker state
     * @param <T> the element type
     */
    private static <T> void startScheduledResetAgain(long duration, TimeUnit unit, Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef, AtomicReference<Optional<Scheduler.Worker>> workerRef) {
        Action0 resetCache = new Action0() {

            @Override
            public void call() {
                cacheRef.get().reset();
            }
        };
        while (true) {
            Optional<Scheduler.Worker> previous = workerRef.get();
            if (previous == null) {
                // Closed: nothing to schedule.
                return;
            }
            Scheduler.Worker created = scheduler.createWorker();
            if (workerRef.compareAndSet(previous, Optional.of(created))) {
                if (previous.isPresent()) {
                    previous.get().unsubscribe();
                }
                created.schedule(resetCache, duration, unit);
                return;
            }
            // Lost the race against a concurrent close/reset: release the worker we
            // just created so it does not leak, then retry against the new state.
            created.unsubscribe();
        }
    }

    /**
     * Returns an observable that emits {@code t} for as long as the subscriber keeps
     * requesting items and remains subscribed. It never calls {@code onCompleted}.
     *
     * <p>Requests are accumulated in a counter and served by a single drain loop, so a
     * subscriber that calls {@code request} from within {@code onNext} does not cause
     * unbounded recursion, and overlapping requests do not emit concurrently.
     *
     * @param t the value to emit repeatedly
     * @param <T> the element type
     * @return an observable repeating {@code t} on demand
     */
    public static <T> Observable<T> repeating(final T t) {
        return Observable.create(new Observable.OnSubscribe<T>() {

            @Override
            public void call(final Subscriber<? super T> subscriber) {
                subscriber.setProducer(new Producer() {
                    // Outstanding demand; non-zero means some caller is draining.
                    private final AtomicLong requested = new AtomicLong();

                    @Override
                    public void request(long n) {
                        if (n <= 0L) {
                            return;
                        }
                        long previous;
                        long updated;
                        do {
                            previous = requested.get();
                            updated = previous + n;
                            if (updated < 0L) {
                                // Overflow: treat as unbounded demand.
                                updated = Long.MAX_VALUE;
                            }
                        } while (!requested.compareAndSet(previous, updated));
                        if (previous != 0L) {
                            // Another (possibly re-entrant) call is already emitting
                            // and will pick up the added demand.
                            return;
                        }
                        long outstanding = updated;
                        do {
                            long emitted = 0L;
                            while (emitted != outstanding) {
                                if (subscriber.isUnsubscribed()) {
                                    return;
                                }
                                subscriber.onNext(t);
                                emitted++;
                            }
                            outstanding = requested.addAndGet(-emitted);
                        } while (outstanding != 0L);
                    }
                });
            }
        });
    }

    /**
     * Lifts {@code source} with an {@link OperatorCollectWhile} built from the given
     * functions.
     *
     * @param source the upstream observable
     * @param factory supplies a collection value
     * @param aggregator combines a collection value with an item
     * @param condition tested against a collection value and an item
     * @param <T> the source element type
     * @param <R> the collection type
     * @return the lifted observable
     */
    public static <T, R> Observable<R> collectWhile(Observable<T> source, Func0<R> factory, Func2<R, T, R> aggregator, Func2<R, T, Boolean> condition) {
        return source.lift(new OperatorCollectWhile<R, T>(factory, aggregator, condition));
    }

    /**
     * Delegates to {@link #create(Collection, boolean)} with {@code delayErrors} set to
     * {@code false}.
     *
     * @param sources the observables to merge
     * @param <T> the element type
     * @return the merged observable
     */
    public static <T extends Comparable<? super T>> Observable<T> create(Collection<Observable<? extends T>> sources) {
        return Obs.create(sources, false);
    }

    /**
     * Delegates to {@link #create(Collection, Comparator, boolean)} with
     * {@code delayErrors} set to {@code false}.
     *
     * @param sources the observables to merge
     * @param comparator the comparator passed through to the merge
     * @param <T> the element type
     * @return the merged observable
     */
    public static <T> Observable<T> create(Collection<Observable<? extends T>> sources, Comparator<? super T> comparator) {
        return Obs.create(sources, comparator, false);
    }

    /**
     * Delegates to {@code OrderedMerge.create(sources, delayErrors)}.
     *
     * @param sources the observables to merge
     * @param delayErrors passed through to {@code OrderedMerge}
     * @param <T> the element type
     * @return the merged observable
     */
    public static <T extends Comparable<? super T>> Observable<T> create(Collection<Observable<? extends T>> sources, boolean delayErrors) {
        return OrderedMerge.create(sources, delayErrors);
    }

    /**
     * Delegates to {@code OrderedMerge.create(sources, comparator, delayErrors)}.
     *
     * @param sources the observables to merge
     * @param comparator the comparator passed through to {@code OrderedMerge}
     * @param delayErrors passed through to {@code OrderedMerge}
     * @param <T> the element type
     * @return the merged observable
     */
    public static <T> Observable<T> create(Collection<Observable<? extends T>> sources, Comparator<? super T> comparator, boolean delayErrors) {
        return OrderedMerge.create(sources, comparator, delayErrors);
    }
}

