/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx.operators;

import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.Subscribers;

public class OperatorWatchServiceEvents
implements Observable.Operator<WatchEvent<?>, WatchService> {
    public Subscriber<? super WatchService> call(final Subscriber<? super WatchEvent<?>> subscriber) {
        Subscriber result = Subscribers.from((Observer)new Observer<WatchService>(){

            public void onCompleted() {
                subscriber.onCompleted();
            }

            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            public void onNext(WatchService watchService) {
                AtomicBoolean subscribed = new AtomicBoolean(true);
                if (!subscribed.get()) {
                    subscriber.onError((Throwable)new RuntimeException("WatchService closed. You can only subscribe once to a WatchService."));
                    return;
                }
                subscriber.add(OperatorWatchServiceEvents.createSubscriptionToCloseWatchService(watchService, subscribed, subscriber));
                OperatorWatchServiceEvents.emitEvents(watchService, subscriber, subscribed);
            }
        });
        subscriber.add((Subscription)result);
        return result;
    }

    private static void emitEvents(WatchService watchService, Subscriber<? super WatchEvent<?>> subscriber, AtomicBoolean subscribed) {
        WatchKey key = OperatorWatchServiceEvents.nextKey(watchService, subscriber, subscribed);
        while (key != null) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            for (WatchEvent<?> event : key.pollEvents()) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(event);
            }
            boolean valid = key.reset();
            if (!valid && subscribed.get()) {
                subscriber.onCompleted();
                return;
            }
            if (!valid) {
                return;
            }
            key = OperatorWatchServiceEvents.nextKey(watchService, subscriber, subscribed);
        }
    }

    private static WatchKey nextKey(WatchService watchService, Subscriber<? super WatchEvent<?>> subscriber, AtomicBoolean subscribed) {
        try {
            return watchService.take();
        }
        catch (ClosedWatchServiceException e) {
            if (subscribed.get()) {
                subscriber.onCompleted();
            }
            return null;
        }
        catch (InterruptedException e) {
            try {
                watchService.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            return null;
        }
    }

    private static final Subscription createSubscriptionToCloseWatchService(final WatchService watchService, final AtomicBoolean subscribed, Subscriber<? super WatchEvent<?>> subscriber) {
        return new Subscription(){

            public void unsubscribe() {
                try {
                    watchService.close();
                }
                catch (ClosedWatchServiceException closedWatchServiceException) {
                }
                catch (IOException iOException) {
                }
                finally {
                    subscribed.set(false);
                }
            }

            public boolean isUnsubscribed() {
                return !subscribed.get();
            }
        };
    }
}

