From 208e0ee951d4a2d6e8fab0384c348ad2f07f2386 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 25 Aug 2023 20:24:29 +0300 Subject: [PATCH] Ensure proper type conversions + handling nulls --- .../cloud/spark/spanner/SpannerUtils.java | 61 ++++++++++++++++--- .../cloud/spark/spanner/IntegrationTest.java | 34 +++++++++++ ...pannerInputPartitionReaderContextTest.java | 5 +- .../spark/spanner/SpannerScanBuilderTest.java | 14 +++++ .../cloud/spark/spanner/SpannerTableTest.java | 30 +++++++++ .../cloud/spark/spanner/SpannerUtilsTest.java | 14 +++++ .../google/cloud/spark/spanner/TestData.java | 14 +++++ 7 files changed, 161 insertions(+), 11 deletions(-) diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java index 297f7fba..b3374f2b 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java @@ -23,6 +23,7 @@ import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.connection.Connection; import com.google.cloud.spanner.connection.ConnectionOptions; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -111,13 +112,21 @@ public static InternalRow spannerStructToInternalRow( // programmatically by access of the type elements. String fieldTypeName = rs.getColumnType(i).toString(); + if (spannerRow.isNull(i) { + // TODO: Examine if we should perhaps translate to + // zero values for simple types like INT64, FLOAT64 + // or just keep it as null. + sparkRow.update(i, null); + continue; + } + switch (fieldTypeName) { case "BOOL": sparkRow.setBoolean(i, spannerRow.getBoolean(i)); break; case "DATE": - sparkRow.update(i, spannerRow.getDate(i)); + sparkRow.update(i, spannerRow.getDate(i).toJavaUtilDate(spannerRow.getDate(i))); break; case "FLOAT64": @@ -141,24 +150,56 @@ public static InternalRow spannerStructToInternalRow( break; case "TIMESTAMP": - sparkRow.update(i, spannerRow.getTimestamp(i)); + sparkRow.update(i, spannerRow.getTimestamp(i).toSqlTimestamp()); break; default: // "ARRAY", "STRUCT" if (fieldTypeName.indexOf("BYTES") == 0) { - sparkRow.update(i, spannerRow.getBytes(i)); + if (spannerRow.isNull(i)) { + sparkRow.update(i, null); + } else { + sparkRow.update(i, spannerRow.getBytes(i).toByteArray()); + } } else if (fieldTypeName.indexOf("STRING") == 0) { - sparkRow.update(i, spannerRow.getString(i)); + if (spannerRow.isNull(i)) { + sparkRow.update(i, null); + } else { + sparkRow.update(i, spannerRow.getString(i)); + } } else if (fieldTypeName.indexOf("ARRAY") == 0) { - sparkRow.update(i, spannerRow.getBooleanArray(i)); + if (spannerRow.isNull(i)) { + sparkRow.update(i, null); + } else { + sparkRow.update(i, spannerRow.getBooleanArray(i)); + } } else if (fieldTypeName.indexOf("ARRAY tsL = spannerRow.getTimestampList(i); + List endTsL = new ArrayList(tsL.size()); + tsL.forEach((ts) -> endTsL.add(ts.toSqlTimestamp())); + sparkRow.update(i, endTsL); + } } else if (fieldTypeName.indexOf("ARRAY opts = SpannerUtilsTest.connectionProperties(); for (final Partition partition : partitions) { - SpannerInputPartitionContext sCtx = new SpannerInputPartitionContext(txn, partition); + SpannerInputPartitionContext sCtx = + new SpannerInputPartitionContext(partition, txn.getBatchTransactionId(), opts); try { InputPartitionReaderContext ctx = sCtx.createPartitionReaderContext(); diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java index 6423d2e4..38faaee6 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerScanBuilderTest.java @@ -1,3 +1,17 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package com.google.cloud.spark; import static org.junit.Assert.assertEquals; diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java index 9e5bd3ce..1b9a3739 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java @@ -2,12 +2,20 @@ import static org.junit.Assert.assertEquals; +import com.google.cloud.spark.spanner.SpannerScanBuilder; import com.google.cloud.spark.spanner.SpannerTable; +import java.io.IOException; import java.util.Arrays; import java.util.Map; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -39,4 +47,26 @@ public void createSchema() { assertEquals(expectSchema.fieldNames(), actualSchema.fieldNames()); assertEquals(expectSchema.simpleString(), actualSchema.simpleString()); } + + @Test + public void show() { + Map props = SpannerUtilsTest.connectionProperties(); + SpannerTable st = new SpannerTable(null, props); + CaseInsensitiveStringMap csm = new CaseInsensitiveStringMap(props); + ScanBuilder sb = st.newScanBuilder(csm); + SpannerScanBuilder ssb = ((SpannerScanBuilder) sb); + InputPartition[] parts = ssb.planInputPartitions(); + PartitionReaderFactory prf = ssb.createReaderFactory(); + + for (InputPartition part : parts) { + PartitionReader ir = prf.createReader(part); + try { + while (ir.next()) { + InternalRow row = ir.get(); + System.out.println("row: " + row.toString()); + } + } catch (IOException e) { + } + } + } } diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerUtilsTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerUtilsTest.java index 35dbf770..7f6a9b09 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerUtilsTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerUtilsTest.java @@ -1,3 +1,17 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package com.google.cloud.spark; import static org.junit.Assert.assertEquals; diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/TestData.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/TestData.java index 6f98d40c..bac0c0e4 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/TestData.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/TestData.java @@ -1,3 +1,17 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package com.google.cloud.spark; import com.google.common.io.CharStreams;