
[Bug]: BigQueryIO is very slow when using the Storage API and dynamic destinations to write data to over a thousand different tables with high data skew #32508

ns-shua opened this issue Sep 19, 2024 · 15 comments

Comments

@ns-shua commented Sep 19, 2024

What happened?

I'm using BigQueryIO with the Storage Write API, as suggested, in at-least-once mode (for both the pipeline and the IO). My requirement is to write data to over a thousand tables in different projects, and the data is highly skewed: the top 10 tables take roughly 80% of the traffic. I observe that the pipeline becomes very slow and CPU utilization stays almost always below 30%. I think it is a data-skew problem, but our data is logically partitioned that way and I have no control over it. I tried writing the same volume of data to a single table (all the tables have the same schema), and it performed very well even with 1/4 of the machines. The documentation claims DynamicDestinations should perform as well as a single destination. Is there a known performance issue, or are there any suggestions?

Here is the code I use to write to different tables:

BigQueryIO.<KV<TopicMetadata, T>>write()
        .withFormatFunction(...)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .to(
            new SerializableFunction<..>() {...} // I tried both a SerializableFunction and a DynamicDestinations class here
         );

This code performs much, much worse than

BigQueryIO.<KV<TopicMetadata, T>>write()
        .withFormatFunction(...)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .to(
            "project_all.example_dataset.alldata"
         );

with the same amount of data.

When writing to different tables, CPU usage is constantly below 30%, while when writing to a single table, CPU usage is constantly near 100%.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad (Collaborator)

Have you tried to profile the pipeline to figure out some potential issues?
cc @ahmedabu98
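
For context, a minimal sketch of one way profiling is usually enabled on Dataflow, assuming the documented Cloud Profiler service option (the flag names below are the standard Dataflow ones, not taken from this job):

    // Sketch: launch the job with Cloud Profiler turned on via a Dataflow service option.
    //   --dataflowServiceOptions=enable_google_cloud_profiler        (CPU profiling)
    //   --dataflowServiceOptions=enable_google_cloud_heap_sampling   (heap profiling)
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    // ... build the BigQueryIO write as above, then:
    pipeline.run();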

@ns-shua (Author) commented Sep 20, 2024

@liferoad There are some upstream transforms I could improve, but they have nothing to do with the BigQuery write. The only difference in the code is writing to one table versus writing to many tables.

@liferoad (Collaborator) commented Sep 20, 2024

Added the dev list thread here: https://lists.apache.org/thread/gz5zhnworvcjog0o4g96lsqbw5tz6y03
@ns-shua Have you opened a customer support ticket for Dataflow? That would make it easier to check your Dataflow jobs.

@ahmedabu98 (Contributor)

@ns-shua (Author) commented Sep 21, 2024

@ahmedabu98 I believe that if I don't use the connection pool, writing to one table won't work at all, so yes, I've set it to true.
@liferoad I asked on the mailing list and also created a support ticket, but so far I've gotten no useful help or tips. They mentioned they found a hot key? I'm not sure. Can you explain how auto-sharding behaves when the data volume is highly skewed across the tables? Does it create more workers for the hot tables?
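
For reference, a minimal sketch of how the connection pool is typically switched on, assuming the useStorageApiConnectionPool option on BigQueryOptions (illustrative, not the exact code from this job; options is the job's PipelineOptions):

    // Sketch: enable Storage Write API stream multiplexing (the "connection pool")
    // for STORAGE_API_AT_LEAST_ONCE writes.
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    bqOptions.setUseStorageApiConnectionPool(true);
    // Equivalent command-line flag: --useStorageApiConnectionPool=true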

@liferoad (Collaborator)

What is your support ticket number? Is this streaming or batch?

@ns-shua (Author) commented Sep 21, 2024

@liferoad Case 53209037
I'm confused by the memory dump: I see a lot of StorageApiWriteUnshardedRecords, but I have withAutoSharding() in my code.

@ns-shua (Author) commented Sep 21, 2024

It is streaming, in at-least-once mode.

@liferoad (Collaborator)

Can you share the full, latest code if possible? From the ticket, it seems the job with withAutoSharding does not scale down.

@ns-shua (Author) commented Sep 22, 2024

@liferoad The full, latest code is a bit complicated; here is a simplified version of the BigQuery write part:

PCollection<KV<TopicMetadata, Row>> data = ...; // the TopicMetadata carries the table destination information

data.apply(
    BigQueryIO.<KV<TopicMetadata, Row>>write()
        .withFormatFunction(elem -> BigQueryUtils.toTableRow(row(elem.getValue())))
        .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .withAutoSharding()
        .withFailedInsertRetryPolicy(retryTransientErrors())
        // All the tables have identical schemas, so I chose the to(SerializableFunction) overload;
        // see the TenantProjectDestinations function below.
        .to(tenantProjectDestinations));

public class TenantProjectDestinations<T> implements
    SerializableFunction<ValueInSingleWindow<KV<TopicMetadata, T>>, TableDestination> {

    // Shared across DoFn instances and threads, so it needs to be a concurrent map.
    private static final Map<TopicMetadata, TableDestination> destinationCache =
        new ConcurrentHashMap<>();

    private final ValueProvider<String> tableId;

    public static <TableRowType> TenantProjectDestinations<TableRowType> of(
            ValueProvider<String> tableId) {
        return new TenantProjectDestinations<>(tableId);
    }

    private TenantProjectDestinations(ValueProvider<String> tableId) {
        this.tableId = tableId;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<KV<TopicMetadata, T>> input) {
        TopicMetadata k = Objects.requireNonNull(input.getValue()).getKey();
        // Build the destination once per topic and reuse it afterwards.
        return destinationCache.computeIfAbsent(k, topicMetadata -> new TableDestination(
            new TableReference()
                .setProjectId(k.getProjectId())
                .setDatasetId(k.getDatasetId())
                .setTableId(tableId.get()),
            "TenantTable"));
    }
}

@ns-shua (Author) commented Sep 22, 2024

@liferoad One thing I'm not sure about is how the table schema is obtained. From the internal implementation it looks like the table schema is cached by TableDestination, so the schema is fetched once per table? If I use withSchema(...), would it improve performance? Thanks!
Also, I'm confused when you say autosharding doesn't scale down. It looks to me like writing to BQ is the bottleneck (it can't keep up with the upstream transforms), yet most workers have CPU usage below 50%.
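
For illustration, a minimal sketch of a DynamicDestinations that supplies the schema itself instead of letting the connector look it up per table, assuming every tenant table shares one schema (the class name, table id, and jsonSchema field below are hypothetical):

    // Sketch (hypothetical names): a DynamicDestinations that returns one fixed schema,
    // since every tenant table has the same layout.
    static class TenantDynamicDestinations
            extends DynamicDestinations<KV<TopicMetadata, Row>, TableDestination> {

        // Kept as a JSON string so the class stays serializable.
        private final String jsonSchema;

        TenantDynamicDestinations(String jsonSchema) {
            this.jsonSchema = jsonSchema;
        }

        @Override
        public TableDestination getDestination(ValueInSingleWindow<KV<TopicMetadata, Row>> element) {
            TopicMetadata k = Objects.requireNonNull(element.getValue()).getKey();
            return new TableDestination(
                new TableReference()
                    .setProjectId(k.getProjectId())
                    .setDatasetId(k.getDatasetId())
                    .setTableId("tenant_table"), // placeholder table id
                "TenantTable");
        }

        @Override
        public TableDestination getTable(TableDestination destination) {
            return destination;
        }

        @Override
        public TableSchema getSchema(TableDestination destination) {
            // Same schema for every destination; avoids fetching it from BigQuery per table.
            return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
        }
    }

This would be passed with .to(new TenantDynamicDestinations(jsonSchema)) instead of the SerializableFunction; I believe withSchema(...) on the to(SerializableFunction) form plays a similar role by supplying one static schema for all destinations.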

@ns-shua (Author) commented Sep 22, 2024

One thing I notice that doesn't look right to me is that memory usage keeps going up. I enabled the profiler and saw a lot of memory used by messages in TwoLevelMessageConverterCache.

@ns-shua (Author) commented Sep 22, 2024

@liferoad And in the diagnostics error table I see a lot of warnings like this:

  {
    "insertId": "7981515283196192541:164283:0:12850531",
    "jsonPayload": {
      "worker": "txn-log-v4-static-tenant--09211208-8ico-harness-7411",
      "message": "Operation ongoing in bundle for at least 10m00s without completing\nProcessing times in each step(millis)\nStep name: WriteToCommonAndTenantProject/StorageApiLoads/Convert/Convert to message\nTime spent in this step: IntSummaryStatistics{count=2412, sum=107, min=0, average=0.044362, max=21}\nStep name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/StorageApiWriteInconsistent/Write Records\nTime spent in this step: IntSummaryStatistics{count=1934, sum=161, min=0, average=0.083247, max=85}\nStep name: WriteToCommonAndTenantProjectFailedInsert/PrepareWrite/ParDo(Anonymous)\nTime spent in this step: IntSummaryStatistics{count=1934, sum=0, min=0, average=0.000000, max=0}\nStep name: ExtractFeature\nTime spent in this step: IntSummaryStatistics{count=7, sum=5, min=0, average=0.714286, max=2}\nStep name: WriteToCommonAndTenantProject/StorageApiLoads/StorageApiWriteInconsistent/Write Records\nTime spent in this step: IntSummaryStatistics{count=2412, sum=14, min=0, average=0.005804, max=8}\nStep name: WriteToCommonAndTenantProject/StorageApiLoads/rewindowIntoGlobal/Window.Assign\nTime spent in this step: IntSummaryStatistics{count=2412, sum=0, min=0, average=0.000000, max=0}\nStep name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/Convert/Convert to message\nTime spent in this step: IntSummaryStatistics{count=1934, sum=7, min=0, average=0.003619, max=4}\nStep name: EnrichTenantMetadata\nTime spent in this step: IntSummaryStatistics{count=1206, sum=0, min=0, average=0.000000, max=0}\nStep name: Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable\nTime spent in this step: IntSummaryStatistics{count=7, sum=0, min=0, average=0.000000, max=0}\nStep name: Reshuffle.ViaRandomKey/Values/Values/Map\nTime spent in this step: IntSummaryStatistics{count=7, sum=0, min=0, average=0.000000, max=0}\nStep name: DropFields.Inner/Select.Fields/ParDo(Select)\nTime spent in this step: IntSummaryStatistics{count=1206, sum=0, min=0, average=0.000000, max=0}\nStep name: GEFMessageDecode\nTime spent in this step: IntSummaryStatistics{count=7, sum=12, min=0, average=1.714286, max=11}\nStep name: WriteToCommonAndTenantProject/PrepareWrite/ParDo(Anonymous)\nTime spent in this step: IntSummaryStatistics{count=2412, sum=0, min=0, average=0.000000, max=0}\nStep name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/rewindowIntoGlobal/Window.Assign\nTime spent in this step: IntSummaryStatistics{count=1934, sum=0, min=0, average=0.000000, max=0}\nStep name: Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/ReadStream\nTime spent in this step: IntSummaryStatistics{count=14, sum=0, min=0, average=0.000000, max=0}\n  at [email protected]/java.lang.Thread.sleep(Native Method)\n  at app//org.apache.beam.sdk.util.Sleeper$$Lambda$764/0x0000000800c70c40.sleep(Unknown Source)\n  at app//org.apache.beam.sdk.util.BackOffUtils.next(BackOffUtils.java:48)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:317)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:1026)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushIfNecessary(StorageApiWriteUnshardedRecords.java:994)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.process(StorageApiWriteUnshardedRecords.java:1144)\n  at 
app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n",
      "thread": "36",
      "job": "2024-09-21_12_07_17-18246689280237981546",
      "logger": "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"
    },
    "resource": {
      "type": "dataflow_step",
      "labels": {
        "region": "australia-southeast1",
        "step_id": "",
        "job_id": "2024-09-21_12_07_17-18246689280237981546",
        "job_name": "txn-log-v4-static-tenant-filter-dynamicdestination",
        "project_id": "project-data-au"
      }
    },
    "timestamp": "2024-09-22T03:32:15.969Z",
    "severity": "WARNING",
    "labels": {
      "dataflow.googleapis.com/log_type": "supportability",
      "dataflow.googleapis.com/region": "australia-southeast1",
      "dataflow.googleapis.com/job_id": "2024-09-21_12_07_17-18246689280237981546",
      "dataflow.googleapis.com/job_name": "txn-log-v4-static-tenant-filter-dynamicdestination",
      "compute.googleapis.com/resource_type": "instance",
      "compute.googleapis.com/resource_name": "txn-log-v4-static-tenant--09211208-8ico-harness-7411",
      "compute.googleapis.com/resource_id": "7981515283196192541"
    },
    "logName": "projects/project-data-au/logs/dataflow.googleapis.com%2Fworker",
    "receiveTimestamp": "2024-09-22T03:32:25.025214266Z",
    "errorGroups": [
      {
        "id": "CLTglOLFqKTiag"
      }
    ]
  }

@liferoad (Collaborator)

@ns-shua Our engineers provided more comments through your support ticket. Let us move the discussion to the support ticket. I have also shared this issue with our team. Thanks.

@ns-shua (Author) commented Sep 22, 2024

@liferoad Thanks!
