Skip to content

Commit

Permalink
Introduce error details provider to get more information about exceptions from plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Oct 9, 2024
1 parent 61105ff commit fe13ee3
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.cdap.cdap.api.annotation.Beta;
import io.cdap.cdap.api.data.batch.Input;
import java.util.Map;

/**
* Context of a Batch Source.
Expand All @@ -43,4 +44,13 @@ public interface BatchSourceContext extends BatchContext {
* @return maximum number of records to read in preview mode.
*/
int getMaxPreviewRecords();

/**
 * Sets the error details provider class name to be used for this source stage.
 *
 * <p>The default implementation is a no-op; contexts that support error details
 * providers (e.g. the Spark batch source context) override this method.
 *
 * @param errorDetailsProviderClassName the fully qualified class name of the
 *     error details provider.
 */
default void setErrorDetailsProvider(String errorDetailsProviderClassName) {
  // no-op by default: overriding contexts store the class name for later use.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.common;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.exception.WrappedStageException;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;

/**
 * Utility class with helpers for wrapping exceptions thrown by pipeline stages,
 * optionally enriching them with details from a configured
 * {@link ErrorDetailsProvider}.
 */
public class ExceptionUtils {
  /** Configuration key holding the name of the stage being wrapped. */
  public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name";

  /** Configuration key holding the error details provider class name. */
  public static final String EXCEPTION_DETAILS_CLASS_NAME_KEY =
      "io.cdap.pipeline.error.details.provider.classname";

  private ExceptionUtils() {
    // Utility class: prevent instantiation.
  }

  /**
   * Instantiates the {@link ErrorDetailsProvider} named in the configuration,
   * or returns {@code null} when none is configured.
   *
   * @param conf the Hadoop configuration to read the provider class name from.
   * @return a new provider instance, or {@code null} if the key is absent.
   * @throws ProgramFailureException if the configured class cannot be loaded
   *     or instantiated.
   */
  @Nullable
  private static ErrorDetailsProvider getErrorDetailsProvider(Configuration conf) {
    String errorDetailsProviderClassName = conf.get(EXCEPTION_DETAILS_CLASS_NAME_KEY);
    if (errorDetailsProviderClassName == null) {
      return null;
    }
    try {
      // Use getDeclaredConstructor().newInstance() instead of the deprecated
      // Class.newInstance(), which swallows checked exceptions from the constructor.
      return (ErrorDetailsProvider) conf.getClassLoader()
          .loadClass(errorDetailsProviderClassName)
          .getDeclaredConstructor()
          .newInstance();
    } catch (Exception e) {
      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
          String.format("Unable to instantiate errorDetailsProvider class '%s'.",
              errorDetailsProviderClassName), e.getMessage(), ErrorType.SYSTEM, false, e);
    }
  }

  /**
   * Wraps the given exception in a {@link WrappedStageException} carrying the
   * stage name. If the exception is not already a {@link ProgramFailureException}
   * and an error details provider is configured, the provider is consulted first
   * to produce a more detailed exception to wrap.
   *
   * @param e the exception thrown by the stage.
   * @param conf the Hadoop configuration carrying stage and provider settings.
   * @return a {@link WrappedStageException} wrapping either the provider-enriched
   *     exception or the original one.
   */
  public static WrappedStageException handleException(Exception e, Configuration conf) {
    ProgramFailureException exception = null;
    if (!(e instanceof ProgramFailureException)) {
      ErrorDetailsProvider errorDetailsProvider = getErrorDetailsProvider(conf);
      exception = errorDetailsProvider == null ? null :
          errorDetailsProvider.getExceptionDetails(e);
    }
    return new WrappedStageException(exception == null ? e : exception, getStageName(conf));
  }

  /**
   * Returns the wrapped stage name stored in the configuration, or {@code null}
   * if not set.
   *
   * @param conf the Hadoop configuration to read from.
   */
  public static String getStageName(Configuration conf) {
    return conf.get(WRAPPED_STAGE_NAME);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.batch.BasicInputFormatProvider;
import io.cdap.cdap.etl.batch.preview.LimitingInputFormatProvider;
import io.cdap.cdap.etl.common.ExceptionUtils;
import io.cdap.cdap.etl.common.ExternalDatasets;
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
Expand All @@ -40,6 +41,7 @@ public class SparkBatchSourceContext extends SparkSubmitterContext implements Ba

private final SparkBatchSourceFactory sourceFactory;
private final boolean isPreviewEnabled;
private String errorDetailsProviderClassName;

public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClientContext sparkContext,
PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
Expand All @@ -50,6 +52,11 @@ public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClien
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}

@Override
public void setErrorDetailsProvider(String errorDetailsProviderClassName) {
  // Stored here and later written into the input format configuration by
  // setInput(), so the error details provider is available at read time.
  this.errorDetailsProviderClassName = errorDetailsProviderClassName;
}

@Override
public void setInput(Input input) {
Input trackableInput = input;
Expand All @@ -60,6 +67,9 @@ public void setInput(Input input) {
Map<String, String> conf = new HashMap<>(provider.getInputFormatConfiguration());
conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName());
conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName());
if (errorDetailsProviderClassName != null) {
conf.put(ExceptionUtils.EXCEPTION_DETAILS_CLASS_NAME_KEY, errorDetailsProviderClassName);
}
provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf);
trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.batch.DelegatingInputFormat;
import org.apache.hadoop.conf.Configuration;
import io.cdap.cdap.etl.common.ExceptionUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
Expand Down Expand Up @@ -50,7 +49,7 @@ public List<InputSplit> getSplits(JobContext context) {
try {
return getDelegate(context.getConfiguration()).getSplits(context);
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw ExceptionUtils.handleException(e, context.getConfiguration());
}
}

Expand All @@ -61,18 +60,14 @@ public RecordReader<K, V> createRecordReader(InputSplit split,
// Spark already tracking metrics for file based input, hence we don't need to track again.
if (split instanceof FileSplit || split instanceof CombineFileSplit) {
return new StageTrackingRecordReader<>(super.createRecordReader(split, context),
getStageName(context.getConfiguration()));
ExceptionUtils.getStageName(context.getConfiguration()));
}

return new StageTrackingRecordReader<>(new TrackingRecordReader<>(
super.createRecordReader(split, new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
ExceptionUtils.getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw ExceptionUtils.handleException(e, context.getConfiguration());
}
}

private String getStageName(Configuration conf) {
return conf.get(WRAPPED_STAGE_NAME);
}
}

0 comments on commit fe13ee3

Please sign in to comment.