From 938afb186d8f3b0090b4d09098a8c8fe8b533197 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Wed, 9 Oct 2024 11:12:08 +0000 Subject: [PATCH] Introduce error details provider to get more information about exceptons from plugins --- .../cdap/etl/api/batch/BatchSinkContext.java | 9 +++ .../etl/api/batch/BatchSourceContext.java | 9 +++ .../api/exception/ErrorDetailsProvider.java | 43 ++++++++++ .../cdap/etl/api/exception/ErrorPhase.java | 42 ++++++++++ .../cdap/cdap/etl/common/ExceptionUtils.java | 78 +++++++++++++++++++ .../spark/batch/SparkBatchSinkContext.java | 15 +++- .../spark/batch/SparkBatchSourceContext.java | 11 +++ .../spark/io/StageTrackingInputFormat.java | 17 ++-- .../io/StageTrackingOutputCommitter.java | 26 +++++-- .../spark/io/StageTrackingOutputFormat.java | 24 ++++-- .../spark/io/StageTrackingRecordReader.java | 26 +++++-- .../spark/io/StageTrackingRecordWriter.java | 14 +++- 12 files changed, 281 insertions(+), 33 deletions(-) create mode 100644 cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java create mode 100644 cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java create mode 100644 cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ExceptionUtils.java diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSinkContext.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSinkContext.java index 2e72184a3369..6f43988d5146 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSinkContext.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSinkContext.java @@ -38,4 +38,13 @@ public interface BatchSinkContext extends BatchContext { * @return a boolean value which indicates the pipeline is running in preview mode. */ boolean isPreviewEnabled(); + + /** + * Overrides the error details provider class name. + * + * @param errorDetailsProviderClassName the error details provider class name. + */ + default void setErrorDetailsProvider(String errorDetailsProviderClassName) { + // no-op + } } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSourceContext.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSourceContext.java index 4bcd0b92a227..3562718e30ee 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSourceContext.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/batch/BatchSourceContext.java @@ -43,4 +43,13 @@ public interface BatchSourceContext extends BatchContext { * @return maximum number of records to read in preview mode. */ int getMaxPreviewRecords(); + + /** + * Overrides the error details provider class name. + * + * @param errorDetailsProviderClassName the error details provider class name. + */ + default void setErrorDetailsProvider(String errorDetailsProviderClassName) { + // no-op + } } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java new file mode 100644 index 000000000000..fef20d00792a --- /dev/null +++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorDetailsProvider.java @@ -0,0 +1,43 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.api.exception; + +import io.cdap.cdap.api.exception.ProgramFailureException; +import javax.annotation.Nullable; + +/** + * Interface for providing error details. + * + *

+ * Implementations of this interface can be used to provide more detailed error information + * for exceptions that occur within the code using {@link ProgramFailureException}. + *

+ */ +public interface ErrorDetailsProvider { + public static final String EXCEPTION_DETAILS_CLASS_NAME_KEY = + "io.cdap.pipeline.error.details.provider.classname"; + + /** + * Returns a {@link RuntimeException} that wraps the given exception with more detailed + * information. + * + * @param e the exception to wrap + * @return {@link ProgramFailureException} that wraps the given exception with more detailed information. + */ + @Nullable + ProgramFailureException getExceptionDetails(Exception e, ErrorPhase phase); +} diff --git a/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java new file mode 100644 index 000000000000..f30a868a5e16 --- /dev/null +++ b/cdap-app-templates/cdap-etl/cdap-etl-api/src/main/java/io/cdap/cdap/etl/api/exception/ErrorPhase.java @@ -0,0 +1,42 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Enum representing the different phases of a stage where error can occur. + */ +package io.cdap.cdap.etl.api.exception; + +public enum ErrorPhase { + GETTING_SPLITS( "Getting Splits"), + READING_RECORDS("Reading Records"), + CHECKING_OUTPUT_SPECS("Checking Output Specs"), + WRITING_RECORDS("Writing Records"), + COMMITTING_RECORDS("Committing Records"); + + private final String displayName; + + ErrorPhase(String displayName) { + this.displayName = displayName; + } + + /** + * Returns a string representation of the error category enum. + */ + @Override + public String toString() { + return displayName; + } +} diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ExceptionUtils.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ExceptionUtils.java new file mode 100644 index 000000000000..71412f69f0d1 --- /dev/null +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/ExceptionUtils.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.common; + +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.exception.ErrorPhase; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; + +/** + * Utility class for handling exceptions. + */ +public class ExceptionUtils { + + /** + * Gets the {@link ErrorDetailsProvider} from the given {@link Configuration}. + * + * @param conf the configuration to get the error details provider from. + * @return the error details provider. + */ + @Nullable + public static ErrorDetailsProvider getErrorDetailsProvider(Configuration conf) { + String errorDetailsProviderClassName = + conf.get(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY); + if (errorDetailsProviderClassName == null) { + return null; + } + try { + return (ErrorDetailsProvider) conf.getClassLoader() + .loadClass(errorDetailsProviderClassName) + .newInstance(); + } catch (Exception e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to instantiate errorDetailsProvider class '%s'.", + errorDetailsProviderClassName), e.getMessage(), ErrorType.SYSTEM, false, e); + } + } + + /** + * Handles the given exception, wrapping it in a {@link WrappedStageException}. + * + * @param e the exception to handle. + * @param stageName the name of the stage where the exception occurred. + * @param errorDetailsProvider the error details provider. + * @param phase the phase of the stage where the exception occurred. + * @return the wrapped stage exception. + */ + public static WrappedStageException handleException(Exception e, String stageName, + @Nullable ErrorDetailsProvider errorDetailsProvider, @Nullable ErrorPhase phase) { + ProgramFailureException exception = null; + + if (!(e instanceof ProgramFailureException)) { + exception = errorDetailsProvider == null ? null : + errorDetailsProvider.getExceptionDetails(e, phase); + } + return new WrappedStageException(exception == null ? e : exception, stageName); + } +} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java index 72de9e08ebd9..ae113dd6bcfa 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java @@ -22,6 +22,7 @@ import io.cdap.cdap.api.spark.JavaSparkExecutionContext; import io.cdap.cdap.api.spark.SparkClientContext; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; import io.cdap.cdap.etl.batch.AbstractBatchContext; import io.cdap.cdap.etl.batch.BasicOutputFormatProvider; import io.cdap.cdap.etl.batch.preview.NullOutputFormatProvider; @@ -39,21 +40,27 @@ public class SparkBatchSinkContext extends AbstractBatchContext implements BatchSinkContext { private final SparkBatchSinkFactory sinkFactory; private final boolean isPreviewEnabled; + private String errorDetailsProviderClassName; public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, SparkClientContext sparkContext, - PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) { + PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) { super(pipelineRuntime, stageSpec, datasetContext, sparkContext.getAdmin()); this.sinkFactory = sinkFactory; this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext); } public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, JavaSparkExecutionContext sec, - DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) { + DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) { super(pipelineRuntime, stageSpec, datasetContext, sec.getAdmin()); this.sinkFactory = sinkFactory; this.isPreviewEnabled = stageSpec.isPreviewEnabled(sec); } + @Override + public void setErrorDetailsProvider(String errorDetailsProviderClassName) { + this.errorDetailsProviderClassName = errorDetailsProviderClassName; + } + @Override public void addOutput(Output output) { Output actualOutput = suffixOutput(getOutput(output)); @@ -64,6 +71,10 @@ public void addOutput(Output output) { Map conf = new HashMap<>(provider.getOutputFormatConfiguration()); conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName()); conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, getStageName()); + if (errorDetailsProviderClassName != null) { + conf.put(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY, + errorDetailsProviderClassName); + } provider = new BasicOutputFormatProvider(StageTrackingOutputFormat.class.getName(), conf); actualOutput = Output.of(actualOutput.getName(), provider).alias(actualOutput.getAlias()); } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java index 5e05b4220b00..6f72ce8452e6 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java @@ -21,6 +21,7 @@ import io.cdap.cdap.api.data.batch.InputFormatProvider; import io.cdap.cdap.api.spark.SparkClientContext; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; import io.cdap.cdap.etl.batch.BasicInputFormatProvider; import io.cdap.cdap.etl.batch.preview.LimitingInputFormatProvider; import io.cdap.cdap.etl.common.ExternalDatasets; @@ -40,6 +41,7 @@ public class SparkBatchSourceContext extends SparkSubmitterContext implements Ba private final SparkBatchSourceFactory sourceFactory; private final boolean isPreviewEnabled; + private String errorDetailsProviderClassName; public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClientContext sparkContext, PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) { @@ -50,6 +52,11 @@ public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClien this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext); } + @Override + public void setErrorDetailsProvider(String errorDetailsProviderClassName) { + this.errorDetailsProviderClassName = errorDetailsProviderClassName; + } + @Override public void setInput(Input input) { Input trackableInput = input; @@ -60,6 +67,10 @@ public void setInput(Input input) { Map conf = new HashMap<>(provider.getInputFormatConfiguration()); conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName()); conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName()); + if (errorDetailsProviderClassName != null) { + conf.put(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY, + errorDetailsProviderClassName); + } provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf); trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias()); } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java index d090083b0b00..e0ec1a5be392 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java @@ -16,8 +16,9 @@ package io.cdap.cdap.etl.spark.io; -import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.cdap.etl.batch.DelegatingInputFormat; +import io.cdap.cdap.etl.common.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -47,28 +48,32 @@ protected String getDelegateClassNameKey() { @Override public List getSplits(JobContext context) { + Configuration conf = context.getConfiguration(); try { - return getDelegate(context.getConfiguration()).getSplits(context); + return getDelegate(conf).getSplits(context); } catch (Exception e) { - throw new WrappedStageException(e, getStageName(context.getConfiguration())); + throw ExceptionUtils.handleException(e, getStageName(conf), + ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.GETTING_SPLITS); } } @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + Configuration conf = context.getConfiguration(); try { // Spark already tracking metrics for file based input, hence we don't need to track again. if (split instanceof FileSplit || split instanceof CombineFileSplit) { return new StageTrackingRecordReader<>(super.createRecordReader(split, context), - getStageName(context.getConfiguration())); + getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf)); } return new StageTrackingRecordReader<>(new TrackingRecordReader<>( super.createRecordReader(split, new TrackingTaskAttemptContext(context))), - getStageName(context.getConfiguration())); + getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf)); } catch (Exception e) { - throw new WrappedStageException(e, getStageName(context.getConfiguration())); + throw ExceptionUtils.handleException(e, getStageName(conf), + ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.READING_RECORDS); } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java index 8fe6cf166eae..87dd2ef684ba 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java @@ -17,6 +17,9 @@ package io.cdap.cdap.etl.spark.io; import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.exception.ErrorPhase; +import io.cdap.cdap.etl.common.ExceptionUtils; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -38,10 +41,13 @@ public class StageTrackingOutputCommitter extends OutputCommitter { private final OutputCommitter delegate; private final String stageName; + private final ErrorDetailsProvider errorDetailsProvider; - public StageTrackingOutputCommitter(OutputCommitter delegate, String stageName) { + public StageTrackingOutputCommitter(OutputCommitter delegate, String stageName, + ErrorDetailsProvider errorDetailsProvider) { this.delegate = delegate; this.stageName = stageName; + this.errorDetailsProvider = errorDetailsProvider; } @Override @@ -49,7 +55,8 @@ public void setupJob(JobContext jobContext) { try { delegate.setupJob(jobContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING_RECORDS); } } @@ -58,7 +65,8 @@ public void setupTask(TaskAttemptContext taskAttemptContext) { try { delegate.setupTask(taskAttemptContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING_RECORDS); } } @@ -67,7 +75,8 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) { try { return delegate.needsTaskCommit(taskAttemptContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING_RECORDS); } } @@ -76,7 +85,8 @@ public void commitTask(TaskAttemptContext taskAttemptContext) { try { delegate.commitTask(taskAttemptContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING_RECORDS); } } @@ -85,7 +95,8 @@ public void abortTask(TaskAttemptContext taskAttemptContext) { try { delegate.abortTask(taskAttemptContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING_RECORDS); } } @@ -99,7 +110,8 @@ public void recoverTask(TaskAttemptContext taskContext) { try { delegate.recoverTask(taskContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.COMMITTING_RECORDS); } } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java index 9d95c9750188..5467c95d9fe7 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java @@ -17,7 +17,9 @@ package io.cdap.cdap.etl.spark.io; import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.cdap.etl.batch.DelegatingOutputFormat; +import io.cdap.cdap.etl.common.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -39,47 +41,53 @@ public class StageTrackingOutputFormat extends DelegatingOutputFormat getRecordWriter(TaskAttemptContext context) { OutputFormat delegate = getDelegate(context.getConfiguration()); + Configuration conf = context.getConfiguration(); try { // Spark already emitting bytes written metrics for file base output, // hence we don't want to double count if (delegate instanceof FileOutputFormat) { return new StageTrackingRecordWriter<>(delegate.getRecordWriter(context), - getStageName(context.getConfiguration())); + getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf)); } return new StageTrackingRecordWriter<>( new TrackingRecordWriter(delegate.getRecordWriter(new TrackingTaskAttemptContext(context))), - getStageName(context.getConfiguration())); + getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf)); } catch (Exception e) { - throw new WrappedStageException(e, getStageName(context.getConfiguration())); + throw ExceptionUtils.handleException(e, getStageName(conf), + ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.WRITING_RECORDS); } } @Override public void checkOutputSpecs(JobContext context) { + Configuration conf = context.getConfiguration(); try { - getDelegate(context.getConfiguration()).checkOutputSpecs(context); + getDelegate(conf).checkOutputSpecs(context); } catch (Exception e) { - throw new WrappedStageException(e, getStageName(context.getConfiguration())); + throw ExceptionUtils.handleException(e, getStageName(conf), + ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.CHECKING_OUTPUT_SPECS); } } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) { OutputFormat delegate = getDelegate(context.getConfiguration()); + Configuration conf = context.getConfiguration(); try { // Spark already emitting bytes written metrics for file base output, // hence we don't want to double count if (delegate instanceof FileOutputFormat) { return new StageTrackingOutputCommitter(delegate.getOutputCommitter(context), - getStageName(context.getConfiguration())); + getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf)); } return new StageTrackingOutputCommitter(new TrackingOutputCommitter( delegate.getOutputCommitter(new TrackingTaskAttemptContext(context))), - getStageName(context.getConfiguration())); + getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf)); } catch (Exception e) { - throw new WrappedStageException(e, getStageName(context.getConfiguration())); + throw ExceptionUtils.handleException(e, getStageName(conf), + ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.COMMITTING_RECORDS); } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java index f5232103f962..578bd1a65a87 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java @@ -17,6 +17,9 @@ package io.cdap.cdap.etl.spark.io; import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.exception.ErrorPhase; +import io.cdap.cdap.etl.common.ExceptionUtils; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -42,10 +45,13 @@ public class StageTrackingRecordReader extends RecordReader { private final RecordReader delegate; private final String stageName; + private final ErrorDetailsProvider errorDetailsProvider; - public StageTrackingRecordReader(RecordReader delegate, String stageName) { + public StageTrackingRecordReader(RecordReader delegate, String stageName, + ErrorDetailsProvider errorDetailsProvider) { this.delegate = delegate; this.stageName = stageName; + this.errorDetailsProvider = errorDetailsProvider; } @Override @@ -53,7 +59,8 @@ public void initialize(InputSplit split, TaskAttemptContext context) { try { delegate.initialize(split, new TrackingTaskAttemptContext(context)); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.READING_RECORDS); } } @@ -62,7 +69,8 @@ public boolean nextKeyValue() { try { return delegate.nextKeyValue(); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.READING_RECORDS); } } @@ -71,7 +79,8 @@ public K getCurrentKey() { try { return delegate.getCurrentKey(); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.READING_RECORDS); } } @@ -80,7 +89,8 @@ public V getCurrentValue() { try { return delegate.getCurrentValue(); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.READING_RECORDS); } } @@ -89,7 +99,8 @@ public float getProgress() { try { return delegate.getProgress(); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.READING_RECORDS); } } @@ -98,7 +109,8 @@ public void close() { try { delegate.close(); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.READING_RECORDS); } } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java index 377f93db604d..46fd1a4c2dc4 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java @@ -17,6 +17,9 @@ package io.cdap.cdap.etl.spark.io; import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.exception.ErrorPhase; +import io.cdap.cdap.etl.common.ExceptionUtils; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -38,10 +41,13 @@ public class StageTrackingRecordWriter extends RecordWriter { private final RecordWriter delegate; private final String stageName; + private final ErrorDetailsProvider errorDetailsProvider; - public StageTrackingRecordWriter(RecordWriter delegate, String stageName) { + public StageTrackingRecordWriter(RecordWriter delegate, String stageName, + ErrorDetailsProvider errorDetailsProvider) { this.delegate = delegate; this.stageName = stageName; + this.errorDetailsProvider = errorDetailsProvider; } @Override @@ -49,7 +55,8 @@ public void write(K k, V v) { try { delegate.write(k, v); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.WRITING_RECORDS); } } @@ -58,7 +65,8 @@ public void close(TaskAttemptContext taskAttemptContext) { try { delegate.close(taskAttemptContext); } catch (Exception e) { - throw new WrappedStageException(e, stageName); + throw ExceptionUtils.handleException(e, stageName, errorDetailsProvider, + ErrorPhase.WRITING_RECORDS); } } }