[Failing Test]: Various TPC-DS queries throw NPEs using SparkRunner #28256

Closed

mosche opened this issue Aug 31, 2023 · 27 comments · Fixed by #29162

mosche commented Aug 31, 2023

What happened?

Various TPC-DS queries started throwing NPEs with the SparkRunner a while back:

java.lang.NullPointerException
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedWindowedValue.<init>(WindowedValue.java:312)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.<init>(WindowedValue.java:329)
        at org.apache.beam.sdk.util.WindowedValue.of(WindowedValue.java:95)

Without looking further into the underlying root cause, this seems to be related to #27617.
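
For context on where the check fires: WindowedValue.of funnels the timestamp through Preconditions.checkNotNull, so a null timestamp reaching it reproduces the trace above in isolation. A minimal sketch (hypothetical value; this uses the public WindowedValue API, not the TPC-DS code path itself):

import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;

public class NpeRepro {
  public static void main(String[] args) {
    // A non-MIN timestamp in the global window goes through
    // TimestampedValueInGlobalWindow, whose constructor null-checks the
    // timestamp; passing null throws the same NPE as in the stack trace above.
    WindowedValue.of("value", /* timestamp= */ null, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
  }
}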

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 2 (backlog / disabled test but we think the product is healthy)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [x] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner
mosche commented Sep 8, 2023

cc @aromanenko-dev

aromanenko-dev commented Sep 27, 2023

After a quick investigation with git bisect, I can confirm that it was caused by this change (#27617); the first bad commit is 05305ede45366f158f27fc2b83b9ce00db4df2ab.

Interestingly, it seems to affect only the Spark RDD runner with certain types of pipelines, even though there are no failures in the VR (ValidatesRunner) tests.

aromanenko-dev commented:

The CLI command to reproduce the issue:

./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:3" -Ptpcds.args=" \
  --runner=SparkRunner \
  --queries=3 \
  --tpcParallel=1 \
  --dataDirectory=/path/to/input/data/ \
  --dataSize=1GB \
  --sourceType=PARQUET \
  --resultsDirectory=/path/to/results/"

echauchot commented:

@aromanenko-dev Thanks for the root cause analysis. Do you use Beam schemas in the TPC-DS implementation?

aromanenko-dev commented:

@echauchot Yes, the CSV or Parquet schema is converted into a Beam schema so that the queries can be executed with Beam SQL.
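
For illustration, that conversion step looks roughly like this (a sketch with made-up field names; AvroUtils.toBeamSchema is assumed here, in its Avro-extension location in recent Beam releases):

import org.apache.avro.SchemaBuilder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.schemas.Schema;

public class SchemaConversionSketch {
  public static void main(String[] args) {
    // The Parquet file schema is read as an Avro schema...
    org.apache.avro.Schema avroSchema =
        SchemaBuilder.record("store_sales").fields()
            .requiredLong("ss_item_sk")
            .requiredDouble("ss_sales_price")
            .endRecord();
    // ...and converted to a Beam schema so rows can be queried with Beam SQL.
    Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
    System.out.println(beamSchema);
  }
}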

aromanenko-dev commented:

Full stacktrace:

23/10/03 15:21:23 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 9.0 (TID 45)
java.lang.NullPointerException
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedWindowedValue.<init>(WindowedValue.java:312)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.<init>(WindowedValue.java:329)
        at org.apache.beam.sdk.util.WindowedValue.of(WindowedValue.java:95)
        at org.apache.beam.runners.spark.translation.SparkCombineFn$SingleWindowWindowedAccumulator.extractOutput(SparkCombineFn.java:251)
        at org.apache.beam.runners.spark.translation.SparkCombineFn.extractOutputStream(SparkCombineFn.java:774)
        at org.apache.beam.runners.spark.translation.TransformTranslator$5.lambda$evaluate$8d6d352$1(TransformTranslator.java:351)
        at org.apache.spark.api.java.JavaPairRDD.$anonfun$flatMapValues$1(JavaPairRDD.scala:680)
        at org.apache.spark.rdd.PairRDDFunctions.$anonfun$flatMapValues$3(PairRDDFunctions.scala:763)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
        at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:130)
        at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:60)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

echauchot commented:

Yes, the timestamp is null in some cases.

echauchot commented:

Can it be reproduced using CSV input files?

aromanenko-dev commented:

Yes, I have the same issue with CSV files.
Btw, a quick fix to make it work with generated CSV data: #28819

echauchot commented:

Getting a different exception on master using --sourceType=CSV:

Caused by: java.lang.IllegalArgumentException: Expect 28 fields, but actually 29
        at org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows(BeamTableUtils.java:76)
        at org.apache.beam.sdk.tpcds.CsvToRow.lambda$expand$43aa1fdf$1(CsvToRow.java:54)
        at org.apache.beam.sdk.transforms.FlatMapElements$3.processElement(FlatMapElements.java:167)
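
One plausible cause of that off-by-one (an assumption on my side, not verified here) is a trailing column delimiter in the generated files; BeamTableUtils parses lines with commons-csv, where a trailing delimiter yields one extra empty field:

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class TrailingDelimiterSketch {
  public static void main(String[] args) throws Exception {
    // dsdgen-style lines end with the delimiter; parsed naively, that
    // trailing '|' produces one extra (empty) field: 4 instead of 3 here.
    String line = "a|b|c|";
    CSVParser parser = CSVParser.parse(line, CSVFormat.newFormat('|'));
    CSVRecord record = parser.getRecords().get(0);
    System.out.println(record.size()); // prints 4
  }
}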

echauchot commented:

Ah yes, I get the NPE if I apply #28819.

aromanenko-dev commented:

CC @je-ik: since you worked on the Group/Combine transform translations for the original Spark RDD runner, could you take a look? Is it a Spark runner issue?

je-ik commented Oct 27, 2023

Oh my, this is old history. :)
I walked through the code, and I actually don't understand why accTimestamp is set to null when the timestamp should be BoundedWindow.TIMESTAMP_MIN_VALUE.

I created #29162; it seems to pass all tests and validatesRunner suites locally. Can you try this patch?
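
Not the literal diff in #29162, just a sketch of the kind of defensive defaulting being discussed: fall back to TIMESTAMP_MIN_VALUE instead of handing WindowedValue.of a null timestamp.

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class TimestampFallbackSketch {
  // If the accumulator never observed an element timestamp, default to
  // TIMESTAMP_MIN_VALUE rather than propagating null into WindowedValue.of.
  static Instant outputTimestamp(Instant accTimestamp) {
    return accTimestamp != null ? accTimestamp : BoundedWindow.TIMESTAMP_MIN_VALUE;
  }
}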

aromanenko-dev commented Oct 27, 2023

@je-ik Thanks!

I quickly tested it with a couple of the TPC-DS queries that were failing, and they pass now.

So I think that if the ValidatesRunner tests pass, we should merge this fix. Though it's strange that this issue was not caught by any of the VR tests running with SparkRunner.

je-ik commented Oct 27, 2023

Yes, I'd just like to walk through the code again to be sure exactly what the impact of the fix might be. And yes, it is strange that it was not caught by the VR tests; I'll look into it.
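
For reference, a covering test could be as simple as a global-window Combine, which exercises the same extractOutput path as the trace above (a hypothetical sketch, not an existing suite; it would need to run under the ValidatesRunner category with SparkRunner to catch this):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.junit.Rule;
import org.junit.Test;

public class GlobalWindowCombineSketchTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void combineGloballyInGlobalWindow() {
    // Combining in the global window forces the runner to emit a single
    // WindowedValue whose timestamp must not be null.
    PAssert.thatSingleton(p.apply(Create.of(1, 2, 3)).apply(Sum.integersGlobally()))
        .isEqualTo(6);
    p.run().waitUntilFinish();
  }
}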

je-ik self-assigned this Oct 27, 2023
je-ik commented Oct 30, 2023

I don't have any background on the TPC-DS queries. Do we have input data that I can pass to the gradle command to reproduce the NPEs (the dataDirectory)?

je-ik commented Oct 30, 2023

Ah, I see, gs://beam-tpcds/datasets/parquet/nonpartitioned.

je-ik commented Oct 30, 2023

I'm unable to reproduce the error locally. Complete command-line:

./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:3" -Ptpcds.args=" \
  --runner=SparkRunner \
  --queries=3,3,3,3,3,3,3,3,3,3,3 \
  --tpcParallel=1 \
  --dataDirectory=gs://beam-tpcds/datasets/parquet/partitioned/1GB \
  --dataSize=1GB \
  --sourceType=PARQUET \
  --resultsDirectory=/tmp/tpc-ds-results/"

All attempts pass as Successful on current master (c2816c8d97), the same for both the partitioned and nonpartitioned versions.

aromanenko-dev commented Oct 30, 2023

@je-ik Hmm, interesting. A couple of notes:

  1. Why do you specify several identical queries in your command? Only one should be enough.
  2. Could you change the input path to gs://beam-tpcds/datasets/parquet/partitioned/ and rerun? It seems we have an issue here in the Beam TPC-DS runner: it should fail if it doesn't read any records.

je-ik commented Oct 30, 2023

> 1. Why do you specify several identical queries in your command? Only one should be enough.

Just to re-run the test multiple times to reveal any flakes.

> 2. Could you change the input path to `gs://beam-tpcds/datasets/parquet/partitioned/` and rerun? It seems we have an issue here in the Beam TPC-DS runner: it should fail if it doesn't read any records.

I tried both, same results.

je-ik commented Oct 30, 2023

Ah, I see. I need to remove the 1GB suffix. Yes, I'll try, thanks!

aromanenko-dev commented:

Yes, I'll fix this.

je-ik commented Oct 30, 2023

I'm obviously doing something wrong.

Running the command like this

$ ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:3" -Ptpcds.args=" \
  --runner=SparkRunner \
  --queries=3 \
  --tpcParallel=1 \
  --dataDirectory=gs://beam-tpcds/datasets/parquet/partitioned/ \
  --dataSize=1GB \
  --sourceType=PARQUET \
  --resultsDirectory=/tmp/tpc-ds-results/"

I get a success:

+--------------+------------------------------+--------------+------------+--------------+--------------------------------+--------------------------------+----------------------+
|  Query Name  |           Job Name           |  Data Size   |  Dialect   |    Status    |           Start Time           |            End Time            |  Elapsed Time(sec)   |
+--------------+------------------------------+--------------+------------+--------------+--------------------------------+--------------------------------+----------------------+
|    query3    |  query3result1698672244841   |     1GB      |  Calcite   |  Successful  |  Mon Oct 30 14:24:08 CET 2023  |  Mon Oct 30 14:24:15 CET 2023  |        6.483         |
+--------------+------------------------------+--------------+------------+--------------+--------------------------------+--------------------------------+----------------------+

but the outputs are empty:

$ ls -l /tmp/tpc-ds-results/1GB/
total 0
-rw-rw-r-- 1 honza honza 0 Oct 30 14:24 query3result1698672244841-00000-of-00001.txt

Accessing the bucket seems to be working fine, e.g.:

$ gsutil ls -l gs://beam-tpcds/datasets/parquet/partitioned/1GB/catalog_page
         8  2021-03-24T06:03:33Z  gs://beam-tpcds/datasets/parquet/partitioned/1GB/catalog_page/._SUCCESS.crc
      5456  2021-03-24T06:03:33Z  gs://beam-tpcds/datasets/parquet/partitioned/1GB/catalog_page/.part-00000-43e37567-6034-4fae-bda9-db2a85216f3f-c000.snappy.parquet.crc
         0  2021-03-24T06:03:34Z  gs://beam-tpcds/datasets/parquet/partitioned/1GB/catalog_page/_SUCCESS
    697339  2021-03-24T06:03:34Z  gs://beam-tpcds/datasets/parquet/partitioned/1GB/catalog_page/part-00000-43e37567-6034-4fae-bda9-db2a85216f3f-c000.snappy.parquet
TOTAL: 4 objects, 702803 bytes (686.33 KiB)

aromanenko-dev commented:

If the results are empty, then it's very likely that the input was empty too (for some reason) - ParquetIO doesn't fail if it doesn't find any input files.
Could you try running it with gs://beam-tpcds/datasets/parquet/nonpartitioned as the input path? This is what the TPC-DS Jenkins job does.
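
One way to turn a silently empty input into a hard failure (a sketch, untested here; the filepattern is illustrative) is to match the inputs with EmptyMatchTreatment.DISALLOW before reading:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FailOnEmptyInput {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // FileIO.match with DISALLOW fails at execution time if no files match,
    // instead of silently producing an empty PCollection of metadata.
    p.apply(
        FileIO.match()
            .filepattern("gs://beam-tpcds/datasets/parquet/nonpartitioned/1GB/store_sales/*.parquet")
            .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
    p.run().waitUntilFinish();
  }
}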

je-ik commented Oct 30, 2023

Same results. Runs OK, but empty outputs.

je-ik commented Oct 30, 2023

Update: I removed the last slash, and it failed! :)

+--------------+------------------------------+--------------+------------+----------+--------------+------------+----------------------+
|  Query Name  |           Job Name           |  Data Size   |  Dialect   |  Status  |  Start Time  |  End Time  |  Elapsed Time(sec)   |
+--------------+------------------------------+--------------+------------+----------+--------------+------------+----------------------+
|    query3    |  query3result1698674602714   |     1GB      |  Calcite   |  Failed  |              |            |                      |
+--------------+------------------------------+--------------+------------+----------+--------------+------------+----------------------+

command-line:

$ ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:3" -Ptpcds.args=" \
  --runner=SparkRunner \
  --queries=3 \
  --tpcParallel=1 \
  --dataDirectory=gs://beam-tpcds/datasets/parquet/nonpartitioned \
  --dataSize=1GB \
  --sourceType=PARQUET \
  --resultsDirectory=/tmp/tpc-ds-results/"

And I got the NPE:

Caused by: java.lang.NullPointerException
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedWindowedValue.<init>(WindowedValue.java:312)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.<init>(WindowedValue.java:329)
        at org.apache.beam.sdk.util.WindowedValue.of(WindowedValue.java:95)

aromanenko-dev commented:

I created an issue for that: #29198

je-ik added a commit that referenced this issue Oct 31, 2023
github-actions bot added this to the 2.52.0 Release milestone Oct 31, 2023