Skip to content

Commit

Permalink
Replace synthetic field this$0 by explicit instance (#27868)
Browse files Browse the repository at this point in the history
* Replace synthetic field this$0 by explicit instance

* Add abstract class reference to anonymous class extending it

Explicit outer class instance is hold in abstract class field variable, not in anonymous class

* Test for explicit instance of outer class in MapElements.MapDoFn

Testing both logical conditions in expand() method in MapElements

* Javadoc reviewed
  • Loading branch information
lukaszspyra authored Aug 22, 2023
1 parent 448184e commit a46a35b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private String replaceFnString(
String doFnName;
Class<?> enclosingClass = fnClass.getEnclosingClass();
if (enclosingClass != null && enclosingClass.equals(MapElements.class)) {
Field parent = fnClass.getDeclaredField("this$0");
Field parent = fnClass.getSuperclass().getDeclaredField("outer");
parent.setAccessible(true);
Field fnField = enclosingClass.getDeclaredField(fnFieldName);
fnField.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
*/
package org.apache.beam.runners.spark;

import static org.apache.beam.sdk.transforms.Contextful.fn;
import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Arrays;
import java.util.Collections;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -39,12 +43,15 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -160,6 +167,40 @@ public void debugStreamingPipeline() {
Matchers.equalTo(expectedPipeline));
}

@Test
public void debugBatchPipelineWithContextfulTransform() {
PipelineOptions options = contextRule.configure(PipelineOptionsFactory.create());
options.setRunner(SparkRunnerDebugger.class);
Pipeline pipeline = Pipeline.create(options);

final PCollectionView<Integer> view =
pipeline.apply("Dummy", Create.of(0)).apply(View.asSingleton());

pipeline
.apply(Create.of(Arrays.asList(0)))
.setCoder(VarIntCoder.of())
.apply(
MapElements.into(new TypeDescriptor<Integer>() {})
.via(fn((element, c) -> element, requiresSideInputs(view))));

SparkRunnerDebugger.DebugSparkPipelineResult result =
(SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();

final String expectedPipeline =
"sparkContext.<impulse>()\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Create$Values$2())\n"
+ "_.aggregate(..., new org.apache.beam.sdk.transforms.View$SingletonCombineFn(), ...)\n"
+ "_.<createPCollectionView>\n"
+ "sparkContext.<impulse>()\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Create$Values$2())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())";

assertThat(
"Debug pipeline did not equal expected",
result.getDebugString(),
Matchers.equalTo(expectedPipeline));
}

private static class FormatKVFn extends DoFn<KV<String, String>, String> {
@SuppressWarnings("unused")
@ProcessElement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public void processElement(

/** A DoFn implementation that handles a trivial map call. */
private abstract class MapDoFn extends DoFn<InputT, OutputT> {

/** Holds {@link MapDoFn#outer instance} of enclosing class, used by runner implementations. */
final MapElements<InputT, OutputT> outer = MapElements.this;

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(MapElements.this);
Expand Down

0 comments on commit a46a35b

Please sign in to comment.