Merge pull request #15726 from cdapio/CDAP-21061-wrap-plugin-methods
[CDAP-21061] Wrap exceptions from process stage to throw WrappedStageException and add WrappedBatchAutoJoiner
itsankit-google authored Oct 23, 2024
2 parents 72c3b10 + 8ae85fa commit ec0b906
Showing 3 changed files with 95 additions and 1 deletion.
@@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
+import io.cdap.cdap.etl.api.batch.BatchAutoJoiner;
import io.cdap.cdap.etl.api.batch.BatchJoiner;
import io.cdap.cdap.etl.api.batch.BatchReducibleAggregator;
import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -105,6 +106,8 @@ private Object wrapPlugin(String pluginId, Object plugin) {
      return new WrappedBatchAggregator<>((BatchAggregator) plugin, caller, operationTimer);
    } else if (plugin instanceof BatchJoiner) {
      return new WrappedBatchJoiner<>((BatchJoiner) plugin, caller, operationTimer);
+   } else if (plugin instanceof BatchAutoJoiner) {
+     return new WrappedBatchAutoJoiner((BatchAutoJoiner) plugin, caller);
    } else if (plugin instanceof PostAction) {
      return new WrappedPostAction((PostAction) plugin, caller);
    } else if (plugin instanceof SplitterTransform) {
@@ -0,0 +1,78 @@
/*
 * Copyright © 2024 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.cdap.etl.common.plugin;

import io.cdap.cdap.etl.api.MultiInputPipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchAutoJoiner;
import io.cdap.cdap.etl.api.batch.BatchJoinerContext;
import io.cdap.cdap.etl.api.join.AutoJoinerContext;
import io.cdap.cdap.etl.api.join.JoinDefinition;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;

/**
 * Wrapper around {@link BatchAutoJoiner} that makes sure logging, classloading, and other pipeline
 * capabilities are set up correctly.
 */
public class WrappedBatchAutoJoiner extends BatchAutoJoiner
    implements PluginWrapper<BatchAutoJoiner> {

  private final BatchAutoJoiner joiner;
  private final Caller caller;

  public WrappedBatchAutoJoiner(BatchAutoJoiner joiner, Caller caller) {
    this.joiner = joiner;
    this.caller = caller;
  }

  @Override
  public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {
    caller.callUnchecked((Callable<Void>) () -> {
      joiner.configurePipeline(multiInputPipelineConfigurer);
      return null;
    });
  }

  @Override
  public void prepareRun(BatchJoinerContext context) throws Exception {
    caller.call((Callable<Void>) () -> {
      joiner.prepareRun(context);
      return null;
    });
  }

  @Override
  public void onRunFinish(boolean succeeded, BatchJoinerContext context) {
    caller.callUnchecked((Callable<Void>) () -> {
      joiner.onRunFinish(succeeded, context);
      return null;
    });
  }

  @Nullable
  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    return caller.callUnchecked(() -> joiner.define(context));
  }

  @Override
  public BatchAutoJoiner getWrapped() {
    return joiner;
  }
}
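For reference, here is a minimal sketch (not part of this commit) of the call/callUnchecked delegation pattern the wrapper above relies on: a hypothetical caller that converts any exception thrown by the wrapped plugin into a WrappedStageException carrying the stage name. The class name StageExceptionCaller and its package are invented for illustration; CDAP's real Caller implementations may differ, and the sketch assumes WrappedStageException is an unchecked exception with a (Throwable, String) constructor, as its usage elsewhere in this commit suggests.

// Illustration only; not part of commit ec0b906. Class and package names are hypothetical.
package com.example.sketch;

import io.cdap.cdap.api.exception.WrappedStageException;
import java.util.concurrent.Callable;

public class StageExceptionCaller {
  private final String stageName;

  public StageExceptionCaller(String stageName) {
    this.stageName = stageName;
  }

  // Checked variant, mirroring how WrappedBatchAutoJoiner#prepareRun uses caller.call(...).
  public <T> T call(Callable<T> callable) throws Exception {
    try {
      return callable.call();
    } catch (WrappedStageException e) {
      throw e; // a stage name is already attached; avoid double wrapping
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }

  // Unchecked variant, mirroring configurePipeline/onRunFinish/define, which declare no exceptions.
  public <T> T callUnchecked(Callable<T> callable) {
    try {
      return callable.call();
    } catch (WrappedStageException e) {
      throw e;
    } catch (Exception e) {
      // assumes WrappedStageException is unchecked, so it can propagate from a no-throws method
      throw new WrappedStageException(e, stageName);
    }
  }
}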
@@ -25,6 +25,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.cdap.cdap.api.data.DatasetContext;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.api.macro.MacroEvaluator;
import io.cdap.cdap.api.plugin.PluginContext;
import io.cdap.cdap.api.spark.JavaSparkExecutionContext;
@@ -194,10 +195,22 @@ protected void processDag(PhaseSpec phaseSpec, String sourcePluginType, JavaSpar
    //Emitted records and sinkRunnables will be populated as each stage is processed
    SinkRunnableProvider sinkRunnableProvider = new BatchSinkRunnableProvider();
    for (String stageName : groupedDag.getTopologicalOrder()) {
-     processStage(phaseSpec, sourcePluginType, sec, stagePartitions, pluginContext, collectors,
+     try {
+       processStage(phaseSpec, sourcePluginType, sec, stagePartitions, pluginContext, collectors,
          pipelinePhase, functionCacheFactory, macroEvaluator, emittedRecords, groupedDag, groups,
          branchers, shufflers, sinkRunnables, stageName, System.currentTimeMillis(), null,
          sinkRunnableProvider);
+     } catch (Exception e) {
+       List<Throwable> causalChain = Throwables.getCausalChain(e);
+       for (Throwable t : causalChain) {
+         if (t instanceof WrappedStageException) {
+           // avoid double wrapping
+           throw e;
+         }
+       }
+       // this can occur in cases like `joins` where we do `SparkCollection#join`
+       throw new WrappedStageException(e, stageName);
+     }
    }
    //We should have all the sink runnables at this point, execute them
    executeSinkRunnables(sec, sinkRunnables);
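For illustration, the guard in the catch block above can be read as a standalone helper: rethrow unchanged when a WrappedStageException already appears anywhere in the causal chain, otherwise wrap once with the failing stage's name. This sketch is not part of the commit; the helper name and package are invented, and it assumes only Guava's Throwables.getCausalChain and the WrappedStageException(Throwable, String) constructor used above.

// Illustration only; not part of commit ec0b906. Class and package names are hypothetical.
package com.example.sketch;

import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.WrappedStageException;
import java.util.List;

public final class StageExceptions {

  private StageExceptions() {
    // no instances
  }

  // Rethrows e unchanged if its causal chain already contains a WrappedStageException,
  // otherwise wraps it so the failing stage's name is attached exactly once.
  public static void rethrowWithStage(Exception e, String stageName) throws Exception {
    List<Throwable> causalChain = Throwables.getCausalChain(e);
    for (Throwable t : causalChain) {
      if (t instanceof WrappedStageException) {
        throw e;
      }
    }
    throw new WrappedStageException(e, stageName);
  }
}

A caller would use it exactly like the catch block in processDag: catch (Exception e) { StageExceptions.rethrowWithStage(e, stageName); }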
