[CDAP-21070] Introduce error details provider to get more information about exceptions from plugins #15718

Closed

wants to merge 1 commit into from
ErrorDetailsProvider.java
@@ -0,0 +1,41 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.api.exception;

import javax.annotation.Nullable;

/**
* Interface for providing error details.
*
* <p>
* Implementations of this interface can be used to provide more detailed error information
* for exceptions that occur within the code using {@link ProgramFailureException}.
* </p>
*/
public interface ErrorDetailsProvider<T> {

/**
* Returns a {@link RuntimeException} that wraps the given exception
* with more detailed information.
*
* @param e the exception to wrap
* @param conf configuration object
* @return {@link RuntimeException} that wraps the given exception
* with more detailed information.
*/
RuntimeException getExceptionDetails(Throwable e, @Nullable T conf);
Contributor:
I'm not clear on why this interface is needed and why we want to pass the Configuration through it. Can you describe how it is intended to be used? Maybe include one example test plugin in the PR.

Member Author:
This interface helps in cases where only generic exceptions like IOException are thrown from a plugin but we want to extract more information from them. For example, Google APIs throw GoogleJsonResponseException, so google-cloud plugin classes can implement this interface; see PR data-integrations/google-cloud#1450 for an example.
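For illustration, a minimal sketch of what such a plugin-side provider could look like; the class name, the cause-chain walk, and the plain RuntimeException are assumptions of this sketch (the real google-cloud change builds a ProgramFailureException, see data-integrations/google-cloud#1450):

```java
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;

/**
 * Hypothetical sketch, not code from this PR: pulls HTTP details out of a
 * Google API exception buried in a generic IOException cause chain.
 */
public class GoogleApiErrorDetailsProvider implements ErrorDetailsProvider<Configuration> {

  @Override
  public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
    // Generic IOExceptions from the client layer often wrap a
    // GoogleJsonResponseException that carries the HTTP status code.
    for (Throwable t = e; t != null; t = t.getCause()) {
      if (t instanceof GoogleJsonResponseException) {
        GoogleJsonResponseException gje = (GoogleJsonResponseException) t;
        // Surface the status code and API message instead of a bare IOException.
        return new RuntimeException(String.format(
            "Google API call failed with HTTP %d: %s",
            gje.getStatusCode(), gje.getMessage()), e);
      }
    }
    return null; // no extra details; the caller falls back to the original exception
  }
}
```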

Member Author (@itsankit-google), Oct 7, 2024:
The intent of adding Configuration is for classes to load the delegate class and redirect the call to it; see StageTrackingInputFormat below, for example.
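Condensed, that delegation pattern is just the following (a restatement of the StageTrackingInputFormat change further down in this diff):

```java
@Override
public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
  // The delegate class name is stored in the Configuration (DELEGATE_CLASS_NAME),
  // so the wrapper can instantiate it and forward the call.
  InputFormat<K, V> delegate = getDelegate(conf);
  RuntimeException detailed = null;
  if (delegate instanceof ErrorDetailsProvider<?>) {
    detailed = ((ErrorDetailsProvider<Configuration>) delegate).getExceptionDetails(e, conf);
  }
  // Either way, tag the failure with the stage name where it occurred.
  return new WrappedStageException(detailed == null ? e : detailed, getStageName(conf));
}
```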

Contributor:
Shouldn't it be the plugin doing this? Why is it part of the cdap-api contract that has to be invoked from the platform?

Member Author:
> Shouldn't it be the plugin doing this? Why is it part of the cdap-api contract that has to be invoked from the platform?

Sorry, I didn't get your question exactly. Taking a shot at it:

The plugin implements the interface, extracts more details from the exception, and then returns a ProgramFailureException.

cdap-api just defines the interface.

Member Author:
This is the older PR, without this interface: data-integrations/google-cloud#1445. There, every plugin had to do all the wrapping itself to throw the ProgramFailureException; now a plugin just needs to implement this one method.
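Roughly, the contrast looks like the sketch below; WrappingStyles and toDetailedException are made-up stand-ins, not code from either PR:

```java
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;

/** Illustrative contrast between the old and new wrapping styles. */
public class WrappingStyles implements ErrorDetailsProvider<Configuration> {

  /** Made-up enrichment helper standing in for plugin-specific logic. */
  private static RuntimeException toDetailedException(Throwable e) {
    return new RuntimeException("detailed: " + e.getMessage(), e);
  }

  /** Old style (google-cloud#1445): this try/catch-and-wrap boilerplate was
   *  repeated in every overridden InputFormat/OutputFormat method. */
  public void oldStyleCall(Runnable ioCall) {
    try {
      ioCall.run();
    } catch (Exception e) {
      throw toDetailedException(e); // duplicated at every call site
    }
  }

  /** New style (this PR): implement this one method; the platform-side
   *  Stage* wrappers invoke it from their catch blocks. */
  @Override
  public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
    return toDetailedException(e);
  }
}
```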

Contributor (@albertshau), Oct 7, 2024:
If I understand correctly, the idea is to try and consolidate the exception handling in a single place instead of in a bunch of places in each InputFormat/OutputFormat method?

I'm not sure that's actually what we want. Exceptions should generally be handled where they are thrown, otherwise a lot of context is lost and the logic is brittle. For example, some layer in between may wrap one of the exceptions and unknowingly break the exception handling. Also, a NOT_FOUND error in one part of the code (ex: writing to GCS) may be an error while a NOT_FOUND in another part of the code (ex: deleting a GCS object) should get ignored. Putting all the exception handling in a single place encourages broad general handling when it is often not a great solution.

It does sometimes make sense, like when we're setting up an HTTP server and register an exception handler class that handles very generic, common logic, like mapping various known exceptions to their corresponding HTTP error codes. For use cases like that, it would be better to have an API for registering an exception handling class rather than having a method in the InputFormat/OutputFormat. Though again, I'm not sure that's really what we want in this situation.
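(As a hedged illustration of the NOT_FOUND point above, with entirely made-up client and exception types:)

```java
/** Hypothetical client and exception types, for illustration only. */
interface StorageClient {
  void delete(String path) throws ApiException;
  void write(String path, byte[] data) throws ApiException;
}

class ApiException extends Exception {
  private final int code;
  ApiException(int code, String message) { super(message); this.code = code; }
  int getCode() { return code; }
}

class NotFoundHandling {
  // For a delete, NOT_FOUND usually means "already done" and can be swallowed...
  static void deleteIgnoringNotFound(StorageClient storage, String path) throws ApiException {
    try {
      storage.delete(path);
    } catch (ApiException e) {
      if (e.getCode() != 404) {
        throw e; // anything other than "already gone" is a real failure
      }
    }
  }

  // ...while during a write, the same 404 (say, a missing bucket) is a hard
  // error that must propagate. A single catch-all handler cannot tell these apart.
  static void write(StorageClient storage, String path, byte[] data) throws ApiException {
    storage.write(path, data);
  }
}
```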

Member Author (@itsankit-google), Oct 8, 2024:
We are not trying to handle any exceptions; the behavior can be different in every plugin. The method just wraps the exception in a particular platform exception after extracting more information out of it.

}
StageTrackingInputFormat.java
@@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.batch.DelegatingInputFormat;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,8 @@
* @param <K> type of key to read
* @param <V> type of value to read
*/
public class StageTrackingInputFormat<K, V> extends DelegatingInputFormat<K, V> {
public class StageTrackingInputFormat<K, V> extends DelegatingInputFormat<K, V> implements
ErrorDetailsProvider<Configuration> {

public static final String DELEGATE_CLASS_NAME = "io.cdap.pipeline.tracking.input.classname";
public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name";
@@ -50,7 +52,7 @@ public List<InputSplit> getSplits(JobContext context) {
try {
return getDelegate(context.getConfiguration()).getSplits(context);
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

@@ -68,11 +70,21 @@ public RecordReader<K, V> createRecordReader(InputSplit split,
super.createRecordReader(split, new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

private String getStageName(Configuration conf) {
return conf.get(WRAPPED_STAGE_NAME);
}

@Override
public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
InputFormat<K, V> delegate = getDelegate(conf);
RuntimeException exception = null;
if (delegate instanceof ErrorDetailsProvider<?>) {
exception = ((ErrorDetailsProvider<Configuration>) delegate).getExceptionDetails(e, conf);
}
return new WrappedStageException(exception == null ? e : exception, getStageName(conf));
}
}
StageTrackingOutputCommitter.java
@@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -34,7 +35,8 @@
* stage name where the error occurred.
* </p>
*/
public class StageTrackingOutputCommitter extends OutputCommitter {
public class StageTrackingOutputCommitter extends OutputCommitter
implements ErrorDetailsProvider<Void> {

private final OutputCommitter delegate;
private final String stageName;
@@ -49,7 +51,7 @@ public void setupJob(JobContext jobContext) {
try {
delegate.setupJob(jobContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -58,7 +60,7 @@ public void setupTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.setupTask(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -67,7 +69,7 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
try {
return delegate.needsTaskCommit(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -76,7 +78,7 @@ public void commitTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.commitTask(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -85,7 +87,7 @@ public void abortTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.abortTask(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -99,7 +101,16 @@ public void recoverTask(TaskAttemptContext taskContext) {
try {
delegate.recoverTask(taskContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
RuntimeException exception = null;
if (delegate instanceof ErrorDetailsProvider<?>) {
exception = ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null);
}
return new WrappedStageException(exception == null ? e : exception, stageName);
}
}
StageTrackingOutputFormat.java
@@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.batch.DelegatingOutputFormat;
import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,8 @@
* @param <K> type of key to write
* @param <V> type of value to write
*/
public class StageTrackingOutputFormat<K, V> extends DelegatingOutputFormat<K, V> {
public class StageTrackingOutputFormat<K, V> extends DelegatingOutputFormat<K, V>
implements ErrorDetailsProvider<Configuration> {
public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name";

@Override
@@ -51,16 +53,17 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
new TrackingRecordWriter(delegate.getRecordWriter(new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

@Override
public void checkOutputSpecs(JobContext context) {
OutputFormat<K, V> delegate = getDelegate(context.getConfiguration());
try {
getDelegate(context.getConfiguration()).checkOutputSpecs(context);
delegate.checkOutputSpecs(context);
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

@@ -79,11 +82,21 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
delegate.getOutputCommitter(new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
throw getExceptionDetails(e, context.getConfiguration());
}
}

private String getStageName(Configuration conf) {
return conf.get(WRAPPED_STAGE_NAME);
}

@Override
public RuntimeException getExceptionDetails(Throwable e, Configuration conf) {
OutputFormat<K, V> delegate = getDelegate(conf);
RuntimeException exception = null;
if (delegate instanceof ErrorDetailsProvider<?>) {
exception = ((ErrorDetailsProvider<Configuration>) delegate).getExceptionDetails(e, conf);
}
return new WrappedStageException(exception == null ? e : exception, getStageName(conf));
}
}
StageTrackingRecordReader.java
@@ -16,13 +16,12 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
* A delegating record writer that catches exceptions thrown during execution of a call
* and wraps them in a {@link WrappedStageException}.
@@ -38,7 +37,8 @@
* @param <K> type of key to read
* @param <V> type of value to read
*/
public class StageTrackingRecordReader<K, V> extends RecordReader<K, V> {
public class StageTrackingRecordReader<K, V> extends RecordReader<K, V> implements
ErrorDetailsProvider<Void> {

private final RecordReader<K, V> delegate;
private final String stageName;
@@ -53,7 +53,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) {
try {
delegate.initialize(split, new TrackingTaskAttemptContext(context));
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -62,7 +62,7 @@ public boolean nextKeyValue() {
try {
return delegate.nextKeyValue();
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -71,7 +71,7 @@ public K getCurrentKey() {
try {
return delegate.getCurrentKey();
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -80,7 +80,7 @@ public V getCurrentValue() {
try {
return delegate.getCurrentValue();
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -89,7 +89,7 @@ public float getProgress() {
try {
return delegate.getProgress();
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -98,7 +98,16 @@ public void close() {
try {
delegate.close();
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
RuntimeException exception = null;
if (delegate instanceof ErrorDetailsProvider<?>) {
exception = ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null);
}
return new WrappedStageException(exception == null ? e : exception, stageName);
}
}
StageTrackingRecordWriter.java
@@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -35,7 +36,8 @@
* @param <K> type of key to write
* @param <V> type of value to write
*/
public class StageTrackingRecordWriter<K, V> extends RecordWriter<K, V> {
public class StageTrackingRecordWriter<K, V> extends RecordWriter<K, V> implements
ErrorDetailsProvider<Void> {
private final RecordWriter<K, V> delegate;
private final String stageName;

@@ -49,7 +51,7 @@ public void write(K k, V v) {
try {
delegate.write(k, v);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@@ -58,7 +60,16 @@ public void close(TaskAttemptContext taskAttemptContext) {
try {
delegate.close(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
throw getExceptionDetails(e, null);
}
}

@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
RuntimeException exception = null;
if (delegate instanceof ErrorDetailsProvider<?>) {
exception = ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null);
}
return new WrappedStageException(exception == null ? e : exception, stageName);
}
}
TrackingOutputCommitter.java
@@ -16,6 +16,7 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.ErrorDetailsProvider;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -27,7 +28,7 @@
* A {@link OutputCommitter} that delegate all operations to another {@link OutputCommitter}, with counter metrics
* sending to Spark metrics.
*/
public class TrackingOutputCommitter extends OutputCommitter {
public class TrackingOutputCommitter extends OutputCommitter implements ErrorDetailsProvider<Void> {

private final OutputCommitter delegate;

@@ -85,4 +86,12 @@ public boolean isRecoverySupported() {
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
delegate.recoverTask(new TrackingTaskAttemptContext(taskContext));
}

@Override
public RuntimeException getExceptionDetails(Throwable e, Void conf) {
if (delegate instanceof ErrorDetailsProvider<?>) {
return ((ErrorDetailsProvider<?>) delegate).getExceptionDetails(e, null);
}
return null;
}
}