Issue GoogleCloudDataproc#867: Support writing with RangePartitioning (GoogleCloudDataproc#1013)

* Issue GoogleCloudDataproc#867: Support writing with RangePartitioning

* address comments
vishalkarve15 authored Aug 8, 2023
1 parent bd33526 commit b3bda85
Showing 7 changed files with 194 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -2,6 +2,7 @@

## Next

* Issue #867: Support writing with RangePartitioning
* Issue #144: allow writing Spark String to BQ TIME type
* PR #1038: Logical plan now shows the BigQuery table of DirectBigQueryRelation. Thanks @idc101 !

27 changes: 21 additions & 6 deletions README-template.md
@@ -274,7 +274,7 @@ df.write \

Writing to existing partitioned tables (date partitioned, ingestion time partitioned and range
partitioned) in APPEND save mode is fully supported by the connector and the BigQuery Storage Write
-API. Partition overwrite and the use of `datePartition`, `partitionField` and `partitionType` as
+API. Partition overwrite and the use of `datePartition`, `partitionField`, `partitionType`, `partitionRangeStart`, `partitionRangeEnd` and `partitionRangeInterval` as
described below are not supported at this moment by the direct write method.
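
For illustration, a minimal Integer-range partitioned write through the indirect method might look like the sketch below; the dataset, table, bucket and column names are placeholders, not taken from this change:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical example, assuming "customer_id" is a top-level INT64 column.
class RangePartitionedWriteExample {
  static void write(Dataset<Row> df) {
    df.write()
        .format("bigquery")
        .option("writeMethod", "indirect")        // range partitioning is not supported by DIRECT
        .option("temporaryGcsBucket", "my-temp-bucket")
        .option("partitionField", "customer_id")  // column to partition by
        .option("partitionRangeStart", "0")       // inclusive lower bound
        .option("partitionRangeEnd", "100")       // exclusive upper bound
        .option("partitionRangeInterval", "10")   // width of each partition
        .save("my_dataset.range_table");
  }
}
```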

**Important:** Please refer to the [data ingestion pricing](https://cloud.google.com/bigquery/pricing#data_ingestion_pricing)
@@ -618,10 +618,12 @@ word-break:break-word
<tr valign="top">
<td><code>partitionField</code>
</td>
-<td>If field is specified together with `partitionType`, the table is partitioned by this field.
-The field must be a top-level TIMESTAMP or DATE field. Its mode must be <strong>NULLABLE</strong>
+<td>If this field is specified, the table is partitioned by this field.
+<br/>For Time partitioning, specify together with the option `partitionType`.
+<br/>For Integer-range partitioning, specify together with the 3 options: `partitionRangeStart`, `partitionRangeEnd` and `partitionRangeInterval`.
+<br/>The field must be a top-level TIMESTAMP or DATE field for Time partitioning, or INT64 for Integer-range partitioning. Its mode must be <strong>NULLABLE</strong>
or <strong>REQUIRED</strong>.
-If the option is not set for a partitioned table, then the table will be partitioned by pseudo
+If the option is not set for a Time partitioned table, then the table will be partitioned by pseudo
column, referenced via either <code>'_PARTITIONTIME' as TIMESTAMP</code> type, or
<code>'_PARTITIONDATE' as DATE</code> type.
<br/>(Optional).
@@ -642,13 +644,26 @@ word-break:break-word
<tr valign="top">
<td><code>partitionType</code>
</td>
-<td>Supported types are: <code>HOUR, DAY, MONTH, YEAR</code>
-<br/> This option is <b>mandatory</b> for a target table to be partitioned.
+<td>Used to specify Time partitioning.
+<br/>Supported types are: <code>HOUR, DAY, MONTH, YEAR</code>
+<br/>This option is <b>mandatory</b> for a target table to be Time partitioned.
<br/>(Optional. Defaults to DAY if PartitionField is specified).
<br/><i>Not supported by the `DIRECT` write method.</i>
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>partitionRangeStart</code>,
<code>partitionRangeEnd</code>,
<code>partitionRangeInterval</code>
</td>
<td>Used to specify Integer-range partitioning.
<br/>These options are <b>mandatory</b> for a target table to be Integer-range partitioned.
<br/>All 3 options must be specified.
<br/><i>Not supported by the `DIRECT` write method.</i>
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>clusteredFields</code>
</td>
@@ -31,6 +31,7 @@
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -604,6 +605,12 @@ public void loadDataIntoTable(
options.getPartitionField().ifPresent(timePartitionBuilder::setField);
jobConfiguration.setTimePartitioning(timePartitionBuilder.build());
}
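// Integer-range partitioning: requires partitionField plus all three
// partitionRange* options; getPartitionRange() combines them into a single Range.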
if (options.getPartitionField().isPresent() && options.getPartitionRange().isPresent()) {
RangePartitioning.Builder rangePartitionBuilder = RangePartitioning.newBuilder();
options.getPartitionField().ifPresent(rangePartitionBuilder::setField);
options.getPartitionRange().ifPresent(rangePartitionBuilder::setRange);
jobConfiguration.setRangePartitioning(rangePartitionBuilder.build());
}

options
.getClusteredFields()
@@ -713,6 +720,8 @@ public interface LoadDataOptions {

Optional<TimePartitioning.Type> getPartitionType();

Optional<RangePartitioning.Range> getPartitionRange();

TimePartitioning.Type getPartitionTypeOrDefault();

OptionalLong getPartitionExpirationMs();
@@ -38,6 +38,7 @@
import com.google.cloud.bigquery.ParquetOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
@@ -183,6 +184,9 @@ public static WriteMethod from(@Nullable String writeMethod) {
Long partitionExpirationMs = null;
com.google.common.base.Optional<Boolean> partitionRequireFilter = empty();
com.google.common.base.Optional<TimePartitioning.Type> partitionType = empty();
com.google.common.base.Optional<Long> partitionRangeStart = empty();
com.google.common.base.Optional<Long> partitionRangeEnd = empty();
com.google.common.base.Optional<Long> partitionRangeInterval = empty();
com.google.common.base.Optional<String[]> clusteredFields = empty();
com.google.common.base.Optional<JobInfo.CreateDisposition> createDisposition = empty();
boolean optimizedEmptyProjection = true;
@@ -289,6 +293,11 @@ public static SparkBigQueryConfig from(
firstPresent(getOption(options, "project").toJavaUtil(), fallbackProject);
config.partitionType =
getOption(options, "partitionType").transform(TimePartitioning.Type::valueOf);
config.partitionRangeStart =
getOption(options, "partitionRangeStart").transform(Long::parseLong);
config.partitionRangeEnd = getOption(options, "partitionRangeEnd").transform(Long::parseLong);
config.partitionRangeInterval =
getOption(options, "partitionRangeInterval").transform(Long::parseLong);
Optional<String> datePartitionParam = getOption(options, DATE_PARTITION_PARAM).toJavaUtil();
datePartitionParam.ifPresent(
date -> validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
@@ -820,6 +829,20 @@ public Optional<TimePartitioning.Type> getPartitionType() {
return partitionType.toJavaUtil();
}

public Optional<RangePartitioning.Range> getPartitionRange() {
if (partitionRangeStart.isPresent()
&& partitionRangeEnd.isPresent()
&& partitionRangeInterval.isPresent()) {
return Optional.of(
RangePartitioning.Range.newBuilder()
.setStart(partitionRangeStart.get())
.setEnd(partitionRangeEnd.get())
.setInterval(partitionRangeInterval.get())
.build());
}
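// If any of the three range options is missing, no range is returned and
// range partitioning is not applied.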
return Optional.empty();
}

public TimePartitioning.Type getPartitionTypeOrDefault() {
return partitionType.or(TimePartitioning.Type.DAY);
}
@@ -24,6 +24,7 @@
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.CompressionCodec;
@@ -107,6 +108,7 @@ public void testDefaults() {
assertThat(config.getPartitionExpirationMs()).isEqualTo(OptionalLong.empty());
assertThat(config.getPartitionRequireFilter()).isEqualTo(Optional.empty());
assertThat(config.getPartitionType()).isEqualTo(Optional.empty());
assertThat(config.getPartitionRange()).isEqualTo(Optional.empty());
assertThat(config.getClusteredFields()).isEqualTo(Optional.empty());
assertThat(config.getCreateDisposition()).isEqualTo(Optional.empty());
assertThat(config.getLoadSchemaUpdateOptions()).isEqualTo(ImmutableList.of());
@@ -228,6 +230,38 @@ public void testConfigFromOptions() {
assertThat(config.getKmsKeyName()).isEqualTo(Optional.of("some/key/name"));
}

@Test
public void testConfigFromOptions_rangePartitioning() {
Configuration hadoopConfiguration = new Configuration();
DataSourceOptions options =
new DataSourceOptions(
ImmutableMap.<String, String>builder()
.put("table", "test_t")
.put("dataset", "test_d")
.put("project", "test_p")
.put("partitionRangeStart", "1")
.put("partitionRangeEnd", "20")
.put("partitionRangeInterval", "2")
.put("partitionField", "some_field")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
options.asMap(),
defaultGlobalOptions,
hadoopConfiguration,
ImmutableMap.of(),
DEFAULT_PARALLELISM,
new SQLConf(),
SPARK_VERSION,
Optional.empty(), /* tableIsMandatory */
true);
RangePartitioning.Range expectedRange =
RangePartitioning.Range.newBuilder().setStart(1L).setEnd(20L).setInterval(2L).build();
assertThat(config.getTableId()).isEqualTo(TableId.of("test_p", "test_d", "test_t"));
assertThat(config.getPartitionRange()).isEqualTo(Optional.of(expectedRange));
assertThat(config.getPartitionField()).isEqualTo(Optional.of("some_field"));
}

@Test
public void testCacheExpirationSetToZero() {
Configuration hadoopConfiguration = new Configuration();
@@ -19,6 +19,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;

import com.google.cloud.bigquery.BigQuery;
@@ -27,6 +28,7 @@
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -41,6 +43,7 @@
import com.google.cloud.spark.bigquery.integration.model.Friend;
import com.google.cloud.spark.bigquery.integration.model.Link;
import com.google.cloud.spark.bigquery.integration.model.Person;
import com.google.cloud.spark.bigquery.integration.model.RangeData;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.ProvisionException;
@@ -1052,6 +1055,41 @@ private void testPartition(String partitionType) {
assertThat(readDF.count()).isEqualTo(3);
}

@Test
public void testPartitionRange() {
// partition write not supported in BQ Storage Write API
assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

List<RangeData> data =
Arrays.asList(new RangeData("a", 1L), new RangeData("b", 5L), new RangeData("c", 11L));
Dataset<Row> df = spark.createDataset(data, Encoders.bean(RangeData.class)).toDF();
String table = testDataset.toString() + "." + testTable + "_range";
df.write()
.format("bigquery")
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("partitionField", "rng")
.option("partitionRangeStart", "1")
.option("partitionRangeEnd", "21")
.option("partitionRangeInterval", "2")
.option("partitionRequireFilter", "true")
.option("table", table)
.option("writeMethod", writeMethod.toString())
.save();

Dataset<Row> readDF = spark.read().format("bigquery").load(table);
assertThat(readDF.count()).isEqualTo(3);
Table bqTable = bq.getTable(TableId.of(testDataset.toString(), testTable + "_range"));
assertThat(bqTable).isNotNull();
assertTrue(bqTable.getDefinition() instanceof StandardTableDefinition);
StandardTableDefinition bqTableDef = bqTable.getDefinition();
assertThat(bqTableDef.getRangePartitioning()).isNotNull();
RangePartitioning.Range expectedRange =
RangePartitioning.Range.newBuilder().setStart(1L).setEnd(21L).setInterval(2L).build();
String expectedField = "rng";
assertThat(bqTableDef.getRangePartitioning().getRange()).isEqualTo(expectedRange);
assertThat(bqTableDef.getRangePartitioning().getField()).isEqualTo(expectedField);
}

@Test
public void testCacheDataFrameInDataSource() {
// It takes some time for the data to be available for read via the Storage Read API, after it
@@ -0,0 +1,68 @@
/*
* Copyright 2021 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration.model;

import com.google.common.base.Objects;
import java.io.Serializable;

public class RangeData implements Serializable {

private String str;
private Long rng;

public RangeData(String str, Long rng) {
this.str = str;
this.rng = rng;
}

public String getStr() {
return str;
}

public void setStr(String str) {
this.str = str;
}

public Long getRng() {
return rng;
}

public void setRng(Long rng) {
this.rng = rng;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof RangeData)) {
return false;
}
RangeData data = (RangeData) o;
return Objects.equal(str, data.str) && Objects.equal(rng, data.rng);
}

@Override
public int hashCode() {
return Objects.hashCode(str, rng);
}

@Override
public String toString() {
return "Data{" + "str='" + str + '\'' + ", rng=" + rng + '}';
}
}
