From 8c096bdad268af8591c3e308662744e675823e8e Mon Sep 17 00:00:00 2001
From: vishalkarve15
Date: Thu, 24 Oct 2024 19:31:30 +0530
Subject: [PATCH] Issue #1290: Stopped using metadata for optimized count path
 (#1293)

* Issue #1290: Stopped using metadata for optimized count path

* add changes, fix test

* handle requirePartitionFilter

* increase timeout for presubmit
---
 CHANGES.md                                     |  1 +
 .../connector/common/BigQueryClient.java       | 22 ++++++++++++++++---
 cloudbuild/cloudbuild.yaml                     |  2 +-
 .../integration/WriteIntegrationTestBase.java  | 17 ++++++++++++++
 4 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9cfb0829a..ce14c77c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,7 @@
 # Release Notes
 
 ## Next
+* Issue #1290: Stopped using metadata for optimized count path
 
 ## 0.41.0 - 2024-09-05
 
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
index c639f5f6a..a68e0d7dd 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
@@ -15,8 +15,10 @@
  */
 package com.google.cloud.bigquery.connector.common;
 
+import static com.google.cloud.bigquery.connector.common.BigQueryUtil.getPartitionFields;
 import static com.google.cloud.bigquery.connector.common.BigQueryUtil.getQueryForRangePartitionedTable;
 import static com.google.cloud.bigquery.connector.common.BigQueryUtil.getQueryForTimePartitionedTable;
+import static com.google.cloud.bigquery.connector.common.BigQueryUtil.isBigQueryNativeTable;
 
 import com.google.cloud.BaseServiceException;
 import com.google.cloud.RetryOption;
@@ -604,9 +606,23 @@ public long calculateTableSize(TableId tableId, Optional<String> filter) {
 
   public long calculateTableSize(TableInfo tableInfo, Optional<String> filter) {
     TableDefinition.Type type = tableInfo.getDefinition().getType();
-    if (type == TableDefinition.Type.TABLE && !filter.isPresent()) {
-      return tableInfo.getNumRows().longValue();
-    } else if (type == TableDefinition.Type.EXTERNAL && !filter.isPresent()) {
+    if ((type == TableDefinition.Type.EXTERNAL || type == TableDefinition.Type.TABLE)
+        && !filter.isPresent()) {
+      if (isBigQueryNativeTable(tableInfo)
+          && tableInfo.getRequirePartitionFilter() != null
+          && tableInfo.getRequirePartitionFilter()) {
+        List<String> partitioningFields = getPartitionFields(tableInfo);
+        if (partitioningFields.isEmpty()) {
+          throw new IllegalStateException(
+              "Could not find partitioning columns for table requiring partition filter: "
+                  + tableInfo.getTableId());
+        }
+        String table = fullTableName(tableInfo.getTableId());
+        return getNumberOfRows(
+            String.format(
+                "SELECT COUNT(*) from `%s` WHERE %s IS NOT NULL",
+                table, partitioningFields.get(0)));
+      }
       String table = fullTableName(tableInfo.getTableId());
       return getNumberOfRows(String.format("SELECT COUNT(*) from `%s`", table));
     } else if (type == TableDefinition.Type.VIEW
diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml
index 020502eb6..1516ac11f 100644
--- a/cloudbuild/cloudbuild.yaml
+++ b/cloudbuild/cloudbuild.yaml
@@ -117,7 +117,7 @@ steps:
 
 
 # Tests take around 1 hr 15 mins in general.
-timeout: 7200s
+timeout: 9000s
 
 options:
   machineType: 'N1_HIGHCPU_32'
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index d6200e2be..83dcfa433 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -2664,6 +2664,23 @@ public void testTableDescriptionRemainsUnchanged() {
         .isEqualTo(bq.getTable(testDataset.toString(), testTable).getDescription());
   }
 
+  @Test
+  public void testCountAfterWrite() {
+    IntegrationTestUtils.runQuery(
+        String.format("CREATE TABLE `%s.%s` (name STRING, age INT64)", testDataset, testTable));
+    Dataset<Row> read1Df = spark.read().format("bigquery").load(fullTableName());
+    assertThat(read1Df.count()).isEqualTo(0L);
+
+    Dataset<Row> dfToWrite =
+        spark.createDataFrame(
+            Arrays.asList(RowFactory.create("foo", 10), RowFactory.create("bar", 20)),
+            new StructType().add("name", DataTypes.StringType).add("age", DataTypes.IntegerType));
+    writeToBigQueryAvroFormat(dfToWrite, SaveMode.Append, "false");
+
+    Dataset<Row> read2Df = spark.read().format("bigquery").load(fullTableName());
+    assertThat(read2Df.count()).isEqualTo(2L);
+  }
+
   private TableResult insertAndGetTimestampNTZToBigQuery(LocalDateTime time, String format)
       throws InterruptedException {
     Preconditions.checkArgument(timeStampNTZType.isPresent(), "timestampNTZType not present");
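
Note (not part of the patch): a minimal sketch of the behavior this change
affects, seen from the Spark side. The class name, app name, and the table
path "my_project.my_dataset.my_table" are hypothetical, and the connector jar
is assumed to be on the classpath. Per the BigQueryClient hunk above, a
filterless count() on a TABLE or EXTERNAL table is now answered by a
SELECT COUNT(*) query instead of the table's numRows metadata, which can lag
behind recent writes (the situation testCountAfterWrite exercises).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CountPathSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("count-path-sketch").getOrCreate();

    // Hypothetical table reference; the spark-bigquery connector resolves it.
    Dataset<Row> df =
        spark.read().format("bigquery").load("my_project.my_dataset.my_table");

    // With this patch, the filterless count below is served by a COUNT(*)
    // query against BigQuery, so rows written moments earlier are included.
    System.out.println(df.count());

    spark.stop();
  }
}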