/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx.operators;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.StringObservable;
import rx.observers.Subscribers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public class OperatorFileTailer
implements Observable.Operator<byte[], Object> {
    private final File file;
    private final AtomicLong currentPosition = new AtomicLong();
    private final int maxBytesPerEmission;

    public OperatorFileTailer(File file, long startPosition, int maxBytesPerEmission) {
        if (file == null) {
            throw new NullPointerException("file cannot be null");
        }
        this.file = file;
        this.currentPosition.set(startPosition);
        this.maxBytesPerEmission = maxBytesPerEmission;
    }

    public OperatorFileTailer(File file, long startPosition) {
        this(file, startPosition, 8192);
    }

    public Subscriber<? super Object> call(Subscriber<? super byte[]> child) {
        PublishSubject subject = PublishSubject.create();
        Subscriber parent = Subscribers.from((Observer)subject);
        child.add((Subscription)parent);
        subject.concatMap(OperatorFileTailer.reportNewLines(this.file, this.currentPosition, this.maxBytesPerEmission)).unsafeSubscribe(child);
        return parent;
    }

    private static Func1<Object, Observable<byte[]>> reportNewLines(final File file, final AtomicLong currentPosition, final int maxBytesPerEmission) {
        return new Func1<Object, Observable<byte[]>>(){

            public Observable<byte[]> call(Object event) {
                long length;
                WatchEvent w;
                String kind;
                if (event instanceof WatchEvent && (kind = (w = (WatchEvent)event).kind().name()).equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
                    currentPosition.set(0L);
                }
                if ((length = file.length()) > currentPosition.get()) {
                    try {
                        FileInputStream fis = new FileInputStream(file);
                        fis.skip(currentPosition.get());
                        Func0 subscriptionFactory = OperatorFileTailer.createSubscriptionFactory(fis);
                        Func1 observableFactory = OperatorFileTailer.createObservableFactory(fis, currentPosition, maxBytesPerEmission);
                        return Observable.using((Func0)subscriptionFactory, (Func1)observableFactory);
                    }
                    catch (IOException e) {
                        return Observable.error((Throwable)e);
                    }
                }
                return Observable.empty();
            }
        };
    }

    private static Func0<Subscription> createSubscriptionFactory(final InputStream is) {
        return new Func0<Subscription>(){

            public Subscription call() {
                return Subscriptions.create((Action0)new Action0(){

                    public void call() {
                        try {
                            is.close();
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
            }
        };
    }

    private static Func1<Subscription, Observable<byte[]>> createObservableFactory(final FileInputStream fis, final AtomicLong currentPosition, final int maxBytesPerEmission) {
        return new Func1<Subscription, Observable<byte[]>>(){

            public Observable<byte[]> call(Subscription subscription) {
                return StringObservable.from((InputStream)fis, (int)maxBytesPerEmission).doOnNext((Action1)new Action1<byte[]>(){

                    public void call(byte[] bytes) {
                        currentPosition.addAndGet(bytes.length);
                    }
                });
            }
        };
    }
}

