
Commit 938afb1

Introduce error details provider to get more information about exceptions from plugins
itsankit-google committed Oct 10, 2024
1 parent 61105ff commit 938afb1
Showing 12 changed files with 281 additions and 33 deletions.
BatchSinkContext.java
@@ -38,4 +38,13 @@ public interface BatchSinkContext extends BatchContext {
* @return a boolean value which indicates the pipeline is running in preview mode.
*/
boolean isPreviewEnabled();

/**
* Sets the error details provider class name, overriding the default provider.
*
* @param errorDetailsProviderClassName the error details provider class name.
*/
default void setErrorDetailsProvider(String errorDetailsProviderClassName) {
// no-op
}
}
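
For context, a sink plugin would register a provider from its prepareRun method. A minimal sketch, assuming hypothetical MySink and MyErrorDetailsProvider classes that are not part of this commit:

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class MySink extends BatchSink<StructuredRecord, NullWritable, Text> {

  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    // Hypothetical provider class shipped with the plugin; it is registered by
    // name so the runtime can load it from the plugin classloader later.
    context.setErrorDetailsProvider(MyErrorDetailsProvider.class.getName());
    // ... configure and add outputs as usual ...
  }
}

The same default method is added to BatchSourceContext below, so sources can register a provider the same way.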
BatchSourceContext.java
@@ -43,4 +43,13 @@ public interface BatchSourceContext extends BatchContext {
* @return maximum number of records to read in preview mode.
*/
int getMaxPreviewRecords();

/**
* Sets the error details provider class name, overriding the default provider.
*
* @param errorDetailsProviderClassName the error details provider class name.
*/
default void setErrorDetailsProvider(String errorDetailsProviderClassName) {
// no-op
}
}
ErrorDetailsProvider.java
@@ -0,0 +1,43 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

import io.cdap.cdap.api.exception.ProgramFailureException;
import javax.annotation.Nullable;

/**
* Interface for providing error details.
*
* <p>
* Implementations of this interface can be used to provide more detailed error information
* for exceptions that occur within plugin code, in the form of a {@link ProgramFailureException}.
* </p>
*/
public interface ErrorDetailsProvider {
String EXCEPTION_DETAILS_CLASS_NAME_KEY =
    "io.cdap.pipeline.error.details.provider.classname";

/**
 * Returns a {@link ProgramFailureException} that wraps the given exception with more detailed
 * information.
 *
 * @param e the exception to wrap
 * @param phase the phase of the stage where the exception occurred
 * @return {@link ProgramFailureException} that wraps the given exception with more detailed
 *     information, or {@code null} if no additional details are available.
 */
@Nullable
ProgramFailureException getExceptionDetails(Exception e, ErrorPhase phase);
}
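
As an illustration, a plugin-side implementation might translate exceptions it recognizes and return null for everything else. MyErrorDetailsProvider is hypothetical, and the ErrorUtils call mirrors the one used in ExceptionUtils below:

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import java.io.IOException;
import javax.annotation.Nullable;

public class MyErrorDetailsProvider implements ErrorDetailsProvider {

  @Nullable
  @Override
  public ProgramFailureException getExceptionDetails(Exception e, ErrorPhase phase) {
    // Translate exceptions this plugin understands; returning null falls back
    // to wrapping the original exception unchanged.
    if (e instanceof IOException) {
      return ErrorUtils.getProgramFailureException(
          new ErrorCategory(ErrorCategoryEnum.PLUGIN),
          String.format("I/O error while %s.", phase), e.getMessage(),
          ErrorType.SYSTEM, false, e);
    }
    return null;
  }
}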
ErrorPhase.java
@@ -0,0 +1,42 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

/**
 * Enum representing the different phases of a stage where an error can occur.
 */
public enum ErrorPhase {
GETTING_SPLITS("Getting Splits"),
READING_RECORDS("Reading Records"),
CHECKING_OUTPUT_SPECS("Checking Output Specs"),
WRITING_RECORDS("Writing Records"),
COMMITTING_RECORDS("Committing Records");

private final String displayName;

ErrorPhase(String displayName) {
this.displayName = displayName;
}

/**
* Returns a string representation of the error phase.
*/
@Override
public String toString() {
return displayName;
}
}
ExceptionUtils.java
@@ -0,0 +1,78 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.common;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;

/**
* Utility class for handling exceptions.
*/
public class ExceptionUtils {

/**
* Gets the {@link ErrorDetailsProvider} from the given {@link Configuration}.
*
* @param conf the configuration to get the error details provider from.
* @return the error details provider.
*/
@Nullable
public static ErrorDetailsProvider getErrorDetailsProvider(Configuration conf) {
String errorDetailsProviderClassName =
conf.get(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY);
if (errorDetailsProviderClassName == null) {
return null;
}
try {
return (ErrorDetailsProvider) conf.getClassLoader()
.loadClass(errorDetailsProviderClassName)
.getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Unable to instantiate errorDetailsProvider class '%s'.",
errorDetailsProviderClassName), e.getMessage(), ErrorType.SYSTEM, false, e);
}
}

/**
* Handles the given exception, wrapping it in a {@link WrappedStageException}.
*
* @param e the exception to handle.
* @param stageName the name of the stage where the exception occurred.
* @param errorDetailsProvider the error details provider.
* @param phase the phase of the stage where the exception occurred.
* @return the wrapped stage exception.
*/
public static WrappedStageException handleException(Exception e, String stageName,
@Nullable ErrorDetailsProvider errorDetailsProvider, @Nullable ErrorPhase phase) {
ProgramFailureException exception = null;

if (!(e instanceof ProgramFailureException)) {
exception = errorDetailsProvider == null ? null :
errorDetailsProvider.getExceptionDetails(e, phase);
}
return new WrappedStageException(exception == null ? e : exception, stageName);
}
}
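
Tying the pieces together: the class name registered on the context is written into the Hadoop Configuration by the Spark contexts below and read back by this utility at runtime. A sketch of that round trip, reusing the hypothetical provider from the example above:

Configuration conf = new Configuration();
// Written by SparkBatchSourceContext/SparkBatchSinkContext when a plugin has
// called setErrorDetailsProvider(...).
conf.set(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY,
    MyErrorDetailsProvider.class.getName());
// Read back inside the stage-tracking input/output formats:
ErrorDetailsProvider provider = ExceptionUtils.getErrorDetailsProvider(conf);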
SparkBatchSinkContext.java
@@ -22,6 +22,7 @@
import io.cdap.cdap.api.spark.JavaSparkExecutionContext;
import io.cdap.cdap.api.spark.SparkClientContext;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.batch.AbstractBatchContext;
import io.cdap.cdap.etl.batch.BasicOutputFormatProvider;
import io.cdap.cdap.etl.batch.preview.NullOutputFormatProvider;
@@ -39,21 +40,27 @@
public class SparkBatchSinkContext extends AbstractBatchContext implements BatchSinkContext {
private final SparkBatchSinkFactory sinkFactory;
private final boolean isPreviewEnabled;
private String errorDetailsProviderClassName;

public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, SparkClientContext sparkContext,
PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
super(pipelineRuntime, stageSpec, datasetContext, sparkContext.getAdmin());
this.sinkFactory = sinkFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}

public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, JavaSparkExecutionContext sec,
DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) {
super(pipelineRuntime, stageSpec, datasetContext, sec.getAdmin());
this.sinkFactory = sinkFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sec);
}

@Override
public void setErrorDetailsProvider(String errorDetailsProviderClassName) {
this.errorDetailsProviderClassName = errorDetailsProviderClassName;
}

@Override
public void addOutput(Output output) {
Output actualOutput = suffixOutput(getOutput(output));
@@ -64,6 +71,10 @@ public void addOutput(Output output) {
Map<String, String> conf = new HashMap<>(provider.getOutputFormatConfiguration());
conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName());
conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, getStageName());
if (errorDetailsProviderClassName != null) {
conf.put(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY,
errorDetailsProviderClassName);
}
provider = new BasicOutputFormatProvider(StageTrackingOutputFormat.class.getName(), conf);
actualOutput = Output.of(actualOutput.getName(), provider).alias(actualOutput.getAlias());
}
SparkBatchSourceContext.java
@@ -21,6 +21,7 @@
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.spark.SparkClientContext;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.batch.BasicInputFormatProvider;
import io.cdap.cdap.etl.batch.preview.LimitingInputFormatProvider;
import io.cdap.cdap.etl.common.ExternalDatasets;
@@ -40,6 +41,7 @@ public class SparkBatchSourceContext extends SparkSubmitterContext implements Ba

private final SparkBatchSourceFactory sourceFactory;
private final boolean isPreviewEnabled;
private String errorDetailsProviderClassName;

public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClientContext sparkContext,
PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
@@ -50,6 +52,11 @@ public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClien
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}

@Override
public void setErrorDetailsProvider(String errorDetailsProviderClassName) {
this.errorDetailsProviderClassName = errorDetailsProviderClassName;
}

@Override
public void setInput(Input input) {
Input trackableInput = input;
@@ -60,6 +67,10 @@ public void setInput(Input input) {
Map<String, String> conf = new HashMap<>(provider.getInputFormatConfiguration());
conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName());
conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName());
if (errorDetailsProviderClassName != null) {
conf.put(ErrorDetailsProvider.EXCEPTION_DETAILS_CLASS_NAME_KEY,
errorDetailsProviderClassName);
}
provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf);
trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias());
}
StageTrackingInputFormat.java
@@ -16,8 +16,9 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.batch.DelegatingInputFormat;
import io.cdap.cdap.etl.common.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,28 +48,32 @@ protected String getDelegateClassNameKey() {

@Override
public List<InputSplit> getSplits(JobContext context) {
Configuration conf = context.getConfiguration();
try {
return getDelegate(conf).getSplits(context);
} catch (Exception e) {
throw ExceptionUtils.handleException(e, getStageName(conf),
ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.GETTING_SPLITS);
}
}

@Override
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) {
Configuration conf = context.getConfiguration();
try {
// Spark already tracks metrics for file-based input, so we don't need to track again.
if (split instanceof FileSplit || split instanceof CombineFileSplit) {
return new StageTrackingRecordReader<>(super.createRecordReader(split, context),
getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf));
}

return new StageTrackingRecordReader<>(new TrackingRecordReader<>(
super.createRecordReader(split, new TrackingTaskAttemptContext(context))),
getStageName(conf), ExceptionUtils.getErrorDetailsProvider(conf));
} catch (Exception e) {
throw ExceptionUtils.handleException(e, getStageName(conf),
ExceptionUtils.getErrorDetailsProvider(conf), ErrorPhase.READING_RECORDS);
}
}
