From 4ebbaca174df09804075ac729736f0be3d76f819 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Tue, 1 Oct 2024 16:18:08 +0000 Subject: [PATCH] Wrap input/output format methods to throw wrapped stage exception --- .../cdap/etl/batch/DelegatingInputFormat.java | 26 +++-- .../etl/batch/DelegatingOutputFormat.java | 25 ++++- .../spark/batch/SparkBatchSinkContext.java | 7 +- .../spark/batch/SparkBatchSourceContext.java | 7 +- .../spark/io/StageTrackingInputFormat.java | 79 +++++++++++++ .../io/StageTrackingOutputCommitter.java | 105 ++++++++++++++++++ .../spark/io/StageTrackingOutputFormat.java | 87 +++++++++++++++ .../spark/io/StageTrackingRecordReader.java | 104 +++++++++++++++++ .../spark/io/StageTrackingRecordWriter.java | 64 +++++++++++ .../etl/spark/io/TrackingInputFormat.java | 54 --------- .../etl/spark/io/TrackingOutputFormat.java | 59 ---------- ...java => StageTrackingInputFormatTest.java} | 23 ++-- .../io/StageTrackingOutputFormatTest.java | 98 ++++++++++++++++ 13 files changed, 597 insertions(+), 141 deletions(-) create mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java create mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java create mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java create mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java create mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java delete mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingInputFormat.java delete mode 100644 
cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputFormat.java rename cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/{TrackingInputFormatTest.java => StageTrackingInputFormatTest.java} (72%) create mode 100644 cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormatTest.java diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingInputFormat.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingInputFormat.java index ed6fd8fd9b88..ec184b1cd96e 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingInputFormat.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingInputFormat.java @@ -16,6 +16,10 @@ package io.cdap.cdap.etl.batch; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configurable; @@ -55,17 +59,21 @@ public RecordReader createRecordReader(InputSplit split, * Returns the delegating {@link InputFormat} based on the current configuration. 
* * @param conf the Hadoop {@link Configuration} for this input format - * @throws IOException if failed to instantiate the input format class */ - protected final InputFormat getDelegate(Configuration conf) throws IOException { + protected final InputFormat getDelegate(Configuration conf) { String delegateClassName = conf.get(getDelegateClassNameKey()); if (delegateClassName == null) { - throw new IllegalArgumentException("Missing configuration " + getDelegateClassNameKey() - + " for the InputFormat to use"); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing configuration '%s' for the InputFormat to use.", + getDelegateClassNameKey()), String.format("Please provide correct configuration for" + + "delegate InputFormat class key '%s'.", getDelegateClassNameKey()), + ErrorType.SYSTEM, false, null); } if (delegateClassName.equals(getClass().getName())) { - throw new IllegalArgumentException( - "Cannot delegate InputFormat to the same class " + delegateClassName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Cannot delegate InputFormat to the same class '%s'.", delegateClassName), + String.format("Please provide correct configuration for delegate " + + "InputFormat class name '%s'.", delegateClassName), ErrorType.SYSTEM, false, null); } try { //noinspection unchecked @@ -76,8 +84,10 @@ protected final InputFormat getDelegate(Configuration conf) throws IOExcep ((Configurable) inputFormat).setConf(conf); } return inputFormat; - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new IOException("Unable to instantiate delegate input format " + delegateClassName, e); + } catch (Exception e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to instantiate delegate input format class '%s'.", + delegateClassName), e.getMessage(), 
ErrorType.SYSTEM, false, e); } } } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingOutputFormat.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingOutputFormat.java index d13e3c1f7300..d2a01ec5cef3 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingOutputFormat.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/DelegatingOutputFormat.java @@ -16,6 +16,10 @@ package io.cdap.cdap.etl.batch; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import java.io.IOException; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -56,10 +60,21 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) * Returns the delegating {@link OutputFormat} based on the configuration. 
* * @param conf the Hadoop {@link Configuration} for this output format - * @throws IOException if failed to instantiate the output format class */ - protected final OutputFormat getDelegate(Configuration conf) throws IOException { + protected final OutputFormat getDelegate(Configuration conf) { String delegateClassName = conf.get(DELEGATE_CLASS_NAME); + if (delegateClassName == null) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing configuration '%s' for the OutputFormat to use.", + DELEGATE_CLASS_NAME), String.format("Please provide correct configuration for delegate " + + "OutputFormat class key '%s'.", DELEGATE_CLASS_NAME), ErrorType.SYSTEM, false, null); + } + if (delegateClassName.equals(getClass().getName())) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Cannot delegate OutputFormat to the same class '%s'.", delegateClassName), + String.format("Please provide correct configuration for delegate " + + "OutputFormat class name '%s'.", delegateClassName), ErrorType.SYSTEM, false, null); + } try { //noinspection unchecked OutputFormat outputFormat = (OutputFormat) conf.getClassLoader() @@ -69,8 +84,10 @@ protected final OutputFormat getDelegate(Configuration conf) throws IOExce ((Configurable) outputFormat).setConf(conf); } return outputFormat; - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new IOException("Unable to instantiate delegate output format " + delegateClassName, e); + } catch (Exception e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to instantiate delegate output format class '%s'.", + delegateClassName), e.getMessage(), ErrorType.SYSTEM, false, e); } } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java 
b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java index 3fcdfe411f68..72de9e08ebd9 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSinkContext.java @@ -27,7 +27,7 @@ import io.cdap.cdap.etl.batch.preview.NullOutputFormatProvider; import io.cdap.cdap.etl.common.PipelineRuntime; import io.cdap.cdap.etl.proto.v2.spec.StageSpec; -import io.cdap.cdap.etl.spark.io.TrackingOutputFormat; +import io.cdap.cdap.etl.spark.io.StageTrackingOutputFormat; import java.util.HashMap; import java.util.Map; @@ -62,8 +62,9 @@ public void addOutput(Output output) { if (actualOutput instanceof Output.OutputFormatProviderOutput) { OutputFormatProvider provider = ((Output.OutputFormatProviderOutput) actualOutput).getOutputFormatProvider(); Map conf = new HashMap<>(provider.getOutputFormatConfiguration()); - conf.put(TrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName()); - provider = new BasicOutputFormatProvider(TrackingOutputFormat.class.getName(), conf); + conf.put(StageTrackingOutputFormat.DELEGATE_CLASS_NAME, provider.getOutputFormatClassName()); + conf.put(StageTrackingOutputFormat.WRAPPED_STAGE_NAME, getStageName()); + provider = new BasicOutputFormatProvider(StageTrackingOutputFormat.class.getName(), conf); actualOutput = Output.of(actualOutput.getName(), provider).alias(actualOutput.getAlias()); } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java index 962642c0c9e9..5e05b4220b00 100644 --- 
a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceContext.java @@ -27,7 +27,7 @@ import io.cdap.cdap.etl.common.PipelineRuntime; import io.cdap.cdap.etl.proto.v2.spec.StageSpec; import io.cdap.cdap.etl.spark.SparkSubmitterContext; -import io.cdap.cdap.etl.spark.io.TrackingInputFormat; +import io.cdap.cdap.etl.spark.io.StageTrackingInputFormat; import java.util.HashMap; import java.util.Map; @@ -58,8 +58,9 @@ public void setInput(Input input) { if (trackableInput instanceof Input.InputFormatProviderInput) { InputFormatProvider provider = ((Input.InputFormatProviderInput) trackableInput).getInputFormatProvider(); Map conf = new HashMap<>(provider.getInputFormatConfiguration()); - conf.put(TrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName()); - provider = new BasicInputFormatProvider(TrackingInputFormat.class.getName(), conf); + conf.put(StageTrackingInputFormat.DELEGATE_CLASS_NAME, provider.getInputFormatClassName()); + conf.put(StageTrackingInputFormat.WRAPPED_STAGE_NAME, getStageName()); + provider = new BasicInputFormatProvider(StageTrackingInputFormat.class.getName(), conf); trackableInput = Input.of(trackableInput.getName(), provider).alias(trackableInput.getAlias()); } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java new file mode 100644 index 000000000000..48d74506552e --- /dev/null +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormat.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2024 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.spark.io; + +import io.cdap.cdap.api.exception.WrappedStageException; +import io.cdap.cdap.etl.batch.DelegatingInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import java.io.IOException; +import java.util.List; + +/** + * An {@link InputFormat} that enables metrics tracking through {@link TaskAttemptContext} counters to Spark metrics. 
+ * + * @param type of key to read + * @param type of value to read + */ +public class StageTrackingInputFormat extends DelegatingInputFormat { + + public static final String DELEGATE_CLASS_NAME = "io.cdap.pipeline.tracking.input.classname"; + public static final String WRAPPED_STAGE_NAME = "io.cdap.pipeline.wrapped.stage.name"; + + @Override + protected String getDelegateClassNameKey() { + return DELEGATE_CLASS_NAME; + } + + @Override + public List getSplits(JobContext context) { + try { + return getDelegate(context.getConfiguration()).getSplits(context); + } catch (Exception e) { + throw new WrappedStageException(e, getStageName(context.getConfiguration())); + } + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + try { + // Spark already tracking metrics for file based input, hence we don't need to track again. + if (split instanceof FileSplit || split instanceof CombineFileSplit) { + return new StageTrackingRecordReader<>(super.createRecordReader(split, context), + getStageName(context.getConfiguration())); + } + + return new StageTrackingRecordReader<>(new TrackingRecordReader<>( + super.createRecordReader(split, new TrackingTaskAttemptContext(context))), + getStageName(context.getConfiguration())); + } catch (Exception e) { + throw new WrappedStageException(e, getStageName(context.getConfiguration())); + } + } + + private String getStageName(Configuration conf) { + return conf.get(WRAPPED_STAGE_NAME); + } +} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java new file mode 100644 index 000000000000..8fe6cf166eae --- /dev/null +++ 
b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputCommitter.java @@ -0,0 +1,105 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.spark.io; + +import io.cdap.cdap.api.exception.WrappedStageException; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A delegating output format that catches exceptions thrown during execution of a call + * and wraps them in a {@link WrappedStageException}. + * This class is primarily used to associate the exception with a specific stage name in a pipeline, + * helping in better debugging and error tracking. + * + *

+ * The class delegates each call to the wrapped + * {@link OutputCommitter} instance and ensures that any exceptions thrown are caught and + * rethrown as a {@link WrappedStageException}, which includes the + * stage name where the error occurred. + *

+ */ +public class StageTrackingOutputCommitter extends OutputCommitter { + + private final OutputCommitter delegate; + private final String stageName; + + public StageTrackingOutputCommitter(OutputCommitter delegate, String stageName) { + this.delegate = delegate; + this.stageName = stageName; + } + + @Override + public void setupJob(JobContext jobContext) { + try { + delegate.setupJob(jobContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) { + try { + delegate.setupTask(taskAttemptContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) { + try { + return delegate.needsTaskCommit(taskAttemptContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) { + try { + delegate.commitTask(taskAttemptContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) { + try { + delegate.abortTask(taskAttemptContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public boolean isRecoverySupported() { + return delegate.isRecoverySupported(); + } + + @Override + public void recoverTask(TaskAttemptContext taskContext) { + try { + delegate.recoverTask(taskContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } +} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java new file mode 100644 index 000000000000..2ff29f9702c7 --- /dev/null +++ 
b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormat.java @@ -0,0 +1,87 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.spark.io; + +import io.cdap.cdap.etl.batch.DelegatingOutputFormat; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import java.io.IOException; + +/** + * An {@link OutputFormat} that enables metrics tracking through {@link TaskAttemptContext} + * counters to Spark metrics. 
+ * + * @param type of key to write + * @param type of value to write + */ +public class StageTrackingOutputFormat extends DelegatingOutputFormat { + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) { + OutputFormat delegate = getDelegate(context.getConfiguration()); + try { + // Spark already emitting bytes written metrics for file base output, + // hence we don't want to double count + if (delegate instanceof FileOutputFormat) { + return new StageTrackingRecordWriter<>(delegate.getRecordWriter(context), + getStageName(context.getConfiguration())); + } + + return new StageTrackingRecordWriter<>( + new TrackingRecordWriter(delegate.getRecordWriter(new TrackingTaskAttemptContext(context))), + getStageName(context.getConfiguration())); + } catch (Exception e) { + throw new WrappedStageException(e, getStageName(context.getConfiguration())); + } + } + + @Override + public void checkOutputSpecs(JobContext context) { + try { + getDelegate(context.getConfiguration()).checkOutputSpecs(context); + } catch (Exception e) { + throw new WrappedStageException(e, getStageName(context.getConfiguration())); + } + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + OutputFormat delegate = getDelegate(context.getConfiguration()); + try { + // Spark already emitting bytes written metrics for file base output, + // hence we don't want to double count + if (delegate instanceof FileOutputFormat) { + return new StageTrackingOutputCommitter(delegate.getOutputCommitter(context), + getStageName(context.getConfiguration())); + } + + return new StageTrackingOutputCommitter(new TrackingOutputCommitter( + delegate.getOutputCommitter(new TrackingTaskAttemptContext(context))), + getStageName(context.getConfiguration())); + } catch (Exception e) { + throw new WrappedStageException(e, getStageName(context.getConfiguration())); + } + } + + private String getStageName(Configuration conf) { + return conf.get(WRAPPED_STAGE_NAME); + } 
+} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java new file mode 100644 index 000000000000..f5232103f962 --- /dev/null +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordReader.java @@ -0,0 +1,104 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.spark.io; + +import io.cdap.cdap.api.exception.WrappedStageException; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * A delegating record writer that catches exceptions thrown during execution of a call + * and wraps them in a {@link WrappedStageException}. + * This class is primarily used to associate the exception with a specific stage name in a pipeline, + * helping in better debugging and error tracking. + * + *

+ * The class delegates each call to the wrapped {@link RecordReader} instance + * and ensures that any exceptions thrown are caught and rethrown as a + * {@link WrappedStageException}, which includes the stage name where the error occurred. + *

+ * + * @param type of key to read + * @param type of value to read + */ +public class StageTrackingRecordReader extends RecordReader { + + private final RecordReader delegate; + private final String stageName; + + public StageTrackingRecordReader(RecordReader delegate, String stageName) { + this.delegate = delegate; + this.stageName = stageName; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) { + try { + delegate.initialize(split, new TrackingTaskAttemptContext(context)); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public boolean nextKeyValue() { + try { + return delegate.nextKeyValue(); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public K getCurrentKey() { + try { + return delegate.getCurrentKey(); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public V getCurrentValue() { + try { + return delegate.getCurrentValue(); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public float getProgress() { + try { + return delegate.getProgress(); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public void close() { + try { + delegate.close(); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } +} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java new file mode 100644 index 000000000000..377f93db604d --- /dev/null +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/StageTrackingRecordWriter.java @@ -0,0 +1,64 @@ +/* + * Copyright © 2024 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.spark.io; + +import io.cdap.cdap.api.exception.WrappedStageException; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A delegating record writer that catches exceptions thrown during execution of a call + * and wraps them in a {@link WrappedStageException}. + * This class is primarily used to associate the exception with a specific stage name in a pipeline, + * helping in better debugging and error tracking. + * + *

+ * The class delegates each call to the wrapped {@link RecordWriter} instance + * and ensures that any exceptions thrown are caught and rethrown as a + * {@link WrappedStageException}, which includes the stage name where the error occurred. + *

+ * + * @param type of key to write + * @param type of value to write + */ +public class StageTrackingRecordWriter extends RecordWriter { + private final RecordWriter delegate; + private final String stageName; + + public StageTrackingRecordWriter(RecordWriter delegate, String stageName) { + this.delegate = delegate; + this.stageName = stageName; + } + + @Override + public void write(K k, V v) { + try { + delegate.write(k, v); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } + + @Override + public void close(TaskAttemptContext taskAttemptContext) { + try { + delegate.close(taskAttemptContext); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } +} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingInputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingInputFormat.java deleted file mode 100644 index d768a89c9537..000000000000 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingInputFormat.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright © 2020 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package io.cdap.cdap.etl.spark.io; - -import io.cdap.cdap.etl.batch.DelegatingInputFormat; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -import java.io.IOException; - -/** - * An {@link InputFormat} that enables metrics tracking through {@link TaskAttemptContext} counters to Spark metrics. - * - * @param type of key to read - * @param type of value to read - */ -public class TrackingInputFormat extends DelegatingInputFormat { - - public static final String DELEGATE_CLASS_NAME = "io.cdap.pipeline.tracking.input.classname"; - - @Override - protected String getDelegateClassNameKey() { - return DELEGATE_CLASS_NAME; - } - - @Override - public RecordReader createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - // Spark already tracking metrics for file based input, hence we don't need to track again. - if (split instanceof FileSplit || split instanceof CombineFileSplit) { - return super.createRecordReader(split, context); - } - - return new TrackingRecordReader<>(super.createRecordReader(split, new TrackingTaskAttemptContext(context))); - } -} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputFormat.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputFormat.java deleted file mode 100644 index f7b9f716cbc0..000000000000 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/io/TrackingOutputFormat.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright © 2020 Cask Data, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.etl.spark.io; - -import io.cdap.cdap.etl.batch.DelegatingOutputFormat; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -import java.io.IOException; - -/** - * An {@link OutputFormat} that enables metrics tracking through {@link TaskAttemptContext} counters to Spark metrics. 
- * - * @param type of key to write - * @param type of value to write - */ -public class TrackingOutputFormat extends DelegatingOutputFormat { - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - OutputFormat delegate = getDelegate(context.getConfiguration()); - - // Spark already emitting bytes written metrics for file base output, hence we don't want to double count - if (delegate instanceof FileOutputFormat) { - return delegate.getRecordWriter(context); - } - - return new TrackingRecordWriter<>(delegate.getRecordWriter(new TrackingTaskAttemptContext(context))); - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - OutputFormat delegate = getDelegate(context.getConfiguration()); - - // Spark already emitting bytes written metrics for file base output, hence we don't want to double count - if (delegate instanceof FileOutputFormat) { - return delegate.getOutputCommitter(context); - } - - return new TrackingOutputCommitter(delegate.getOutputCommitter(new TrackingTaskAttemptContext(context))); - } -} diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/TrackingInputFormatTest.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormatTest.java similarity index 72% rename from cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/TrackingInputFormatTest.java rename to cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormatTest.java index 633401748bf9..d7b9c83ffe2f 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/TrackingInputFormatTest.java +++ 
b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingInputFormatTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2020 Cask Data, Inc. + * Copyright © 2024 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,6 +16,7 @@ package io.cdap.cdap.etl.spark.io; +import io.cdap.cdap.api.exception.ProgramFailureException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; @@ -33,9 +34,9 @@ import java.util.List; /** - * Unit tests for {@link TrackingInputFormat} class. + * Unit tests for {@link StageTrackingInputFormat} class. */ -public class TrackingInputFormatTest { +public class StageTrackingInputFormatTest { @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @@ -46,31 +47,33 @@ public void testDelegate() throws IOException, InterruptedException { Files.createFile(inputDir.toPath().resolve("test")); Configuration hConf = new Configuration(); - hConf.setClass(TrackingInputFormat.DELEGATE_CLASS_NAME, TextInputFormat.class, InputFormat.class); + hConf.setClass(StageTrackingInputFormat.DELEGATE_CLASS_NAME, + TextInputFormat.class, InputFormat.class); Job job = Job.getInstance(hConf); TextInputFormat.addInputPath(job, new Path(inputDir.toURI())); - TrackingInputFormat inputFormat = new TrackingInputFormat(); + StageTrackingInputFormat inputFormat = new StageTrackingInputFormat(); List splits = inputFormat.getSplits(job); Assert.assertEquals(1, splits.size()); } - @Test (expected = IllegalArgumentException.class) + @Test (expected = ProgramFailureException.class) public void testMissingDelegate() throws IOException, InterruptedException { Configuration hConf = new Configuration(); Job job = Job.getInstance(hConf); - TrackingInputFormat inputFormat = new TrackingInputFormat(); + 
StageTrackingInputFormat inputFormat = new StageTrackingInputFormat(); inputFormat.getSplits(job); } - @Test (expected = IllegalArgumentException.class) + @Test (expected = ProgramFailureException.class) public void testSelfDelegate() throws IOException, InterruptedException { Configuration hConf = new Configuration(); - hConf.setClass(TrackingInputFormat.DELEGATE_CLASS_NAME, TrackingInputFormat.class, InputFormat.class); + hConf.setClass(StageTrackingInputFormat.DELEGATE_CLASS_NAME, StageTrackingInputFormat.class, + InputFormat.class); Job job = Job.getInstance(hConf); - TrackingInputFormat inputFormat = new TrackingInputFormat(); + StageTrackingInputFormat inputFormat = new StageTrackingInputFormat(); inputFormat.getSplits(job); } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormatTest.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormatTest.java new file mode 100644 index 000000000000..42d708c296b3 --- /dev/null +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/test/java/io/cdap/cdap/etl/spark/io/StageTrackingOutputFormatTest.java @@ -0,0 +1,98 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package io.cdap.cdap.etl.spark.io;
+
+import io.cdap.cdap.api.exception.ProgramFailureException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit tests for {@link StageTrackingOutputFormat} class.
+ */
+public class StageTrackingOutputFormatTest {
+
+  @ClassRule
+  public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testDelegate() throws IOException, InterruptedException {
+    File outputDir = TEMP_FOLDER.newFolder();
+    Files.createFile(outputDir.toPath().resolve("test"));
+
+    Configuration hConf = new Configuration();
+    hConf.setClass(StageTrackingOutputFormat.DELEGATE_CLASS_NAME,
+        TextOutputFormat.class, OutputFormat.class);
+
+    Job job = Job.getInstance(hConf);
+    TextOutputFormat.setOutputPath(job, new Path(outputDir.toURI()));
+
+    // NOTE(review): FileOutputFormat normally rejects an existing output directory - confirm.
+    StageTrackingOutputFormat outputFormat = new StageTrackingOutputFormat();
+    outputFormat.checkOutputSpecs(job);
+
+    // Job.getInstance() copies hConf, so only job.getConfiguration() carries the output path.
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    TaskAttemptContextImpl taskAttemptContext =
+        new TaskAttemptContextImpl(new JobConf(job.getConfiguration()), taskAttemptID);
+    RecordWriter writer = outputFormat.getRecordWriter(taskAttemptContext);
+    writer.write(NullWritable.get(), new Text("Test Record"));
+    writer.close(taskAttemptContext);
+
+    // Validate the output written to the file
+    File writtenFile = new File(outputDir, "part-r-00000");
+    Assert.assertTrue(writtenFile.exists());
+    List<String> lines = Files.readAllLines(writtenFile.toPath());
+    Assert.assertEquals(1, lines.size());
+    Assert.assertEquals("Test Record", lines.get(0));
+  }
+
+  @Test (expected = ProgramFailureException.class)
+  public void testMissingDelegate() throws IOException {
+    Configuration hConf = new Configuration();
+    Job job = Job.getInstance(hConf);
+    StageTrackingOutputFormat outputFormat = new StageTrackingOutputFormat();
+    outputFormat.checkOutputSpecs(job);
+  }
+
+  @Test (expected = ProgramFailureException.class)
+  public void testSelfDelegate() throws IOException {
+    Configuration hConf = new Configuration();
+    // Register the wrapper as its own delegate under the output-format key.
+    hConf.setClass(StageTrackingOutputFormat.DELEGATE_CLASS_NAME,
+        StageTrackingOutputFormat.class, OutputFormat.class);
+    Job job = Job.getInstance(hConf);
+    StageTrackingOutputFormat outputFormat = new StageTrackingOutputFormat();
+    outputFormat.checkOutputSpecs(job);
+  }
+}