diff --git a/src/main/java/com/thinkaurelius/faunus/FaunusPipeline.java b/src/main/java/com/thinkaurelius/faunus/FaunusPipeline.java index b9e18939..3973044a 100644 --- a/src/main/java/com/thinkaurelius/faunus/FaunusPipeline.java +++ b/src/main/java/com/thinkaurelius/faunus/FaunusPipeline.java @@ -3,6 +3,7 @@ import com.thinkaurelius.faunus.formats.EdgeCopyMapReduce; import com.thinkaurelius.faunus.formats.MapReduceFormat; import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; +import com.thinkaurelius.faunus.mapreduce.FaunusTool; import com.thinkaurelius.faunus.mapreduce.IdentityMap; import com.thinkaurelius.faunus.mapreduce.filter.BackFilterMapReduce; import com.thinkaurelius.faunus.mapreduce.filter.CyclicPathFilterMap; @@ -44,11 +45,13 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ToolRunner; import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl; import javax.script.ScriptEngine; import javax.script.ScriptException; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -1034,7 +1037,7 @@ public String toString() { return this.stringRepresentation.toString(); } - private FaunusPipeline done() { + public FaunusPipeline done() throws Exception { if (!this.state.isLocked()) { final Pair> pair = this.state.popProperty(); if (null != pair) { @@ -1046,6 +1049,15 @@ private FaunusPipeline done() { this.state.lock(); } } + + if (MapReduceFormat.class.isAssignableFrom(this.graph.getGraphOutputFormat())) { + this.state.assertNotLocked(); + ((Class) this.graph.getGraphOutputFormat()).getConstructor().newInstance().addMapReduceJobs(this.compiler); + } + + this.compiler.completeSequence(); + this.compiler.composeJobs(); + return this; } @@ -1067,12 +1079,10 @@ public void submit() throws Exception { */ public void submit(final String script, final Boolean showHeader) throws Exception { this.done(); - if (MapReduceFormat.class.isAssignableFrom(this.graph.getGraphOutputFormat())) { - this.state.assertNotLocked(); - ((Class) this.graph.getGraphOutputFormat()).getConstructor().newInstance().addMapReduceJobs(this.compiler); - } - this.compiler.completeSequence(); - ToolRunner.run(this.compiler, new String[]{script, showHeader.toString()}); + + FaunusTool faunusTool = new FaunusTool(this.graph, this.compiler.getJobs()); + + ToolRunner.run(faunusTool, new String[]{script, showHeader.toString()}); } /** diff --git a/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusCompiler.java b/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusCompiler.java index 43d792c0..58b984e6 100644 --- a/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusCompiler.java +++ b/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusCompiler.java @@ -33,6 +33,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -279,6 +280,10 @@ public void composeJobs() throws IOException { } } + public List getJobs() { + return this.jobs; + } + public int run(final String[] args) throws Exception { String script = null; boolean showHeader = true; @@ -288,11 +293,6 @@ public int run(final String[] args) throws Exception { showHeader = Boolean.valueOf(args[1]); } - final FileSystem hdfs = FileSystem.get(this.getConf()); - if (this.graph.getOutputLocationOverwrite() && hdfs.exists(this.graph.getOutputLocation())) { - hdfs.delete(this.graph.getOutputLocation(), true); - } - if (showHeader) { logger.info("Faunus: Graph Analytics Engine"); logger.info(" ,"); @@ -309,37 +309,20 @@ public int run(final String[] args) throws Exception { logger.info(" |/"); } - if (null != script && !script.isEmpty()) + if (null != script && !script.isEmpty()) { logger.info("Generating job chain: " + script); + } this.composeJobs(); logger.info("Compiled to " + this.jobs.size() + " MapReduce job(s)"); - final String jobPath = this.graph.getOutputLocation().toString() + "/" + Tokens.JOB; - for (int i = 0; i < this.jobs.size(); i++) { - final Job job = this.jobs.get(i); - try { - ((JobConfigurationFormat) (FormatTools.getBaseOutputFormatClass(job).newInstance())).updateJob(job); - } catch (final Exception e) { - } - logger.info("Executing job " + (i + 1) + " out of " + this.jobs.size() + ": " + job.getJobName()); - logger.info("Job data location: " + jobPath + "-" + i); - boolean success = job.waitForCompletion(true); - if (i > 0) { - final Path path = new Path(jobPath + "-" + (i - 1)); - // delete previous intermediate graph data - for (final FileStatus temp : hdfs.globStatus(new Path(path.toString() + "/" + Tokens.GRAPH + "*"))) { - hdfs.delete(temp.getPath(), true); - } - // delete previous intermediate graph data - for (final FileStatus temp : hdfs.globStatus(new Path(path.toString() + "/" + Tokens.PART + "*"))) { - hdfs.delete(temp.getPath(), true); - } - } - if (!success) { - logger.error("Faunus job error -- remaining MapReduce jobs have been canceled"); - return -1; - } + + FaunusJobControl faunusJobControl = new FaunusJobControl(graph, jobs); + faunusJobControl.run(); + + if (faunusJobControl.getFailedJobs().size() == 0) { + return 0; + } else { + return -1; } - return 0; } } \ No newline at end of file diff --git a/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusJobControl.java b/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusJobControl.java new file mode 100644 index 00000000..54c0aa70 --- /dev/null +++ b/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusJobControl.java @@ -0,0 +1,138 @@ +package com.thinkaurelius.faunus.mapreduce; + +import com.thinkaurelius.faunus.FaunusGraph; +import com.thinkaurelius.faunus.Tokens; +import com.thinkaurelius.faunus.formats.FormatTools; +import com.thinkaurelius.faunus.formats.JobConfigurationFormat; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.*; + +public class FaunusJobControl implements Runnable { + + public static final Logger logger = Logger.getLogger(FaunusJobControl.class); + + private final FaunusGraph graph; + private final LinkedList jobsInProgress = new LinkedList<>(); + private Job runningJob = null; + private final LinkedList successfulJobs = new LinkedList<>(); + private final LinkedList failedJobs = new LinkedList<>(); + + public FaunusJobControl(FaunusGraph graph, Collection jobs) { + this.graph = graph; + jobsInProgress.addAll(jobs); + } + + private List toList(LinkedList jobs) { + ArrayList retv = new ArrayList<>(); + synchronized (jobs) { + retv.addAll(jobs); + } + return retv; + } + + public List getJobsInProgress() { + return toList(jobsInProgress); + } + + synchronized public Job getRunningJob() { + return runningJob; + } + + public List getSuccessfulJobs() { + return toList(successfulJobs); + } + + public List getFailedJobs() { + return toList(failedJobs); + } + + synchronized public boolean allFinished() { + return jobsInProgress.isEmpty(); + } + + @Override + public void run() { + try { + final FileSystem hdfs = FileSystem.get(this.graph.getConf()); + if (this.graph.getOutputLocationOverwrite() && hdfs.exists(this.graph.getOutputLocation())) { + hdfs.delete(this.graph.getOutputLocation(), true); + } + + int jobCount = this.jobsInProgress.size(); + final String jobPath = this.graph.getOutputLocation().toString() + "/" + Tokens.JOB; + + Iterator it = jobsInProgress.iterator(); + for (int i = 0; i < jobCount; ++i) { + synchronized (this) { + runningJob = it.next(); + + try { + ((JobConfigurationFormat) (FormatTools.getBaseOutputFormatClass(runningJob).newInstance())).updateJob(runningJob); + } catch (final Exception e) { + } + + logger.info("Executing job " + (i + 1) + " out of " + jobCount + ": " + runningJob.getJobName()); + logger.info("Job data location: " + jobPath + "-" + i); + + runningJob.submit(); + } + + boolean success = runningJob.waitForCompletion(true); + + synchronized (this) { + if (success) { + successfulJobs.add(runningJob); + } else { + failedJobs.add(runningJob); + } + + it.remove(); + runningJob = null; + } + + if (i > 0) { + final Path path = new Path(jobPath + "-" + (i - 1)); + // delete previous intermediate graph data + for (final FileStatus temp : hdfs.globStatus(new Path(path.toString() + "/" + Tokens.GRAPH + "*"))) { + hdfs.delete(temp.getPath(), true); + } + // delete previous intermediate graph data + for (final FileStatus temp : hdfs.globStatus(new Path(path.toString() + "/" + Tokens.PART + "*"))) { + hdfs.delete(temp.getPath(), true); + } + } + if (!success) { + logger.error("Faunus job error -- remaining MapReduce jobs have been canceled"); + break; + } + } + + } catch (Throwable t) { + logger.error("Error while trying to run jobs.", t); + // Mark all the jobs as failed because we got something bad. + failAllJobs(t); + } + } + + synchronized private void failAllJobs(Throwable t) { + String message = "Unexpected System Error Occurred " + StringUtils.stringifyException(t); + + if (runningJob != null) { + try { + runningJob.killJob(); + } catch (IOException e) { + logger.error("Error trying to clean up " + runningJob.getJobName(), e); + } finally { + failedJobs.add(runningJob); + runningJob = null; + } + } + } +} diff --git a/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusTool.java b/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusTool.java new file mode 100644 index 00000000..c978420a --- /dev/null +++ b/src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusTool.java @@ -0,0 +1,64 @@ +package com.thinkaurelius.faunus.mapreduce; + +import com.thinkaurelius.faunus.FaunusGraph; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.log4j.Logger; + +import java.util.List; + +public class FaunusTool extends Configured implements Tool { + public static final Logger logger = Logger.getLogger(FaunusTool.class); + + private final FaunusGraph graph; + private final List jobs; + + public FaunusTool(FaunusGraph graph, List jobs) { + this.graph = graph; + this.jobs = jobs; + } + + @Override + public int run(String[] args) throws Exception { + + String script = null; + boolean showHeader = true; + + if (args.length == 2) { + script = args[0]; + showHeader = Boolean.valueOf(args[1]); + } + + if (showHeader) { + logger.info("Faunus: Graph Analytics Engine"); + logger.info(" ,"); + logger.info(" , |\\ ,__"); + logger.info(" |\\ \\/ `\\"); + logger.info(" \\ `-.:. `\\"); + logger.info(" `-.__ `\\/\\/\\|"); + logger.info(" / `'/ () \\"); + logger.info(" .' /\\ )"); + logger.info(" .-' .'| \\ \\__"); + logger.info(" .' __( \\ '`(()"); + logger.info("/_.'` `. | )("); + logger.info(" \\ |"); + logger.info(" |/"); + } + + if (null != script && !script.isEmpty()) { + logger.info("Generating job chain: " + script); + } + + logger.info("Compiled to " + this.jobs.size() + " MapReduce job(s)"); + + FaunusJobControl faunusJobControl = new FaunusJobControl(graph, jobs); + faunusJobControl.run(); + + if (faunusJobControl.getFailedJobs().size() == 0) { + return 0; + } else { + return -1; + } + } +}