Wrap input/output format methods to throw wrapped stage exception
itsankit-google committed Oct 3, 2024
1 parent 81f928c commit 21f35d7
Showing 12 changed files with 502 additions and 95 deletions.
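Every change below follows the same pattern: the Hadoop InputFormat/OutputFormat entry points are wrapped in try/catch blocks, and any failure is rethrown as a WrappedStageException that carries the name of the pipeline stage that failed. The exception must be unchecked, since the overridden Hadoop methods throw it without declaring it. A minimal sketch of what such a wrapper looks like; the real io.cdap.cdap.api.exception.WrappedStageException ships with the CDAP API and its exact shape may differ:

// Hypothetical minimal shape of the stage-tagging wrapper exception;
// the actual WrappedStageException is defined in the CDAP API module.
public class WrappedStageException extends RuntimeException {

  private final String stageName;

  public WrappedStageException(Throwable cause, String stageName) {
    // Prefix the stage name so logs immediately show which stage failed.
    super(String.format("Stage '%s' encountered: %s", stageName, cause.getMessage()), cause);
    this.stageName = stageName;
  }

  public String getStageName() {
    return stageName;
  }
}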
@@ -16,6 +16,10 @@

package io.cdap.cdap.etl.batch;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configurable;
@@ -55,17 +59,21 @@ public RecordReader<K, V> createRecordReader(InputSplit split,
* Returns the delegating {@link InputFormat} based on the current configuration.
*
* @param conf the Hadoop {@link Configuration} for this input format
* @throws IOException if failed to instantiate the input format class
*/
protected final InputFormat<K, V> getDelegate(Configuration conf) throws IOException {
protected final InputFormat<K, V> getDelegate(Configuration conf) {
String delegateClassName = conf.get(getDelegateClassNameKey());
if (delegateClassName == null) {
throw new IllegalArgumentException("Missing configuration " + getDelegateClassNameKey()
+ " for the InputFormat to use");
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Missing configuration '%s' for the InputFormat to use.",
getDelegateClassNameKey()), String.format("Please provide correct configuration for "
+ "delegate InputFormat class key '%s'.", getDelegateClassNameKey()),
ErrorType.SYSTEM, false, null);
}
if (delegateClassName.equals(getClass().getName())) {
throw new IllegalArgumentException(
"Cannot delegate InputFormat to the same class " + delegateClassName);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Cannot delegate InputFormat to the same class '%s'.", delegateClassName),
String.format("Please provide correct configuration for delegate "
+ "InputFormat class name '%s'.", delegateClassName), ErrorType.SYSTEM, false, null);
}
try {
//noinspection unchecked
@@ -76,8 +84,10 @@ protected final InputFormat<K, V> getDelegate(Configuration conf) throws IOExcep
((Configurable) inputFormat).setConf(conf);
}
return inputFormat;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new IOException("Unable to instantiate delegate input format " + delegateClassName, e);
} catch (Exception e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Unable to instantiate delegate input format class '%s'.",
delegateClassName), e.getMessage(), ErrorType.SYSTEM, false, e);
}
}
}
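The configuration errors above are now raised through ErrorUtils.getProgramFailureException instead of plain IllegalArgumentException/IOException. Read off the call sites, the arguments appear to be: an ErrorCategory, the error message, a suggested corrective action, an ErrorType, whether a dependency caused the failure, and the underlying cause (or null). A hedged usage sketch; the parameter roles and the helper method below are assumptions inferred from the calls above, not the documented signature:

// Sketch of the new error-raising pattern; requireDelegateClass and
// 'key' are hypothetical, used purely for illustration.
private static void requireDelegateClass(Configuration conf, String key) {
  if (conf.get(key) == null) {
    // Assumed argument roles, inferred from the call sites in this commit:
    // category, error message, corrective action, error type,
    // whether a dependency caused it, and the underlying cause (or null).
    throw ErrorUtils.getProgramFailureException(
        new ErrorCategory(ErrorCategoryEnum.PLUGIN),
        String.format("Missing configuration '%s' for the InputFormat to use.", key),
        String.format("Please provide correct configuration for delegate "
            + "InputFormat class key '%s'.", key),
        ErrorType.SYSTEM, false, null);
  }
}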
@@ -16,6 +16,10 @@

package io.cdap.cdap.etl.batch;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -56,10 +60,21 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
* Returns the delegating {@link OutputFormat} based on the configuration.
*
* @param conf the Hadoop {@link Configuration} for this output format
* @throws IOException if failed to instantiate the output format class
*/
protected final OutputFormat<K, V> getDelegate(Configuration conf) throws IOException {
protected final OutputFormat<K, V> getDelegate(Configuration conf) {
String delegateClassName = conf.get(DELEGATE_CLASS_NAME);
if (delegateClassName == null) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Missing configuration '%s' for the OutputFormat to use.",
DELEGATE_CLASS_NAME), String.format("Please provide correct configuration for delegate "
+ "OutputFormat class key '%s'.", DELEGATE_CLASS_NAME), ErrorType.SYSTEM, false, null);
}
if (delegateClassName.equals(getClass().getName())) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Cannot delegate OutputFormat to the same class '%s'.", delegateClassName),
String.format("Please provide correct configuration for delegate "
+ "OutputFormat class name '%s'.", delegateClassName), ErrorType.SYSTEM, false, null);
}
try {
//noinspection unchecked
OutputFormat<K, V> outputFormat = (OutputFormat<K, V>) conf.getClassLoader()
@@ -69,8 +84,10 @@ protected final OutputFormat<K, V> getDelegate(Configuration conf) throws IOExce
((Configurable) outputFormat).setConf(conf);
}
return outputFormat;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new IOException("Unable to instantiate delegate output format " + delegateClassName, e);
} catch (Exception e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
String.format("Unable to instantiate delegate output format class '%s'.",
delegateClassName), e.getMessage(), ErrorType.SYSTEM, false, e);
}
}
}
@@ -27,7 +27,7 @@
import io.cdap.cdap.etl.batch.preview.NullOutputFormatProvider;
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
import io.cdap.cdap.etl.spark.io.TrackingOutputFormat;
import io.cdap.cdap.etl.spark.io.StageTrackingOutputFormat;

import java.util.HashMap;
import java.util.Map;
@@ -62,8 +62,9 @@ public void addOutput(Output output) {
if (actualOutput instanceof Output.OutputFormatProviderOutput) {
OutputFormatProvider provider = ((Output.OutputFormatProviderOutput) actualOutput).getOutputFormatProvider();
Map<String, String> conf = new HashMap<>(provider.getOutputFormatConfiguration());
conf.put(TrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName());
provider = new BasicOutputFormatProvider(TrackingOutputFormat.class.getName(), conf);
conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName());
conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, getStageName());
provider = new BasicOutputFormatProvider(StageTrackingOutputFormat.class.getName(), conf);
actualOutput = Output.of(actualOutput.getName(), provider).alias(actualOutput.getAlias());
}
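The wrapping above stores two entries in the provider's configuration: the plugin's real OutputFormat class under DELEGATE_CLASS_NAME and the owning stage's name under WRAPPED_STAGE_NAME; StageTrackingOutputFormat reads both back at runtime. A sketch of what the wrapped configuration ends up containing, with a hypothetical stage name and delegate class chosen for illustration:

// Hypothetical example: a sink stage named "file-sink" whose plugin
// declared TextOutputFormat. After the wrapping above, the provider's
// configuration carries both the real format and the stage name:
Map<String, String> conf = new HashMap<>();
conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME,
    "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat");
conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, "file-sink");
// Hadoop is then pointed at StageTrackingOutputFormat, which instantiates
// the delegate from DELEGATE_CLASS_NAME and tags any failure with the
// stage name read from WRAPPED_STAGE_NAME.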

@@ -27,7 +27,7 @@
import io.cdap.cdap.etl.common.PipelineRuntime;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
import io.cdap.cdap.etl.spark.SparkSubmitterContext;
import io.cdap.cdap.etl.spark.io.TrackingInputFormat;
import io.cdap.cdap.etl.spark.io.StageTrackingInputFormat;

import java.util.HashMap;
import java.util.Map;
@@ -58,8 +58,9 @@ public void setInput(Input input) {
if (trackableInput instanceof Input.InputFormatProviderInput) {
InputFormatProvider provider = ((Input.InputFormatProviderInput) trackableInput).getInputFormatProvider();
Map<String, String> conf = new HashMap<>(provider.getInputFormatConfiguration());
conf.put(TrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName());
provider = new BasicInputFormatProvider(TrackingInputFormat.class.getName(), conf);
conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName());
conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName());
provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf);
trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias());
}

@@ -1,5 +1,5 @@
/*
* Copyright © 2020 Cask Data, Inc.
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -16,39 +16,63 @@

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.etl.batch.DelegatingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.List;

/**
* An {@link InputFormat} that relays metrics tracked through {@link TaskAttemptContext} counters to Spark metrics
* and wraps delegate failures in a {@link WrappedStageException} carrying the failing stage's name.
*
* @param <K> type of key to read
* @param <V> type of value to read
*/
public class TrackingInputFormat<K, V> extends DelegatingInputFormat<K, V> {
public class StageTrackingInputFormat<K, V> extends DelegatingInputFormat<K, V> {

public static final String DELEGATE_CLASS_NAME = "io.cdap.pipeline.tracking.input.classname";
public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name";

@Override
protected String getDelegateClassNameKey() {
return DELEGATE_CLASS_NAME;
}

@Override
public List<InputSplit> getSplits(JobContext context) {
try {
return getDelegate(context.getConfiguration()).getSplits(context);
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
}
}

@Override
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
// Spark already tracking metrics for file based input, hence we don't need to track again.
if (split instanceof FileSplit || split instanceof CombineFileSplit) {
return super.createRecordReader(split, context);
TaskAttemptContext context) {
try {
// Spark already tracks metrics for file-based input, hence we don't need to track them again.
if (split instanceof FileSplit || split instanceof CombineFileSplit) {
return new StageTrackingRecordReader<>(super.createRecordReader(split, context),
getStageName(context.getConfiguration()));
}

return new StageTrackingRecordReader<>(new TrackingRecordReader<>(
super.createRecordReader(split, new TrackingTaskAttemptContext(context))),
getStageName(context.getConfiguration()));
} catch (Exception e) {
throw new WrappedStageException(e, getStageName(context.getConfiguration()));
}
}

return new TrackingRecordReader<>(super.createRecordReader(split, new TrackingTaskAttemptContext(context)));
private String getStageName(Configuration conf) {
return conf.get(WRAPPED_STAGE_NAME);
}
}
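createRecordReader now returns a StageTrackingRecordReader, a class added elsewhere in this commit but not shown in this excerpt. Presumably it follows the same pattern as the output committer below: every RecordReader method delegates inside a try/catch and rethrows a WrappedStageException. A hypothetical reconstruction under that assumption:

// Hypothetical reconstruction of StageTrackingRecordReader; the real class
// is among the 12 changed files but is not shown in this excerpt.
import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class StageTrackingRecordReader<K, V> extends RecordReader<K, V> {

  private final RecordReader<K, V> delegate;
  private final String stageName;

  public StageTrackingRecordReader(RecordReader<K, V> delegate, String stageName) {
    this.delegate = delegate;
    this.stageName = stageName;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    try {
      delegate.initialize(split, context);
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }

  @Override
  public boolean nextKeyValue() {
    try {
      return delegate.nextKeyValue();
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }

  @Override
  public K getCurrentKey() {
    try {
      return delegate.getCurrentKey();
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }

  @Override
  public V getCurrentValue() {
    try {
      return delegate.getCurrentValue();
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }

  @Override
  public float getProgress() {
    try {
      return delegate.getProgress();
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }

  @Override
  public void close() {
    try {
      delegate.close();
    } catch (Exception e) {
      throw new WrappedStageException(e, stageName);
    }
  }
}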
@@ -0,0 +1,105 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.spark.io;

import io.cdap.cdap.api.exception.WrappedStageException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
* A delegating {@link OutputCommitter} that catches exceptions thrown during execution of a call
* and wraps them in a {@link WrappedStageException}.
* This class is primarily used to associate the exception with a specific stage name in a
* pipeline, which helps with debugging and error tracking.
*
* <p>
* The class delegates the actual operations to another {@link OutputCommitter}
* instance and ensures that any exceptions thrown are caught and rethrown as a
* {@link WrappedStageException}, which includes the
* stage name where the error occurred.
* </p>
*/
public class StageTrackingOutputCommitter extends OutputCommitter {

private final OutputCommitter delegate;
private final String stageName;

public StageTrackingOutputCommitter(OutputCommitter delegate, String stageName) {
this.delegate = delegate;
this.stageName = stageName;
}

@Override
public void setupJob(JobContext jobContext) {
try {
delegate.setupJob(jobContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
}

@Override
public void setupTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.setupTask(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
}

@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
try {
return delegate.needsTaskCommit(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
}

@Override
public void commitTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.commitTask(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
}

@Override
public void abortTask(TaskAttemptContext taskAttemptContext) {
try {
delegate.abortTask(taskAttemptContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
}

@Override
public boolean isRecoverySupported() {
return delegate.isRecoverySupported();
}

@Override
public void recoverTask(TaskAttemptContext taskContext) {
try {
delegate.recoverTask(taskContext);
} catch (Exception e) {
throw new WrappedStageException(e, stageName);
}
}
}
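For this committer to take effect, StageTrackingOutputFormat (whose full diff is not shown above) presumably hands it out from getOutputCommitter. A sketch of that hookup, assumed from the pattern in the rest of this commit rather than taken from the actual file:

// Assumed hookup inside StageTrackingOutputFormat, mirroring the
// try/catch-and-wrap pattern used throughout this commit.
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
  try {
    OutputCommitter delegate =
        getDelegate(context.getConfiguration()).getOutputCommitter(context);
    return new StageTrackingOutputCommitter(delegate,
        getStageName(context.getConfiguration()));
  } catch (Exception e) {
    throw new WrappedStageException(e, getStageName(context.getConfiguration()));
  }
}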