[Bug]: IcebergIO - Write performance issues #32746
Comments
Hmmm, for streaming mode there should be one RecordWriterManager per bundle so I don't think there should be multiple threads trying to access one instance. But it is weird. If you're writing to one destination with no partitions, there should be exactly one writer per bundle at any given time. Perhaps old writers are not closing properly? Can you check if you have any logs for "Encountered an error when closing data writer..." (line 118)? |
P.S. are you running at Beam HEAD? We recently merged a change that adds a static table cache (#32686), so it should be loaded only once per RecordWriterManager |
Currently running a pipeline to try to repro your error:
Map<String, Object> config =
ImmutableMap.<String, Object>builder()
.put("table", table)
.put("catalog_name", "test")
.put("catalog_properties",
ImmutableMap.<String, String>builder()
.put("warehouse", warehouse)
.put("gcp_project", "apache-beam-testing")
.put("gcp_location", "us-central1")
.put("catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
.build())
.put("triggering_frequency_seconds", 5)
.build();
Schema taxiSchema =
Schema.builder()
.addStringField("ride_id")
.addInt32Field("point_idx")
.addDoubleField("latitude")
.addDoubleField("longitude")
.addStringField("timestamp")
.addDoubleField("meter_reading")
.addDoubleField("meter_increment")
.addStringField("ride_status")
.addInt32Field("passenger_count")
.build();
Pipeline q = Pipeline.create(options);
q
.apply(PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
.apply(JsonToRow.withSchema(taxiSchema))
.apply(Managed.write(Managed.ICEBERG).withConfig(config));
q.run();
Will let it run for some time. The throughput is sitting at ~2k rows per second. |
Strange, because I can see threads writing multiple files with few rows.
I do not have any errors when closing writers.
I am running version 2.59.0. |
I noticed, however, that I have not provided the triggering frequency. I will check if it changes something and let you know. I also found that the writer hangs indefinitely while trying to update the manifest file, without ever timing out. Not sure why that is 🤔
|
Yep, the triggering frequency was one of the recently added features. Streaming writes before 2.60.0 may work in some cases, but it's rather unpredictable.
This is also one of the improvements -- we drastically reduced the number of manifest files we write. From my experience, the more manifest files we have, the longer it takes to update the next one. Let me know how the new pipeline goes! |
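For reference, a minimal sketch (not from the thread) of the same Managed IcebergIO config shape used in the repro above, with the triggering frequency set explicitly; the table and catalogProperties variables are placeholders standing in for the values shown earlier, and the 60-second value mirrors the run described below.

    // Sketch only: same config keys as the repro above, with an explicit
    // triggering frequency so streaming writes commit a snapshot periodically.
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", table)                          // placeholder, as in the repro above
            .put("catalog_name", "test")
            .put("catalog_properties", catalogProperties) // placeholder: same map as in the repro above
            .put("triggering_frequency_seconds", 60)      // commit roughly once per minute
            .build();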
I used 2.60.0-SNAPSHOT and a triggering frequency of 60s, but after some time I see the errors again:
However, it looks like it eventually succeeds and one snapshot is produced: |
Not sure if it can be of any help, but I am using the following
|
We're using version
I canceled my previous repro attempt because it was healthy for 3+ hours, and I'm attempting another run now with higher throughput and a 60s triggering frequency. Any chance you can provide a repro? |
Ahh I'm seeing the error now
And also seeing the same number of these errors: Full stacktrace
|
The high latency warnings can be a little noisy -- they were added in
Agreed that's weird for just 20 rows/sec. Is this a project-based metric? Are you sure nothing else is writing to GCS at the same time? |
I dropped the table but it's the same situation, still 5 records per file, not sure how to control the number of files per bundle. |
It's always one data file per bundle. The idea is to control the number of (concurrent) bundles
We have a fixed 512MB max file size (ref) |
Hmmm, I'm seeing an old metric that we dropped (
Yes they're similar. The idea is to redistribute data across workers.
This is hard to predict. In general autoscaling reacts to your backlog and throughput, and it may autoscale to more than the number of keys in your Redistribute.
That's a good first step! Let me know how it goes -- honestly you may end up only needing the Redistribute |
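For context, a hedged sketch of what placing a Redistribute in front of the write could look like. It assumes Beam's Redistribute.arbitrarily() transform and its withNumBuckets option; the bucket count of 10 is an arbitrary example, not a recommendation from this thread.

    // Sketch only: the repro pipeline from above, with a Redistribute added
    // before the Iceberg write to bound the number of concurrent bundles
    // (and hence the number of data files produced per trigger).
    q
        .apply(PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
        .apply(JsonToRow.withSchema(taxiSchema))
        .apply(Redistribute.<Row>arbitrarily().withNumBuckets(10)) // spread data across a fixed number of keys
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    q.run();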
Yep, it works! 🥳 However, I'm still uncertain about how the Redistribute interacts with autoscaling. My concern is that with a fixed number of buckets defined for the Redistribute, autoscaling beyond that number of keys would leave the extra workers idle. At the same time, it's not feasible to skip redistribution entirely, as seen from earlier attempts: without it the job becomes non-performant and, in some cases, indefinitely stuck. In contrast, I would expect |
Great stuff! Glad to see it getting off the ground :)
Yep that's very valid. I was hoping #32612 would take care of this, but looks like we're not quite there yet. In the meantime, for your concern about idle workers, you can always set an upper bound on autoscaling with the |
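As an aside, and assuming the option being referred to above is Dataflow's maxNumWorkers bound (the original reference is cut off), a sketch of capping autoscaling could look like this; the value 10 is only an example.

    // Sketch, assuming the upper bound mentioned above is Dataflow's maxNumWorkers option.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setMaxNumWorkers(10); // e.g. align the autoscaling cap with the Redistribute bucket count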
What happened?
I am trying to stream data from PubSub (with a throughput of 10-50 RPS) into an Iceberg Table (not partitioned) using the IcebergIO connector and Hive Metastore.
However, after some time, I see warning logs in the console like below:
From the thread dump (see threadump.txt) it looks like a significant number of DataflowWorkUnits threads (287) are in the TIMED_WAITING state, specifically waiting for a Hive client from the pool. However, I find it surprising that all these threads are attempting to create a new writer each time, which results in the concurrent reload of the table we saw above. Is this behavior expected? I suspect that the performance issues stem from the RecordWriterManager class not being thread-safe. Specifically, it appears that a race condition is occurring in this code snippet due to the check-then-act logic:
beam/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
Lines 151 to 161 in 4cf99ea
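For illustration only, a simplified sketch of the kind of check-then-act pattern being described (names are made up; this is not the RecordWriterManager source): if two threads share the same map, both can see a missing entry and each create a writer, with one of them overwritten and never closed.

    // Hypothetical illustration of a check-then-act race; not actual Beam code.
    private final Map<String, AutoCloseable> writers = new HashMap<>();

    AutoCloseable getOrCreateWriter(String destination) throws Exception {
      AutoCloseable writer = writers.get(destination); // check
      if (writer == null) {                            // two threads can both pass this check
        writer = createWriter(destination);            // act: both create a writer (hypothetical helper)
        writers.put(destination, writer);              // one writer is silently overwritten and leaked
      }
      return writer;
    }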
Indeed, when I stop the job I am prompted with the following error message, which is evidence that too many writers got created.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components