Skip to content

Commit

Permalink
Introduce error details provider to get more information about except…
Browse files Browse the repository at this point in the history
…ons from plugins
  • Loading branch information
itsankit-google committed Oct 15, 2024
1 parent 61105ff commit 9d07fa9
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.api.dataset.InstanceConflictException;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;

/**
* Context passed to Batch Source and Sink.
Expand Down Expand Up @@ -61,4 +62,13 @@ void createDataset(String datasetName, String typeName, DatasetProperties proper
*/
@Override
SettableArguments getArguments();

/**
* Overrides the error details provider specified in the stage.
*
* @param errorDetailsProviderSpec the error details provider spec.
*/
default void setErrorDetailsProvider(ErrorDetailsProviderSpec errorDetailsProviderSpec) {
// no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

/**
* Context for error details provider.
*
* <p>
* This class provides the context for the error details provider.
* </p>
*/
public class ErrorContext {
private final ErrorPhase phase;

public ErrorContext(ErrorPhase phase) {
this.phase = phase;
}

public ErrorPhase getPhase() {
return phase;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

import io.cdap.cdap.api.exception.ProgramFailureException;
import javax.annotation.Nullable;

/**
* Interface for providing error details.
*
* <p>
* Implementations of this interface can be used to provide more detailed error information
* for exceptions that occur within the code using {@link ProgramFailureException}.
* </p>
*/
public interface ErrorDetailsProvider {

/**
* Returns a {@link ProgramFailureException} that wraps the given exception
* with more detailed information.
*
* @param e the exception to wrap.
* @param context the context of the error.
* @return {@link ProgramFailureException} that wraps the given exception
* with more detailed information.
*/
@Nullable
ProgramFailureException getExceptionDetails(Exception e, ErrorContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

public class ErrorDetailsProviderSpec {
private final String className;

public ErrorDetailsProviderSpec(String className) {
this.className = className;
}

public String getClassName() {
return className;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

/**
* Enum representing the different phases of a stage where error can occur.
*/
public enum ErrorPhase {
SPLITTING("Splitting"),
READING("Reading"),
VALIDATING_OUTPUT_SPECS("Validating Output Specs"),
WRITING("Writing"),
COMMITTING("Committing");

private final String displayName;

ErrorPhase(String displayName) {
this.displayName = displayName;
}

/**
* Returns a string representation of the error phase enum.
*/
@Override
public String toString() {
return displayName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.api.exception;

import io.cdap.cdap.api.exception.ProgramFailureException;
import javax.annotation.Nullable;

/**
* Noop implementation of {@link ErrorDetailsProvider}.
*/
public class NoopErrorDetailsProvider implements ErrorDetailsProvider {

@Nullable
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext context) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.common;

import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.exception.NoopErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for handling exceptions.
*/
public class ExceptionUtils {
private static final Logger LOG = LoggerFactory.getLogger(ExceptionUtils.class);
public static final String ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY =
"io.cdap.pipeline.error.details.provider.classname";

/**
* Gets the {@link ErrorDetailsProvider} from the given {@link Configuration}.
*
* @param conf the configuration to get the error details provider from.
* @return the error details provider.
*/
public static ErrorDetailsProvider getErrorDetailsProvider(Configuration conf) {
String errorDetailsProviderClassName =
conf.get(ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY);
if (errorDetailsProviderClassName == null) {
return new NoopErrorDetailsProvider();
}
try {
return (ErrorDetailsProvider) conf.getClassLoader()
.loadClass(errorDetailsProviderClassName)
.newInstance();
} catch (Exception e) {
LOG.warn(String.format("Unable to instantiate errorDetailsProvider class '%s'.",
errorDetailsProviderClassName), e);
return new NoopErrorDetailsProvider();
}
}

/**
* Handles the given exception, wrapping it in a {@link WrappedStageException}.
*
* @param e the exception to handle.
* @param stageName the name of the stage where the exception occurred.
* @param errorDetailsProvider the error details provider.
* @param phase the phase of the stage where the exception occurred.
* @return the wrapped stage exception.
*/
public static WrappedStageException handleException(Exception e, String stageName,
ErrorDetailsProvider errorDetailsProvider, ErrorPhase phase) {
ProgramFailureException exception = null;

if (!(e instanceof ProgramFailureException)) {
exception = errorDetailsProvider == null ? null :
errorDetailsProvider.getExceptionDetails(e, new ErrorContext(phase));
}
return new WrappedStageException(exception == null ? e : exception, stageName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.cdap.cdap.api.spark.JavaSparkExecutionContext;
import io.cdap.cdap.api.spark.SparkClientContext;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.batch.AbstractBatchContext;
import io.cdap.cdap.etl.batch.BasicOutputFormatProvider;
import io.cdap.cdap.etl.batch.preview.NullOutputFormatProvider;
import io.cdap.cdap.etl.common.ExceptionUtils;
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
import io.cdap.cdap.etl.spark.io.StageTrackingOutputFormat;
Expand All @@ -39,21 +41,27 @@
public class SparkBatchSinkContext extends AbstractBatchContext implements BatchSinkContext {
private final SparkBatchSinkFactory sinkFactory;
private final boolean isPreviewEnabled;
private ErrorDetailsProviderSpec errorDetailsProviderSpec;

public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, SparkClientContext sparkContext,
PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
super(pipelineRuntime, stageSpec, datasetContext, sparkContext.getAdmin());
this.sinkFactory = sinkFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}

public SparkBatchSinkContext(SparkBatchSinkFactory sinkFactory, JavaSparkExecutionContext sec,
DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) {
DatasetContext datasetContext, PipelineRuntime pipelineRuntime, StageSpec stageSpec) {
super(pipelineRuntime, stageSpec, datasetContext, sec.getAdmin());
this.sinkFactory = sinkFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sec);
}

@Override
public void setErrorDetailsProvider(ErrorDetailsProviderSpec errorDetailsProviderSpec) {
this.errorDetailsProviderSpec = errorDetailsProviderSpec;
}

@Override
public void addOutput(Output output) {
Output actualOutput = suffixOutput(getOutput(output));
Expand All @@ -64,6 +72,10 @@ public void addOutput(Output output) {
Map<String, String> conf = new HashMap<>(provider.getOutputFormatConfiguration());
conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName());
conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, getStageName());
if (errorDetailsProviderSpec != null) {
conf.put(ExceptionUtils.ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY,
errorDetailsProviderSpec.getClassName());
}
provider = new BasicOutputFormatProvider(StageTrackingOutputFormat.class.getName(), conf);
actualOutput = Output.of(actualOutput.getName(), provider).alias(actualOutput.getAlias());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.spark.SparkClientContext;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.batch.BasicInputFormatProvider;
import io.cdap.cdap.etl.batch.preview.LimitingInputFormatProvider;
import io.cdap.cdap.etl.common.ExceptionUtils;
import io.cdap.cdap.etl.common.ExternalDatasets;
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
Expand All @@ -40,16 +42,24 @@ public class SparkBatchSourceContext extends SparkSubmitterContext implements Ba

private final SparkBatchSourceFactory sourceFactory;
private final boolean isPreviewEnabled;
private ErrorDetailsProviderSpec errorDetailsProviderSpec;

public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory, SparkClientContext sparkContext,
PipelineRuntime pipelineRuntime, DatasetContext datasetContext, StageSpec stageSpec) {
public SparkBatchSourceContext(SparkBatchSourceFactory sourceFactory,
SparkClientContext sparkContext, PipelineRuntime pipelineRuntime,
DatasetContext datasetContext, StageSpec stageSpec) {
super(sparkContext, pipelineRuntime, datasetContext, StageSpec.
createCopy(stageSpec, sparkContext.getDataTracer(stageSpec.getName()).getMaximumTracedRecords(),
sparkContext.getDataTracer(stageSpec.getName()).isEnabled()));
createCopy(stageSpec, sparkContext.getDataTracer(
stageSpec.getName()).getMaximumTracedRecords(),
sparkContext.getDataTracer(stageSpec.getName()).isEnabled()));
this.sourceFactory = sourceFactory;
this.isPreviewEnabled = stageSpec.isPreviewEnabled(sparkContext);
}

@Override
public void setErrorDetailsProvider(ErrorDetailsProviderSpec errorDetailsProviderSpec) {
this.errorDetailsProviderSpec = errorDetailsProviderSpec;
}

@Override
public void setInput(Input input) {
Input trackableInput = input;
Expand All @@ -60,6 +70,10 @@ public void setInput(Input input) {
Map<String, String> conf = new HashMap<>(provider.getInputFormatConfiguration());
conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName());
conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName());
if (errorDetailsProviderSpec != null) {
conf.put(ExceptionUtils.ERROR_DETAILS_PROVIDER_CLASS_NAME_KEY,
errorDetailsProviderSpec.getClassName());
}
provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf);
trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias());
}
Expand Down
Loading

0 comments on commit 9d07fa9

Please sign in to comment.