Skip to content

Commit

Permalink
Introduce error details provider to get more information about exceptions from plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Oct 4, 2024
1 parent 61105ff commit b936c7a
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.api.exception;

import javax.annotation.Nullable;

/**
 * Interface for providing error details.
 *
 * <p>
 * Implementations of this interface can be used to provide more detailed error information
 * for exceptions that occur within the code using {@link ProgramFailureException}.
 * </p>
 *
 * @param <T> type of the configuration object handed to the provider; may be
 *     {@link Void} for implementations that need no configuration
 */
public interface ErrorDetailsProvider<T> {

  /**
   * Returns a {@link RuntimeException} that wraps the given exception
   * with more detailed information.
   *
   * @param e the exception to wrap
   * @param conf configuration object, or {@code null} when no configuration is available
   * @return a {@link RuntimeException} that wraps the given exception
   *     with more detailed information.
   */
  RuntimeException getExceptionDetails(Throwable e, @Nullable T conf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.batch.DelegatingInputFormat;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -35,7 +36,8 @@
* @param <K> type of key to read
* @param <V> type of value to read
*/
public class StageTrackingInputFormat<K, V> extends DelegatingInputFormat<K, V> {
public class StageTrackingInputFormat<K, V> extends DelegatingInputFormat<K, V> implements
ErrorDetailsProvider<Configuration> {

public static final String DELEGATE_CLASS_NAME = "io.cdap.pipeline.tracking.input.classname";
public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name";
Expand Down Expand Up @@ -75,4 +77,14 @@ public RecordReader<K, V> createRecordReader(InputSplit split,
/**
 * Returns the name of the pipeline stage this input format is tracking.
 *
 * @param conf job configuration carrying pipeline tracking properties
 * @return the stage name stored under {@code WRAPPED_STAGE_NAME}; presumably
 *     {@code null} if the property was never set — confirm against callers
 */
private String getStageName(Configuration conf) {
return conf.get(WRAPPED_STAGE_NAME);
}

/**
 * Builds a stage-aware exception for a failure raised by the delegate input format.
 *
 * <p>If the delegate itself implements {@link ErrorDetailsProvider}, its richer,
 * plugin-specific details are used; otherwise the original throwable is wrapped as-is.
 *
 * @param e the failure to wrap
 * @param conf job configuration used to resolve the delegate and the stage name
 * @return a {@link WrappedStageException} tagging the failure with the originating stage
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
  InputFormat<K, V> delegate = getDelegate(conf);
  RuntimeException exception = null;
  if (delegate instanceof ErrorDetailsProvider<?>) {
    // instanceof cannot see the type argument due to erasure; the delegate is
    // driven by this same Configuration, so the cast follows the project-wide
    // convention. Suppress the unavoidable unchecked warning at minimal scope.
    @SuppressWarnings("unchecked")
    ErrorDetailsProvider<Configuration> provider = (ErrorDetailsProvider<Configuration>) delegate;
    exception = provider.getExceptionDetails(e, conf);
  }
  // Always surface the failing stage so pipeline errors are attributable.
  return new WrappedStageException(exception == null ? e : exception, getStageName(conf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
Expand All @@ -34,7 +35,8 @@
* stage name where the error occurred.
* </p>
*/
public class StageTrackingOutputCommitter extends OutputCommitter {
public class StageTrackingOutputCommitter extends OutputCommitter
implements ErrorDetailsProvider<Void> {

private final OutputCommitter delegate;
private final String stageName;
Expand Down Expand Up @@ -102,4 +104,13 @@ public void recoverTask(TaskAttemptContext taskContext) {
throw new WrappedStageException(e, stageName);
}
}

/**
 * Wraps a failure from the delegate committer in a stage-aware exception,
 * preferring plugin-provided details when the delegate can supply them.
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
  RuntimeException detailed = delegate instanceof ErrorDetailsProvider<?>
      ? ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null)
      : null;
  // Tag the failure with the originating stage either way.
  return new WrappedStageException(detailed != null ? detailed : e, stageName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.batch.DelegatingOutputFormat;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
Expand All @@ -33,7 +35,8 @@
* @param <K> type of key to write
* @param <V> type of value to write
*/
public class StageTrackingOutputFormat<K, V> extends DelegatingOutputFormat<K, V> {
public class StageTrackingOutputFormat<K, V> extends DelegatingOutputFormat<K, V>
implements ErrorDetailsProvider<Configuration> {
public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name";

@Override
Expand All @@ -51,16 +54,17 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
new TrackingRecordWriter(delegate.getRecordWriter(new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

@Override
public void checkOutputSpecs(JobContext context) {
OutputFormat<K, V> delegate = getDelegate(context.getConfiguration());
try {
getDelegate(context.getConfiguration()).checkOutputSpecs(context);
delegate.checkOutputSpecs(context);
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

Expand All @@ -79,11 +83,21 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
delegate.getOutputCommitter(new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

/**
 * Returns the name of the pipeline stage this output format is tracking.
 *
 * @param conf job configuration carrying pipeline tracking properties
 * @return the stage name stored under {@link #WRAPPED_STAGE_NAME}; presumably
 *     {@code null} if the property was never set — confirm against callers
 */
private String getStageName(Configuration conf) {
return conf.get(WRAPPED_STAGE_NAME);
}

/**
 * Builds a stage-aware exception for a failure raised by the delegate output format.
 *
 * <p>If the delegate itself implements {@link ErrorDetailsProvider}, its richer,
 * plugin-specific details are used; otherwise the original throwable is wrapped as-is.
 *
 * @param e the failure to wrap
 * @param conf job configuration used to resolve the delegate and the stage name
 * @return a {@link WrappedStageException} tagging the failure with the originating stage
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
  OutputFormat<K, V> delegate = getDelegate(conf);
  RuntimeException exception = null;
  if (delegate instanceof ErrorDetailsProvider<?>) {
    // instanceof cannot see the type argument due to erasure; the delegate is
    // driven by this same Configuration, so the cast follows the project-wide
    // convention. Suppress the unavoidable unchecked warning at minimal scope.
    @SuppressWarnings("unchecked")
    ErrorDetailsProvider<Configuration> provider = (ErrorDetailsProvider<Configuration>) delegate;
    exception = provider.getExceptionDetails(e, conf);
  }
  // Always surface the failing stage so pipeline errors are attributable.
  return new WrappedStageException(exception == null ? e : exception, getStageName(conf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
* A delegating record writer that catches exceptions thrown during execution of a call
* and wraps them in a {@link WrappedStageException}.
Expand All @@ -38,7 +37,8 @@
* @param <K> type of key to read
* @param <V> type of value to read
*/
public class StageTrackingRecordReader<K, V> extends RecordReader<K, V> {
public class StageTrackingRecordReader<K, V> extends RecordReader<K, V> implements
ErrorDetailsProvider<Void> {

private final RecordReader<K, V> delegate;
private final String stageName;
Expand Down Expand Up @@ -101,4 +101,13 @@ public void close() {
throw new WrappedStageException(e, stageName);
}
}

/**
 * Wraps a failure from the delegate record reader in a stage-aware exception,
 * letting the delegate contribute plugin-specific details when it supports them.
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
  RuntimeException detailed = null;
  if (delegate instanceof ErrorDetailsProvider<?>) {
    detailed = ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null);
  }
  // Fall back to the raw throwable when no richer details are available.
  return new WrappedStageException(detailed != null ? detailed : e, stageName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand All @@ -35,7 +36,8 @@
* @param <K> type of key to write
* @param <V> type of value to write
*/
public class StageTrackingRecordWriter<K, V> extends RecordWriter<K, V> {
public class StageTrackingRecordWriter<K, V> extends RecordWriter<K, V> implements
ErrorDetailsProvider<Void> {
private final RecordWriter<K, V> delegate;
private final String stageName;

Expand All @@ -61,4 +63,13 @@ public void close(TaskAttemptContext taskAttemptContext) {
throw new WrappedStageException(e, stageName);
}
}

/**
 * Wraps a failure from the delegate record writer in a stage-aware exception,
 * letting the delegate contribute plugin-specific details when it supports them.
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
  RuntimeException detailed = delegate instanceof ErrorDetailsProvider<?>
      ? ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null)
      : null;
  // Fall back to the raw throwable when no richer details are available.
  return new WrappedStageException(detailed != null ? detailed : e, stageName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
Expand All @@ -27,7 +28,7 @@
* A {@link OutputCommitter} that delegate all operations to another {@link OutputCommitter}, with counter metrics
* sending to Spark metrics.
*/
public class TrackingOutputCommitter extends OutputCommitter {
public class TrackingOutputCommitter extends OutputCommitter implements ErrorDetailsProvider<Void> {

private final OutputCommitter delegate;

Expand Down Expand Up @@ -85,4 +86,12 @@ public boolean isRecoverySupported() {
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
delegate.recoverTask(new TrackingTaskAttemptContext(taskContext));
}

/**
 * Forwards error-detail resolution to the wrapped committer when it supports it.
 * Callers interpret a {@code null} return as "no extra details available".
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
  return delegate instanceof ErrorDetailsProvider<?>
      ? ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null)
      : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand All @@ -29,7 +30,8 @@
* @param <K> type of key to read
* @param <V> type of value to read
*/
public class TrackingRecordReader<K, V> extends RecordReader<K, V> {
public class TrackingRecordReader<K, V> extends RecordReader<K, V>
implements ErrorDetailsProvider<Void> {

private final RecordReader<K, V> delegate;

Expand Down Expand Up @@ -66,4 +68,12 @@ public float getProgress() throws IOException, InterruptedException {
public void close() throws IOException {
delegate.close();
}

/**
 * Forwards error-detail resolution to the wrapped record reader when it supports it.
 * Callers interpret a {@code null} return as "no extra details available".
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
  return delegate instanceof ErrorDetailsProvider<?>
      ? ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null)
      : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

Expand All @@ -28,7 +29,8 @@
* @param <K> type of key to write
* @param <V> type of value to write
*/
public class TrackingRecordWriter<K, V> extends RecordWriter<K, V> {
public class TrackingRecordWriter<K, V> extends RecordWriter<K, V>
implements ErrorDetailsProvider<Void> {

private final RecordWriter<K, V> delegate;

Expand All @@ -45,4 +47,12 @@ public void write(K key, V value) throws IOException, InterruptedException {
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
delegate.close(new TrackingTaskAttemptContext(context));
}

/**
 * Forwards error-detail resolution to the wrapped record writer when it supports it.
 * Callers interpret a {@code null} return as "no extra details available".
 */
@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
  return delegate instanceof ErrorDetailsProvider<?>
      ? ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null)
      : null;
}
}

0 comments on commit b936c7a

Please sign in to comment.