/*
 * Decompiled with CFR 0.152.
 */
package edu.cmu.graphchi.hadoop;

import edu.cmu.graphchi.ChiLogger;
import edu.cmu.graphchi.hadoop.HDFSGraphLoader;
import edu.cmu.graphchi.preprocessing.EdgeProcessor;
import edu.cmu.graphchi.preprocessing.FastSharder;
import java.io.IOException;
import java.util.logging.Logger;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.tools.pigstats.PigStatusReporter;

public abstract class PigGraphChiBase
extends LoadFunc
implements LoadMetadata {
    private static final Logger logger = ChiLogger.getLogger("pig-graphchi-base");
    private String location;
    private boolean activeNode = false;
    private Job job;
    private boolean ready = false;
    private String status = "initializing";

    protected PigGraphChiBase() {
    }

    protected abstract String getSchemaString();

    public ResourceSchema getSchema(String string, Job job) throws IOException {
        return null;
    }

    public ResourceStatistics getStatistics(String string, Job job) throws IOException {
        return null;
    }

    public String[] getPartitionKeys(String string, Job job) throws IOException {
        return null;
    }

    public void setPartitionFilter(Expression expression) throws IOException {
    }

    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    protected abstract int getNumShards();

    protected String getGraphName() {
        return "pigudfgraph";
    }

    public void setLocation(String string, Job job) throws IOException {
        logger.info("Set HDFS location for GraphChi Pig: " + string);
        PigTextInputFormat.setInputPaths((Job)job, (String)string);
        this.location = string;
        this.job = job;
    }

    public void setStatusString(String string) {
        this.status = string;
    }

    protected abstract void runGraphChi() throws Exception;

    protected abstract FastSharder createSharder(String var1, int var2) throws IOException;

    public void prepareToRead(RecordReader recordReader, final PigSplit pigSplit) throws IOException {
        try {
            int n = 0;
            for (String string : pigSplit.getLocations()) {
                System.out.println(n++ + "Split : " + string);
            }
            System.out.println("Num paths: " + pigSplit.getNumPaths());
            System.out.println("" + pigSplit.getConf());
            System.out.println("split index " + pigSplit.getSplitIndex());
            Thread thread = new Thread(new Runnable(){

                @Override
                public void run() {
                    int n = 0;
                    while (!PigGraphChiBase.this.ready) {
                        PigStatusReporter.getInstance().progress();
                        PigStatusReporter.getInstance().setStatus("GraphChi running (" + n + "): " + PigGraphChiBase.this.getStatusString());
                        try {
                            Thread.sleep(5000L);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                }
            });
            thread.start();
            if (pigSplit.getSplitIndex() > 0) {
                PigStatusReporter.getInstance().setStatus("Redundant GraphChi-mapper - will die");
                throw new RuntimeException("Split index > 0 -- this mapper will die (expected, not an error).");
            }
            this.activeNode = true;
            Thread thread2 = new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        PigGraphChiBase.this.setStatusString("Preprocessing: reading data from HDFS: " + PigGraphChiBase.this.location);
                        final FastSharder fastSharder = PigGraphChiBase.this.createSharder(PigGraphChiBase.this.getGraphName(), PigGraphChiBase.this.getNumShards());
                        HDFSGraphLoader hDFSGraphLoader = new HDFSGraphLoader(PigGraphChiBase.this.location, new EdgeProcessor<Float>(){
                            long counter = 0L;

                            @Override
                            public Float receiveEdge(int n, int n2, String string) {
                                try {
                                    fastSharder.addEdge(n, n2, string);
                                    ++this.counter;
                                    if (this.counter % 100000L == 0L) {
                                        PigGraphChiBase.this.setStatusString("Preprocessing, read " + this.counter + " edges");
                                    }
                                }
                                catch (IOException iOException) {
                                    throw new RuntimeException(iOException);
                                }
                                return null;
                            }
                        });
                        hDFSGraphLoader.load(pigSplit.getConf());
                        PigGraphChiBase.this.setStatusString("Sharding...");
                        fastSharder.process();
                        logger.info("Starting to run GraphChi");
                        PigGraphChiBase.this.setStatusString("Start GraphChi engine");
                        PigGraphChiBase.this.runGraphChi();
                        logger.info("Ready.");
                    }
                    catch (Exception exception) {
                        exception.printStackTrace();
                    }
                    PigGraphChiBase.this.ready = true;
                }
            });
            thread2.start();
        }
        catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    protected String getStatusString() {
        return this.status;
    }

    protected abstract Tuple getNextResult(TupleFactory var1) throws ExecException;

    public Tuple getNext() throws IOException {
        if (!this.activeNode) {
            return null;
        }
        while (!this.ready) {
            logger.info("GraphChi-Java running: waiting for graphchi-engine to finish: " + this.getStatusString());
            PigStatusReporter.getInstance().setStatus(this.getStatusString());
            PigStatusReporter.getInstance().progress();
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        return this.getNextResult(TupleFactory.getInstance());
    }
}

