Skip to content

Commit

Permalink
WIP - spark program runner (dataproc)
Browse files Browse the repository at this point in the history
  • Loading branch information
anwar6953 authored and Ali Anwar committed Mar 30, 2018
1 parent 46a1487 commit 4ebf9b7
Show file tree
Hide file tree
Showing 6 changed files with 519 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,30 @@ public SubmitterPlugin(String stageName, Transactional transactional,

@Override
public void onFinish(final boolean succeeded) {
  // Invoke the delegate's onRunFinish inside a transaction so the context it
  // receives can access datasets through a live DatasetContext.
  // NOTE(review): the WIP change that skipped the transaction and passed a null
  // DatasetContext is reverted here — any delegate touching datasets would
  // break on the null context, and the failure handling was lost entirely.
  try {
    transactional.execute(new TxRunnable() {
      @Override
      public void run(DatasetContext datasetContext) throws Exception {
        T context = contextProvider.getContext(datasetContext);
        delegate.onRunFinish(succeeded, context);
      }
    });
  } catch (TransactionFailureException e) {
    // Log the exception as the final argument so the cause is not swallowed.
    LOG.warn("Error calling onRunFinish on stage {}", stageName, e);
  }
}

@Override
public void prepareRun() throws TransactionFailureException {
transactional.execute(new TxRunnable() {
@Override
public void run(DatasetContext datasetContext) throws Exception {
U context = contextProvider.getContext(datasetContext);
public void prepareRun() throws Exception {

//transactional.execute(new TxRunnable() {
// @Override
// public void run(DatasetContext datasetContext) throws Exception {
U context = contextProvider.getContext(null);
delegate.prepareRun(context);
prepareAction.act(context);
}
});
//}
//});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public void run(JavaSparkExecutionContext sec) throws Exception {
// Execution the whole pipeline in one long transaction. This is because the Spark execution
// currently share the same contract and API as the MapReduce one.
// The API need to expose DatasetContext, hence it needs to be executed inside a transaction
Transactionals.execute(sec, this, Exception.class);
//Transactionals.execute(sec, this, Exception.class);
run(datasetContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, JavaSparkExecuti
@Override
public void addOutput(Output output) {
  Output actualOutput = suffixOutput(getOutput(output));
  // Wrap the output so dataset lineage is tracked, except in preview mode where
  // external datasets are not tracked.
  // NOTE(review): the WIP commit bypassed makeTrackable, silently dropping
  // lineage for sink outputs — reverted here, and the dead commented code removed.
  Output trackableOutput =
    isPreviewEnabled ? actualOutput : ExternalDatasets.makeTrackable(admin, actualOutput);
  sinkFactory.addOutput(getStageName(), trackableOutput);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClien

@Override
public void setInput(Input input) {
  // Wrap the (suffixed) input so dataset lineage is tracked for external sources.
  // NOTE(review): the WIP commit skipped makeTrackable and passed the bare
  // suffixed input, silently losing lineage tracking — reverted here.
  Input trackableInput = ExternalDatasets.makeTrackable(admin, suffixInput(input));
  sourceFactory.addInput(getStageName(), trackableInput);
}

Expand Down
Loading

0 comments on commit 4ebf9b7

Please sign in to comment.