diff --git a/CHANGES.md b/CHANGES.md
index 086899529..f49b38cac 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,7 @@
## Next
+* Issue #867: Support writing with RangePartitioning
* Issue #144: allow writing Spark String to BQ TIME type
* PR #1038: Logical plan now shows the BigQuery table of DirectBigQueryRelation. Thanks @idc101 !
diff --git a/README-template.md b/README-template.md
index 908e45db2..8313c0489 100644
--- a/README-template.md
+++ b/README-template.md
@@ -274,7 +274,7 @@ df.write \
Writing to existing partitioned tables (date partitioned, ingestion time partitioned and range
partitioned) in APPEND save mode is fully supported by the connector and the BigQuery Storage Write
-API. Partition overwrite and the use of `datePartition`, `partitionField` and `partitionType` as
+API. Partition overwrite and the use of `datePartition`, `partitionField`, `partitionType`, `partitionRangeStart`, `partitionRangeEnd` and `partitionRangeInterval` as
described below is not supported at this moment by the direct write method.
**Important:** Please refer to the [data ingestion pricing](https://cloud.google.com/bigquery/pricing#data_ingestion_pricing)
@@ -618,10 +618,12 @@ word-break:break-word
partitionField
|
- If field is specified together with `partitionType`, the table is partitioned by this field.
- The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE
+ | If this field is specified, the table is partitioned by this field.
+ For Time partitioning, specify together with the option `partitionType`.
+ For Integer-range partitioning, specify together with the 3 options: `partitionRangeStart`, `partitionRangeEnd`, `partitionRangeInterval`.
+ The field must be a top-level TIMESTAMP or DATE field for Time partitioning, or INT64 for Integer-range partitioning. Its mode must be NULLABLE
or REQUIRED.
- If the option is not set for a partitioned table, then the table will be partitioned by pseudo
+ If the option is not set for a Time partitioned table, then the table will be partitioned by pseudo
column, referenced via either'_PARTITIONTIME' as TIMESTAMP type, or
'_PARTITIONDATE' as DATE type.
(Optional).
@@ -642,13 +644,26 @@ word-break:break-word
|
partitionType
|
- Supported types are: HOUR, DAY, MONTH, YEAR
- This option is mandatory for a target table to be partitioned.
+ | Used to specify Time partitioning.
+ Supported types are: HOUR, DAY, MONTH, YEAR
+ This option is mandatory for a target table to be Time partitioned.
(Optional. Defaults to DAY if PartitionField is specified).
Not supported by the `DIRECT` write method.
|
Write |
+
+ partitionRangeStart ,
+ partitionRangeEnd ,
+ partitionRangeInterval
+ |
+ Used to specify Integer-range partitioning.
+ These options are mandatory for a target table to be Integer-range partitioned.
+ All 3 options must be specified.
+ Not supported by the `DIRECT` write method.
+ |
+ Write |
+
clusteredFields
|
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
index f81da95ec..80528fa5e 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
@@ -31,6 +31,7 @@
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
+import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -604,6 +605,12 @@ public void loadDataIntoTable(
options.getPartitionField().ifPresent(timePartitionBuilder::setField);
jobConfiguration.setTimePartitioning(timePartitionBuilder.build());
}
+ if (options.getPartitionField().isPresent() && options.getPartitionRange().isPresent()) {
+ RangePartitioning.Builder rangePartitionBuilder = RangePartitioning.newBuilder();
+ options.getPartitionField().ifPresent(rangePartitionBuilder::setField);
+ options.getPartitionRange().ifPresent(rangePartitionBuilder::setRange);
+ jobConfiguration.setRangePartitioning(rangePartitionBuilder.build());
+ }
options
.getClusteredFields()
@@ -713,6 +720,8 @@ public interface LoadDataOptions {
Optional getPartitionType();
+ Optional getPartitionRange();
+
TimePartitioning.Type getPartitionTypeOrDefault();
OptionalLong getPartitionExpirationMs();
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
index 6389e06a6..822bafb37 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
@@ -38,6 +38,7 @@
import com.google.cloud.bigquery.ParquetOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
+import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
@@ -183,6 +184,9 @@ public static WriteMethod from(@Nullable String writeMethod) {
Long partitionExpirationMs = null;
com.google.common.base.Optional partitionRequireFilter = empty();
com.google.common.base.Optional partitionType = empty();
+ com.google.common.base.Optional partitionRangeStart = empty();
+ com.google.common.base.Optional partitionRangeEnd = empty();
+ com.google.common.base.Optional partitionRangeInterval = empty();
com.google.common.base.Optional clusteredFields = empty();
com.google.common.base.Optional createDisposition = empty();
boolean optimizedEmptyProjection = true;
@@ -289,6 +293,11 @@ public static SparkBigQueryConfig from(
firstPresent(getOption(options, "project").toJavaUtil(), fallbackProject);
config.partitionType =
getOption(options, "partitionType").transform(TimePartitioning.Type::valueOf);
+ config.partitionRangeStart =
+ getOption(options, "partitionRangeStart").transform(Long::parseLong);
+ config.partitionRangeEnd = getOption(options, "partitionRangeEnd").transform(Long::parseLong);
+ config.partitionRangeInterval =
+ getOption(options, "partitionRangeInterval").transform(Long::parseLong);
Optional datePartitionParam = getOption(options, DATE_PARTITION_PARAM).toJavaUtil();
datePartitionParam.ifPresent(
date -> validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
@@ -820,6 +829,20 @@ public Optional getPartitionType() {
return partitionType.toJavaUtil();
}
+ public Optional getPartitionRange() {
+ if (partitionRangeStart.isPresent()
+ && partitionRangeEnd.isPresent()
+ && partitionRangeInterval.isPresent()) {
+ return Optional.of(
+ RangePartitioning.Range.newBuilder()
+ .setStart(partitionRangeStart.get())
+ .setEnd(partitionRangeEnd.get())
+ .setInterval(partitionRangeInterval.get())
+ .build());
+ }
+ return Optional.empty();
+ }
+
public TimePartitioning.Type getPartitionTypeOrDefault() {
return partitionType.or(TimePartitioning.Type.DAY);
}
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java
index 6356d321c..7bbef43d4 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java
@@ -24,6 +24,7 @@
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
+import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.CompressionCodec;
@@ -107,6 +108,7 @@ public void testDefaults() {
assertThat(config.getPartitionExpirationMs()).isEqualTo(OptionalLong.empty());
assertThat(config.getPartitionRequireFilter()).isEqualTo(Optional.empty());
assertThat(config.getPartitionType()).isEqualTo(Optional.empty());
+ assertThat(config.getPartitionRange()).isEqualTo(Optional.empty());
assertThat(config.getClusteredFields()).isEqualTo(Optional.empty());
assertThat(config.getCreateDisposition()).isEqualTo(Optional.empty());
assertThat(config.getLoadSchemaUpdateOptions()).isEqualTo(ImmutableList.of());
@@ -228,6 +230,38 @@ public void testConfigFromOptions() {
assertThat(config.getKmsKeyName()).isEqualTo(Optional.of("some/key/name"));
}
+ @Test
+ public void testConfigFromOptions_rangePartitioning() {
+ Configuration hadoopConfiguration = new Configuration();
+ DataSourceOptions options =
+ new DataSourceOptions(
+ ImmutableMap.builder()
+ .put("table", "test_t")
+ .put("dataset", "test_d")
+ .put("project", "test_p")
+ .put("partitionRangeStart", "1")
+ .put("partitionRangeEnd", "20")
+ .put("partitionRangeInterval", "2")
+ .put("partitionField", "some_field")
+ .build());
+ SparkBigQueryConfig config =
+ SparkBigQueryConfig.from(
+ options.asMap(),
+ defaultGlobalOptions,
+ hadoopConfiguration,
+ ImmutableMap.of(),
+ DEFAULT_PARALLELISM,
+ new SQLConf(),
+ SPARK_VERSION,
+ Optional.empty(), /* tableIsMandatory */
+ true);
+ RangePartitioning.Range expectedRange =
+ RangePartitioning.Range.newBuilder().setStart(1L).setEnd(20L).setInterval(2L).build();
+ assertThat(config.getTableId()).isEqualTo(TableId.of("test_p", "test_d", "test_t"));
+ assertThat(config.getPartitionRange()).isEqualTo(Optional.of(expectedRange));
+ assertThat(config.getPartitionField()).isEqualTo(Optional.of("some_field"));
+ }
+
@Test
public void testCacheExpirationSetToZero() {
Configuration hadoopConfiguration = new Configuration();
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index b896136f3..5c2111aa4 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -19,6 +19,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;
import com.google.cloud.bigquery.BigQuery;
@@ -27,6 +28,7 @@
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -41,6 +43,7 @@
import com.google.cloud.spark.bigquery.integration.model.Friend;
import com.google.cloud.spark.bigquery.integration.model.Link;
import com.google.cloud.spark.bigquery.integration.model.Person;
+import com.google.cloud.spark.bigquery.integration.model.RangeData;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.ProvisionException;
@@ -1052,6 +1055,41 @@ private void testPartition(String partitionType) {
assertThat(readDF.count()).isEqualTo(3);
}
+ @Test
+ public void testPartitionRange() {
+ // partition write not supported in BQ Storage Write API
+ assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
+
+ List data =
+ Arrays.asList(new RangeData("a", 1L), new RangeData("b", 5L), new RangeData("c", 11L));
+ Dataset df = spark.createDataset(data, Encoders.bean(RangeData.class)).toDF();
+ String table = testDataset.toString() + "." + testTable + "_range";
+ df.write()
+ .format("bigquery")
+ .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
+ .option("partitionField", "rng")
+ .option("partitionRangeStart", "1")
+ .option("partitionRangeEnd", "21")
+ .option("partitionRangeInterval", "2")
+ .option("partitionRequireFilter", "true")
+ .option("table", table)
+ .option("writeMethod", writeMethod.toString())
+ .save();
+
+ Dataset readDF = spark.read().format("bigquery").load(table);
+ assertThat(readDF.count()).isEqualTo(3);
+ Table bqTable = bq.getTable(TableId.of(testDataset.toString(), testTable + "_range"));
+ assertThat(bqTable).isNotNull();
+ assertTrue(bqTable.getDefinition() instanceof StandardTableDefinition);
+ StandardTableDefinition bqTableDef = bqTable.getDefinition();
+ assertThat(bqTableDef.getRangePartitioning()).isNotNull();
+ RangePartitioning.Range expectedRange =
+ RangePartitioning.Range.newBuilder().setStart(1L).setEnd(21L).setInterval(2L).build();
+ String expectedField = "rng";
+ assertThat(bqTableDef.getRangePartitioning().getRange()).isEqualTo(expectedRange);
+ assertThat(bqTableDef.getRangePartitioning().getField()).isEqualTo(expectedField);
+ }
+
@Test
public void testCacheDataFrameInDataSource() {
// It takes some time for the data to be available for read via the Storage Read API, after it
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/model/RangeData.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/model/RangeData.java
new file mode 100644
index 000000000..d10ca183e
--- /dev/null
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/model/RangeData.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2021 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.spark.bigquery.integration.model;
+
+import com.google.common.base.Objects;
+import java.io.Serializable;
+
+public class RangeData implements Serializable {
+
+ private String str;
+ private Long rng;
+
+ public RangeData(String str, Long rng) {
+ this.str = str;
+ this.rng = rng;
+ }
+
+ public String getStr() {
+ return str;
+ }
+
+ public void setStr(String str) {
+ this.str = str;
+ }
+
+ public Long getRng() {
+ return rng;
+ }
+
+ public void setRng(Long rng) {
+ this.rng = rng;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof RangeData)) {
+ return false;
+ }
+ RangeData data = (RangeData) o;
+ return Objects.equal(str, data.str) && Objects.equal(rng, data.rng);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(str, rng);
+ }
+
+ @Override
+ public String toString() {
+ return "Data{" + "str='" + str + '\'' + ", rng=" + rng + '}';
+ }
+}