Skip to content

Commit

Permalink
Ensure proper type conversions + handling nulls
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Aug 25, 2023
1 parent c83c4e2 commit 208e0ee
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.connection.Connection;
import com.google.cloud.spanner.connection.ConnectionOptions;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -111,13 +112,21 @@ public static InternalRow spannerStructToInternalRow(
// programmatically by access of the type elements.
String fieldTypeName = rs.getColumnType(i).toString();

if (spannerRow.isNull(i) {
// TODO: Examine if we should perhaps translate to
// zero values for simple types like INT64, FLOAT64
// or just keep it as null.
sparkRow.update(i, null);
continue;
}

switch (fieldTypeName) {
case "BOOL":
sparkRow.setBoolean(i, spannerRow.getBoolean(i));
break;

case "DATE":
sparkRow.update(i, spannerRow.getDate(i));
sparkRow.update(i, spannerRow.getDate(i).toJavaUtilDate(spannerRow.getDate(i)));
break;

case "FLOAT64":
Expand All @@ -141,24 +150,56 @@ public static InternalRow spannerStructToInternalRow(
break;

case "TIMESTAMP":
sparkRow.update(i, spannerRow.getTimestamp(i));
sparkRow.update(i, spannerRow.getTimestamp(i).toSqlTimestamp());
break;

default: // "ARRAY", "STRUCT"
if (fieldTypeName.indexOf("BYTES") == 0) {
sparkRow.update(i, spannerRow.getBytes(i));
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
} else {
sparkRow.update(i, spannerRow.getBytes(i).toByteArray());
}
} else if (fieldTypeName.indexOf("STRING") == 0) {
sparkRow.update(i, spannerRow.getString(i));
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
} else {
sparkRow.update(i, spannerRow.getString(i));
}
} else if (fieldTypeName.indexOf("ARRAY<BOOL>") == 0) {
sparkRow.update(i, spannerRow.getBooleanArray(i));
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
} else {
sparkRow.update(i, spannerRow.getBooleanArray(i));
}
} else if (fieldTypeName.indexOf("ARRAY<STRING") == 0) {
sparkRow.update(i, spannerRow.getStringList(i));
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
} else {
sparkRow.update(i, spannerRow.getStringList(i));
}
} else if (fieldTypeName.indexOf("ARRAY<TIMESTAMP") == 0) {
sparkRow.update(i, spannerRow.getTimestampList(i));
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
} else {
List<com.google.cloud.Timestamp> tsL = spannerRow.getTimestampList(i);
List<Timestamp> endTsL = new ArrayList<Timestamp>(tsL.size());
tsL.forEach((ts) -> endTsL.add(ts.toSqlTimestamp()));
sparkRow.update(i, endTsL);
}
} else if (fieldTypeName.indexOf("ARRAY<DATE") == 0) {
sparkRow.update(i, spannerRow.getDateList(i));
} else if (fieldTypeName.indexOf("STRUCT") == 0) {
// TODO: Convert into a Spark STRUCT.
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
sparkRow.update(i, null);
// TODO: Convert to the java.sql type.
sparkRow.update(i, spannerRow.getDateList(i));
} else if (fieldTypeName.indexOf("STRUCT") == 0) {
if (spannerRow.isNull(i)) {
sparkRow.update(i, null);
} else {
// TODO: Convert into a Spark STRUCT.
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,46 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spark;

import org.apache.spark.sql.SparkSession;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class IntegrationTest {

protected SparkSession spark;

@Test
public void testWholeSetup() {}

@Before
public void createTests() {}

protected static class SparkFactory extends ExternalResource {
@Override
protected void before() throws Throwable {
// Setup here.
}

@Override
protected void after() {
// Tear down here.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
Expand Down Expand Up @@ -86,8 +87,10 @@ public void testCreatePartitionContext() throws Exception {

// Not using executor.execute as controlling immediate termination
// is non-granular and out of scope of these tests.
Map<String, String> opts = SpannerUtilsTest.connectionProperties();
for (final Partition partition : partitions) {
SpannerInputPartitionContext sCtx = new SpannerInputPartitionContext(txn, partition);
SpannerInputPartitionContext sCtx =
new SpannerInputPartitionContext(partition, txn.getBatchTransactionId(), opts);
try {
InputPartitionReaderContext<InternalRow> ctx = sCtx.createPartitionReaderContext();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spark;

import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@

import static org.junit.Assert.assertEquals;

import com.google.cloud.spark.spanner.SpannerScanBuilder;
import com.google.cloud.spark.spanner.SpannerTable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -39,4 +47,26 @@ public void createSchema() {
assertEquals(expectSchema.fieldNames(), actualSchema.fieldNames());
assertEquals(expectSchema.simpleString(), actualSchema.simpleString());
}

@Test
public void show() {
Map<String, String> props = SpannerUtilsTest.connectionProperties();
SpannerTable st = new SpannerTable(null, props);
CaseInsensitiveStringMap csm = new CaseInsensitiveStringMap(props);
ScanBuilder sb = st.newScanBuilder(csm);
SpannerScanBuilder ssb = ((SpannerScanBuilder) sb);
InputPartition[] parts = ssb.planInputPartitions();
PartitionReaderFactory prf = ssb.createReaderFactory();

for (InputPartition part : parts) {
PartitionReader<InternalRow> ir = prf.createReader(part);
try {
while (ir.next()) {
InternalRow row = ir.get();
System.out.println("row: " + row.toString());
}
} catch (IOException e) {
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spark;

import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spark;

import com.google.common.io.CharStreams;
Expand Down

0 comments on commit 208e0ee

Please sign in to comment.