Commit

Date: convert back to daysSinceEpoch + fix timezone discrepancies

odeke-em committed Sep 17, 2023
1 parent 15bda03 commit 88e8fd0
Showing 3 changed files with 46 additions and 35 deletions.
cloudbuild/presubmit.sh (3 changes: 1 addition & 2 deletions)
@@ -31,10 +31,9 @@ case $STEP in
integrationtest-real-spanner)
# Starts the Spanner emulator and sets up the gcloud command.
# Sets the env used in the integration test.
-#
-# TODO: Remove the test filter once Date zone issue is fixed in test SpannerInputPartitionReaderContextTest.
$MVN test -Dtest=SpannerTableTest
$MVN test -Dtest=SpannerScanBuilderTest
+$MVN test -Dtest=SpannerInputPartitionReaderContextTest
;;
esac

SpannerUtils.java
@@ -28,10 +28,9 @@
import com.google.cloud.spanner.connection.Connection;
import com.google.cloud.spanner.connection.ConnectionOptions;
import java.sql.Timestamp;
-import java.time.Instant;
+import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,7 +48,7 @@
public class SpannerUtils {
private static final ObjectMapper jsonMapper = new ObjectMapper();

-public static Long MILLISECOND_TO_DAYS = 1000 * 60 * 60 * 24L;
+public static Long SECOND_TO_DAYS = 60 * 60 * 24L;

public static Connection connectionFromProperties(Map<String, String> properties) {
String connUriPrefix = "cloudspanner:";
@@ -108,7 +107,7 @@ public static InternalRow resultSetRowToInternalRow(ResultSet rs) {
return spannerStructToInternalRow(spannerRow);
}

-public static void asSparkDecimal(GenericInternalRow dest, java.math.BigDecimal v, int at) {
+public static void toSparkDecimal(GenericInternalRow dest, java.math.BigDecimal v, int at) {
// TODO: Deal with the precision truncation since Cloud Spanner's precision
// has (precision=38, scale=9) while Apache Spark has (precision=N, scale=M)
Decimal dec = new Decimal();
@@ -117,7 +116,7 @@ public static void asSparkDecimal(GenericInternalRow dest, java.math.BigDecimal
}

private static void spannerNumericToSpark(Struct src, GenericInternalRow dest, int at) {
-asSparkDecimal(dest, src.getBigDecimal(at), at);
+toSparkDecimal(dest, src.getBigDecimal(at), at);
}

public static Long timestampToLong(com.google.cloud.Timestamp ts) {
@@ -130,19 +129,32 @@ public static Long timestampToLong(Timestamp ts) {
return (ts.getTime() * 1_000_000) + (ts.getNanos() / 1000);
}

-public static Long zonedDateTimeToLong(ZonedDateTime zdt) {
+public static Long zonedDateTimeToSparkTimestamp(ZonedDateTime zdt) {
// Convert the zonedDateTime to microseconds which Spark supports.
return zdt.toEpochSecond() * 1_000_000;
}

-public static Long dateToInteger(com.google.cloud.Date dc) {
-Date d = dc.toJavaUtilDate(dc);
-Instant ins = d.toInstant();
-return (ins.getEpochSecond() * 1_000_000);
+/*
+ * zonedDateTimeToSparkDate converts a ZonedDateTime to the number of days
+ * since the epoch (January 1st, 1970), which is what Spark understands.
+ */
+public static Integer zonedDateTimeToSparkDate(ZonedDateTime zdt) {
+return ((Long) (zdt.toEpochSecond() / SECOND_TO_DAYS)).intValue();
}

-public static Long dateToInteger(Date d) {
-return ((Long) (d.getTime() * 1_000));
+private static ZoneId zoneUTC = ZoneId.of("UTC+00:00");
+
+/*
+ * toSparkDate converts a Google Date into the number of days since
+ * the epoch (January 1st, 1970), which is what Spark understands.
+ */
+public static Integer toSparkDate(com.google.cloud.Date dc) {
+// Cloud Spanner doesn't attach a time zone to a Date and reports it as if in UTC,
+// so it must not be interpreted in the system's zone: build a ZonedDateTime pinned
+// to UTC rather than converting to java.util.Date, which uses the local system's timezone.
+ZonedDateTime zdt =
+ZonedDateTime.of(dc.getYear(), dc.getMonth(), dc.getDayOfMonth(), 0, 0, 0, 0, zoneUTC);
+return zonedDateTimeToSparkDate(zdt);
}

public static GenericArrayData timestampIterToSpark(Iterable<Timestamp> tsIt) {
@@ -151,15 +163,15 @@ public static GenericArrayData timestampIterToSpark(Iterable<Timestamp> tsIt) {
return new GenericArrayData(dest.toArray(new Long[0]));
}

-public static GenericArrayData dateIterToSpark(Iterable<Date> tsIt) {
-List<Long> dest = new ArrayList<>();
-tsIt.forEach((ts) -> dest.add(dateToInteger(ts)));
-return new GenericArrayData(dest.toArray(new Long[0]));
+public static GenericArrayData zonedDateTimeIterToSparkDates(Iterable<ZonedDateTime> tsIt) {
+List<Integer> dest = new ArrayList<>();
+tsIt.forEach((ts) -> dest.add(zonedDateTimeToSparkDate(ts)));
+return new GenericArrayData(dest.toArray(new Integer[0]));
}

-public static GenericArrayData zonedDateTimeIterToSparkInts(Iterable<ZonedDateTime> tsIt) {
+public static GenericArrayData zonedDateTimeIterToSparkTimestamps(Iterable<ZonedDateTime> tsIt) {
List<Long> dest = new ArrayList<>();
-tsIt.forEach((ts) -> dest.add(zonedDateTimeToLong(ts)));
+tsIt.forEach((ts) -> dest.add(zonedDateTimeToSparkTimestamp(ts)));
return new GenericArrayData(dest.toArray(new Long[0]));
}

@@ -181,7 +193,7 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) {
break;

case DATE:
-sparkRow.update(i, dateToInteger(spannerRow.getDate(i)));
+sparkRow.update(i, toSparkDate(spannerRow.getDate(i)));
break;

case FLOAT64:
@@ -246,9 +258,9 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) {
tsL.forEach((ts) -> endTsL.add(timestampToLong(ts)));
sparkRow.update(i, new GenericArrayData(endTsL.toArray(new Long[0])));
} else if (fieldTypeName.indexOf("ARRAY<DATE>") == 0) {
-List<Long> endDL = new ArrayList<>();
-spannerRow.getDateList(i).forEach((ts) -> endDL.add(dateToInteger(ts)));
-sparkRow.update(i, new GenericArrayData(endDL.toArray(new Long[0])));
+List<Integer> endDL = new ArrayList<>();
+spannerRow.getDateList(i).forEach((ts) -> endDL.add(toSparkDate(ts)));
+sparkRow.update(i, new GenericArrayData(endDL.toArray(new Integer[0])));
} else if (fieldTypeName.indexOf("ARRAY<STRUCT<") == 0) {
List<Struct> src = spannerRow.getStructList(i);
List<InternalRow> dest = new ArrayList<>(src.size());
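The substance of the fix is easier to see outside the diff. Below is a minimal, self-contained sketch (not part of the commit; the class and method names are invented for illustration). It contrasts the new approach of pinning the civil date to UTC before counting whole days since the epoch, as toSparkDate above does, with routing the same date through the JVM's default time zone, which is roughly what the removed java.util.Date path did. For any zone ahead of UTC, the old path lands one day early.

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.TimeZone;

public class EpochDaySketch {
  static final long SECONDS_PER_DAY = 60 * 60 * 24L;

  // New behavior: pin the civil date to UTC, then count whole days since the epoch.
  static int utcEpochDays(int year, int month, int day) {
    ZonedDateTime zdt = ZonedDateTime.of(year, month, day, 0, 0, 0, 0, ZoneId.of("UTC"));
    return (int) (zdt.toEpochSecond() / SECONDS_PER_DAY);
  }

  // Old behavior (approximately): build midnight in the JVM's default zone, as a
  // java.util.Date-based conversion would, then do the same epoch arithmetic.
  static int defaultZoneEpochDays(int year, int month, int day) {
    Calendar cal = Calendar.getInstance(); // uses the default time zone
    cal.clear();
    cal.set(year, month - 1, day); // Calendar months are 0-based
    return (int) (cal.getTimeInMillis() / 1000 / SECONDS_PER_DAY);
  }

  public static void main(String[] args) {
    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Tokyo")); // UTC+9
    // 2023-01-01 is 19358 days after 1970-01-01.
    System.out.println(utcEpochDays(2023, 1, 1));         // 19358
    System.out.println(defaultZoneEpochDays(2023, 1, 1)); // 19357: off by one day
  }
}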
SpannerInputPartitionReaderContextTest.java
@@ -129,12 +129,12 @@ public InternalRow makeCompositeTableRow(
row.update(1, new GenericArrayData(A));
row.update(2, new GenericArrayData(toSparkStrList(B)));
row.update(3, UTF8String.fromString(C));
-SpannerUtils.asSparkDecimal(row, D, 4);
-row.update(5, SpannerUtils.zonedDateTimeToLong(E));
-row.update(6, SpannerUtils.zonedDateTimeToLong(F));
+SpannerUtils.toSparkDecimal(row, D, 4);
+row.update(5, SpannerUtils.zonedDateTimeToSparkDate(E));
+row.update(6, SpannerUtils.zonedDateTimeToSparkTimestamp(F));
row.setBoolean(7, G);
-row.update(8, SpannerUtils.zonedDateTimeIterToSparkInts(Arrays.asList(H)));
-row.update(9, SpannerUtils.zonedDateTimeIterToSparkInts(Arrays.asList(I)));
+row.update(8, SpannerUtils.zonedDateTimeIterToSparkDates(Arrays.asList(H)));
+row.update(9, SpannerUtils.zonedDateTimeIterToSparkTimestamps(Arrays.asList(I)));
return row;
}

@@ -161,9 +161,9 @@ public InternalRow makeGamesRow(
}
row.update(1, new GenericArrayData(dest.toArray(new UTF8String[0])));
row.update(2, UTF8String.fromString(winner));
-row.update(3, SpannerUtils.zonedDateTimeToLong(createdAt));
-row.update(4, SpannerUtils.zonedDateTimeToLong(finishedAt));
-row.update(5, SpannerUtils.zonedDateTimeToLong(maxDate));
+row.update(3, SpannerUtils.zonedDateTimeToSparkTimestamp(createdAt));
+row.update(4, SpannerUtils.zonedDateTimeToSparkTimestamp(finishedAt));
+row.update(5, SpannerUtils.zonedDateTimeToSparkTimestamp(maxDate));
return row;
}

@@ -239,12 +239,12 @@ public void testArraysConversions() {
new String[] {"a", "b", "c"},
"foobar",
new java.math.BigDecimal(2934),
ZonedDateTime.parse("2023-01-01T00:00:00Z"),
ZonedDateTime.parse("2022-12-31T23:59:59Z"),
ZonedDateTime.parse("2023-08-26T12:22:05Z"),
true,
new ZonedDateTime[] {
ZonedDateTime.parse("2023-01-02T00:00:00Z"),
ZonedDateTime.parse("2023-12-31T00:00:00Z"),
ZonedDateTime.parse("2023-01-01T23:59:59Z"),
ZonedDateTime.parse("2023-12-30T23:59:59Z"),
},
new ZonedDateTime[] {
ZonedDateTime.parse("2023-08-26T12:11:10Z"),
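To sanity-check the adjusted test values, a quick sketch (assuming the SpannerUtils methods from this commit are in the same package or on the classpath; the printed values follow by hand from the definitions above):

import java.time.ZonedDateTime;

public class DateConversionCheck {
  public static void main(String[] args) {
    // 2022-12-31T23:59:59Z is still day 19357 (2022-12-31): the truncating
    // division in zonedDateTimeToSparkDate keeps it inside the same UTC day.
    ZonedDateTime e = ZonedDateTime.parse("2022-12-31T23:59:59Z");
    System.out.println(SpannerUtils.zonedDateTimeToSparkDate(e)); // 19357

    // One second later rolls over to day 19358 (2023-01-01).
    ZonedDateTime f = ZonedDateTime.parse("2023-01-01T00:00:00Z");
    System.out.println(SpannerUtils.zonedDateTimeToSparkDate(f)); // 19358

    // Timestamps, by contrast, are converted to microseconds since the epoch.
    System.out.println(SpannerUtils.zonedDateTimeToSparkTimestamp(e)); // 1672531199000000
  }
}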
