Skip to content

Commit

Permalink
Issue #144: allow writing Spark Long to BQ TIME type
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalkarve15 committed Jul 14, 2023
1 parent 2f66df1 commit f4a3a43
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,13 @@ && typeWriteable(sourceField.getType(), destinationField.getType())
}

// allowing widening narrow numeric into bignumeric
// allowing writing long to time
@VisibleForTesting
static boolean typeWriteable(LegacySQLTypeName sourceType, LegacySQLTypeName destinationType) {
return (sourceType.equals(LegacySQLTypeName.NUMERIC)
&& destinationType.equals(LegacySQLTypeName.BIGNUMERIC))
|| (sourceType.equals(LegacySQLTypeName.INTEGER)
&& destinationType.equals(LegacySQLTypeName.TIME))
|| sourceType.equals(destinationType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,43 @@ public void testWriteNumericsToWiderFields() throws Exception {
.isEqualTo(new BigDecimal("12345.123450000000000"));
}

@Test
public void testWriteLongToTimeField() throws Exception {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (name STRING, wake_up_time TIME)",
testDataset, testTable));
String name = "abc";
Long wakeUpTime = 36000000000L;
Dataset<Row> df =
spark.createDataFrame(
Arrays.asList(RowFactory.create(name, wakeUpTime)),
structType(
StructField.apply("name", DataTypes.StringType, true, Metadata.empty()),
StructField.apply("wake_up_time", DataTypes.LongType, true, Metadata.empty())));
df.write()
.format("bigquery")
.mode(SaveMode.Append)
.option("dataset", testDataset.toString())
.option("table", testTable)
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("writeMethod", writeMethod.toString())
.save();

Dataset<Row> resultDF =
spark
.read()
.format("bigquery")
.option("dataset", testDataset.toString())
.option("table", testTable)
.load();
List<Row> result = resultDF.collectAsList();
assertThat(result).hasSize(1);
Row head = result.get(0);
assertThat(head.getString(head.fieldIndex("name"))).isEqualTo("abc");
assertThat(head.getString(head.fieldIndex("wake_up_time"))).isEqualTo(36000000000L);
}

public void testWriteSchemaSubset() throws Exception {
StructType initialSchema =
structType(
Expand Down

0 comments on commit f4a3a43

Please sign in to comment.