diff --git a/cloudbuild/presubmit.sh b/cloudbuild/presubmit.sh index 18c4c630..85b09639 100644 --- a/cloudbuild/presubmit.sh +++ b/cloudbuild/presubmit.sh @@ -31,10 +31,9 @@ case $STEP in integrationtest-real-spanner) # Starts the Spanner emulator and setup the gcloud command. # Sets the env used in the integration test. - # - # TODO: Remove the test filter once Date zone issue is fixed in test SpannerInputPartitionReaderContextTest. $MVN test -Dtest=SpannerTableTest $MVN test -Dtest=SpannerScanBuilderTest + $MVN test -Dtest=SpannerInputPartitionReaderContextTest ;; esac diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java index a49d7ad5..30ec0613 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java @@ -28,10 +28,9 @@ import com.google.cloud.spanner.connection.Connection; import com.google.cloud.spanner.connection.ConnectionOptions; import java.sql.Timestamp; -import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,7 +48,7 @@ public class SpannerUtils { private static final ObjectMapper jsonMapper = new ObjectMapper(); - public static Long MILLISECOND_TO_DAYS = 1000 * 60 * 60 * 24L; + public static Long SECOND_TO_DAYS = 60 * 60 * 24L; public static Connection connectionFromProperties(Map properties) { String connUriPrefix = "cloudspanner:"; @@ -108,7 +107,7 @@ public static InternalRow resultSetRowToInternalRow(ResultSet rs) { return spannerStructToInternalRow(spannerRow); } - public static void asSparkDecimal(GenericInternalRow dest, java.math.BigDecimal v, int at) { + public static void toSparkDecimal(GenericInternalRow dest, 
java.math.BigDecimal v, int at) { // TODO: Deal with the precision truncation since Cloud Spanner's precision // has (precision=38, scale=9) while Apache Spark has (precision=N, scale=M) Decimal dec = new Decimal(); @@ -117,7 +116,7 @@ public static void asSparkDecimal(GenericInternalRow dest, java.math.BigDecimal } private static void spannerNumericToSpark(Struct src, GenericInternalRow dest, int at) { - asSparkDecimal(dest, src.getBigDecimal(at), at); + toSparkDecimal(dest, src.getBigDecimal(at), at); } public static Long timestampToLong(com.google.cloud.Timestamp ts) { @@ -130,19 +129,32 @@ public static Long timestampToLong(Timestamp ts) { return (ts.getTime() * 1_000_000) + (ts.getNanos() / 1000); } - public static Long zonedDateTimeToLong(ZonedDateTime zdt) { + public static Long zonedDateTimeToSparkTimestamp(ZonedDateTime zdt) { // Convert the zonedDateTime to microseconds which Spark supports. return zdt.toEpochSecond() * 1_000_000; } - public static Long dateToInteger(com.google.cloud.Date dc) { - Date d = dc.toJavaUtilDate(dc); - Instant ins = d.toInstant(); - return (ins.getEpochSecond() * 1_000_000); + /* + * zonedDateTimeToSparkDate converts a ZonedDateTime to number of days + * since the Epoch: January 1st 1970, which is what Spark understands. + */ + public static Integer zonedDateTimeToSparkDate(ZonedDateTime zdt) { + return ((Long) (zdt.toEpochSecond() / SECOND_TO_DAYS)).intValue(); } - public static Long dateToInteger(Date d) { - return ((Long) (d.getTime() * 1_000)); + private static ZoneId zoneUTC = ZoneId.of("UTC+00:00"); + + /* + * toSparkDate converts a Google Date into the number of Days since + * the Epoch: January 1st 1970, which is what Spark understands. + */ + public static Integer toSparkDate(com.google.cloud.Date dc) { + // Cloud Spanner doesn't attach a zone to the Date, returning the time in UTC + // so we can't let it be interpreted in that of the system, hence using ZonedDateTime with UTC. 
+ // and not converting it to JavaUtilDate which uses the local system's timezone. + ZonedDateTime zdt = + ZonedDateTime.of(dc.getYear(), dc.getMonth(), dc.getDayOfMonth(), 0, 0, 0, 0, zoneUTC); + return zonedDateTimeToSparkDate(zdt); } public static GenericArrayData timestampIterToSpark(Iterable tsIt) { @@ -151,15 +163,15 @@ public static GenericArrayData timestampIterToSpark(Iterable tsIt) { return new GenericArrayData(dest.toArray(new Long[0])); } - public static GenericArrayData dateIterToSpark(Iterable tsIt) { - List dest = new ArrayList<>(); - tsIt.forEach((ts) -> dest.add(dateToInteger(ts))); - return new GenericArrayData(dest.toArray(new Long[0])); + public static GenericArrayData zonedDateTimeIterToSparkDates(Iterable tsIt) { + List dest = new ArrayList<>(); + tsIt.forEach((ts) -> dest.add(zonedDateTimeToSparkDate(ts))); + return new GenericArrayData(dest.toArray(new Integer[0])); } - public static GenericArrayData zonedDateTimeIterToSparkInts(Iterable tsIt) { + public static GenericArrayData zonedDateTimeIterToSparkTimestamps(Iterable tsIt) { List dest = new ArrayList<>(); - tsIt.forEach((ts) -> dest.add(zonedDateTimeToLong(ts))); + tsIt.forEach((ts) -> dest.add(zonedDateTimeToSparkTimestamp(ts))); return new GenericArrayData(dest.toArray(new Long[0])); } @@ -181,7 +193,7 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) { break; case DATE: - sparkRow.update(i, dateToInteger(spannerRow.getDate(i))); + sparkRow.update(i, toSparkDate(spannerRow.getDate(i))); break; case FLOAT64: @@ -246,9 +258,9 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) { tsL.forEach((ts) -> endTsL.add(timestampToLong(ts))); sparkRow.update(i, new GenericArrayData(endTsL.toArray(new Long[0]))); } else if (fieldTypeName.indexOf("ARRAY") == 0) { - List endDL = new ArrayList<>(); - spannerRow.getDateList(i).forEach((ts) -> endDL.add(dateToInteger(ts))); - sparkRow.update(i, new GenericArrayData(endDL.toArray(new Long[0]))); + List endDL 
= new ArrayList<>(); + spannerRow.getDateList(i).forEach((ts) -> endDL.add(toSparkDate(ts))); + sparkRow.update(i, new GenericArrayData(endDL.toArray(new Integer[0]))); } else if (fieldTypeName.indexOf("ARRAY src = spannerRow.getStructList(i); List dest = new ArrayList<>(src.size()); diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java index 72ee9d52..94f30645 100644 --- a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java +++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerInputPartitionReaderContextTest.java @@ -129,12 +129,12 @@ public InternalRow makeCompositeTableRow( row.update(1, new GenericArrayData(A)); row.update(2, new GenericArrayData(toSparkStrList(B))); row.update(3, UTF8String.fromString(C)); - SpannerUtils.asSparkDecimal(row, D, 4); - row.update(5, SpannerUtils.zonedDateTimeToLong(E)); - row.update(6, SpannerUtils.zonedDateTimeToLong(F)); + SpannerUtils.toSparkDecimal(row, D, 4); + row.update(5, SpannerUtils.zonedDateTimeToSparkDate(E)); + row.update(6, SpannerUtils.zonedDateTimeToSparkTimestamp(F)); row.setBoolean(7, G); - row.update(8, SpannerUtils.zonedDateTimeIterToSparkInts(Arrays.asList(H))); - row.update(9, SpannerUtils.zonedDateTimeIterToSparkInts(Arrays.asList(I))); + row.update(8, SpannerUtils.zonedDateTimeIterToSparkDates(Arrays.asList(H))); + row.update(9, SpannerUtils.zonedDateTimeIterToSparkTimestamps(Arrays.asList(I))); return row; } @@ -161,9 +161,9 @@ public InternalRow makeGamesRow( } row.update(1, new GenericArrayData(dest.toArray(new UTF8String[0]))); row.update(2, UTF8String.fromString(winner)); - row.update(3, SpannerUtils.zonedDateTimeToLong(createdAt)); - row.update(4, SpannerUtils.zonedDateTimeToLong(finishedAt)); - row.update(5, 
SpannerUtils.zonedDateTimeToLong(maxDate)); + row.update(3, SpannerUtils.zonedDateTimeToSparkTimestamp(createdAt)); + row.update(4, SpannerUtils.zonedDateTimeToSparkTimestamp(finishedAt)); + row.update(5, SpannerUtils.zonedDateTimeToSparkTimestamp(maxDate)); return row; } @@ -239,12 +239,12 @@ public void testArraysConversions() { new String[] {"a", "b", "c"}, "foobar", new java.math.BigDecimal(2934), - ZonedDateTime.parse("2023-01-01T00:00:00Z"), + ZonedDateTime.parse("2022-12-31T23:59:59Z"), ZonedDateTime.parse("2023-08-26T12:22:05Z"), true, new ZonedDateTime[] { - ZonedDateTime.parse("2023-01-02T00:00:00Z"), - ZonedDateTime.parse("2023-12-31T00:00:00Z"), + ZonedDateTime.parse("2023-01-01T23:59:59Z"), + ZonedDateTime.parse("2023-12-30T23:59:59Z"), }, new ZonedDateTime[] { ZonedDateTime.parse("2023-08-26T12:11:10Z"),