/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx.operators;

import com.github.davidmoten.rx.Bytes;
import com.github.davidmoten.rx.subjects.PublishSubjectSingleSubscriber;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.Subscribers;

public class OperatorFileTailer
implements Observable.Operator<byte[], Object> {
    private final File file;
    private final AtomicLong currentPosition = new AtomicLong();
    private final int maxBytesPerEmission;

    public OperatorFileTailer(File file, long startPosition, int maxBytesPerEmission) {
        if (file == null) {
            throw new NullPointerException("file cannot be null");
        }
        this.file = file;
        this.currentPosition.set(startPosition);
        this.maxBytesPerEmission = maxBytesPerEmission;
    }

    public OperatorFileTailer(File file, long startPosition) {
        this(file, startPosition, 8192);
    }

    public Subscriber<? super Object> call(Subscriber<? super byte[]> child) {
        PublishSubjectSingleSubscriber subject = PublishSubjectSingleSubscriber.create();
        Subscriber parent = Subscribers.from((Observer)subject);
        child.add((Subscription)parent);
        subject.concatMap(OperatorFileTailer.reportNewLines(this.file, this.currentPosition, this.maxBytesPerEmission)).unsafeSubscribe(child);
        return parent;
    }

    private static Func1<Object, Observable<byte[]>> reportNewLines(final File file, final AtomicLong currentPosition, final int maxBytesPerEmission) {
        return new Func1<Object, Observable<byte[]>>(){

            public Observable<byte[]> call(Object event) {
                long length;
                WatchEvent w;
                String kind;
                if (event instanceof WatchEvent && (kind = (w = (WatchEvent)event).kind().name()).equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
                    currentPosition.set(0L);
                }
                if ((length = file.length()) > currentPosition.get()) {
                    try {
                        final FileInputStream fis = new FileInputStream(file);
                        fis.skip(currentPosition.get());
                        return Observable.using((Func0)new Func0<InputStream>(){

                            public InputStream call() {
                                return fis;
                            }
                        }, (Func1)new Func1<InputStream, Observable<byte[]>>(){

                            public Observable<byte[]> call(InputStream t1) {
                                return Bytes.from((InputStream)fis, (int)maxBytesPerEmission).doOnNext((Action1)new Action1<byte[]>(){

                                    public void call(byte[] bytes) {
                                        currentPosition.addAndGet(bytes.length);
                                    }
                                });
                            }
                        }, (Action1)new Action1<InputStream>(){

                            public void call(InputStream is) {
                                try {
                                    is.close();
                                }
                                catch (IOException iOException) {
                                    // empty catch block
                                }
                            }
                        });
                    }
                    catch (IOException e) {
                        return Observable.error((Throwable)e);
                    }
                }
                return Observable.empty();
            }
        };
    }
}

