
Commit ae593b0

revert toSparkTimestamp to original + add more comments
odeke-em committed Sep 17, 2023
1 parent 88e8fd0 commit ae593b0
Showing 1 changed file with 14 additions and 9 deletions.
@@ -119,14 +119,20 @@ private static void spannerNumericToSpark(Struct src, GenericInternalRow dest, i
     toSparkDecimal(dest, src.getBigDecimal(at), at);
   }
 
-  public static Long timestampToLong(com.google.cloud.Timestamp ts) {
+  public static Long toSparkTimestamp(com.google.cloud.Timestamp ts) {
     // Convert the timestamp to microseconds, which is supported in the Spark.
-    return (ts.getSeconds() * 1_000_000) + (ts.getNanos() / 1000);
+    Timestamp sqlTs = ts.toSqlTimestamp();
+    return toSparkTimestamp(sqlTs);
   }
 
-  public static Long timestampToLong(Timestamp ts) {
-    // Convert the timestamp to microseconds, which is supported in the Spark.
-    return (ts.getTime() * 1_000_000) + (ts.getNanos() / 1000);
+  /*
+   * toSparkTimestamp converts a java.sql.Timestamp to microseconds,
+   * stored in a Long as Spark expects.
+   */
+  public static Long toSparkTimestamp(Timestamp ts) {
+    // ts.getTime() returns time in milliseconds, so *1000 -> microseconds
+    // ts.getNanos() returns time in nanoseconds, so /1000 -> microseconds
+    return (ts.getTime() * 1000) + (ts.getNanos() / 1000);
   }
 
   public static Long zonedDateTimeToSparkTimestamp(ZonedDateTime zdt) {
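For reference, here is a minimal standalone sketch (not part of this commit or repository) of computing the epoch-microsecond value Spark's TimestampType expects directly from a com.google.cloud.Timestamp, using the same seconds/nanos arithmetic as the removed overload. The class and method names are hypothetical:

import com.google.cloud.Timestamp;

// Hypothetical illustration only; not the connector's code.
public final class EpochMicrosSketch {
  // Spark's TimestampType stores values as microseconds since the Unix epoch.
  static long toEpochMicros(Timestamp ts) {
    // getSeconds() is whole seconds since the epoch; getNanos() is the sub-second part in nanoseconds.
    return ts.getSeconds() * 1_000_000L + ts.getNanos() / 1_000;
  }

  public static void main(String[] args) {
    Timestamp ts = Timestamp.ofTimeSecondsAndNanos(1, 500_000_000); // 1.5 seconds after the epoch
    System.out.println(toEpochMicros(ts)); // prints 1500000
  }
}

With this commit applied, the connector instead routes com.google.cloud.Timestamp through toSqlTimestamp() and delegates to the java.sql.Timestamp overload shown in the hunk above.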
@@ -159,7 +165,7 @@ public static Integer toSparkDate(com.google.cloud.Date dc) {
 
   public static GenericArrayData timestampIterToSpark(Iterable<Timestamp> tsIt) {
     List<Long> dest = new ArrayList<>();
-    tsIt.forEach((ts) -> dest.add(timestampToLong(ts)));
+    tsIt.forEach((ts) -> dest.add(toSparkTimestamp(ts)));
     return new GenericArrayData(dest.toArray(new Long[0]));
   }
 
@@ -222,7 +228,7 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) {
 
         case TIMESTAMP:
           // Convert the timestamp to microseconds, which is supported in the Spark.
-          sparkRow.update(i, timestampToLong(spannerRow.getTimestamp(i)));
+          sparkRow.update(i, toSparkTimestamp(spannerRow.getTimestamp(i)));
           break;
 
         case STRING:
@@ -253,9 +259,8 @@ public static InternalRow spannerStructToInternalRow(Struct spannerRow) {
           src.forEach((s) -> dest.add(UTF8String.fromString(s)));
           sparkRow.update(i, new GenericArrayData(dest.toArray(new UTF8String[0])));
         } else if (fieldTypeName.indexOf("ARRAY<TIMESTAMP") == 0) {
-          List<com.google.cloud.Timestamp> tsL = spannerRow.getTimestampList(i);
           List<Long> endTsL = new ArrayList<>();
-          tsL.forEach((ts) -> endTsL.add(timestampToLong(ts)));
+          spannerRow.getTimestampList(i).forEach((ts) -> endTsL.add(toSparkTimestamp(ts)));
           sparkRow.update(i, new GenericArrayData(endTsL.toArray(new Long[0])));
         } else if (fieldTypeName.indexOf("ARRAY<DATE>") == 0) {
           List<Integer> endDL = new ArrayList<>();
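Along the same lines, a minimal sketch (again not from this repository, with hypothetical names) of wrapping per-element microsecond values into the GenericArrayData that the ARRAY<TIMESTAMP> branch and timestampIterToSpark build:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.catalyst.util.GenericArrayData;

// Hypothetical illustration only; mirrors the toArray(new Long[0]) pattern used in the diff.
public final class TimestampArraySketch {
  public static void main(String[] args) {
    // Assume these are epoch-microsecond values already produced for each array element.
    List<Long> micros = new ArrayList<>();
    micros.add(1_500_000L);
    micros.add(2_750_000L);

    // Spark's internal rows hold array columns as ArrayData; GenericArrayData wraps a boxed Object array.
    GenericArrayData arr = new GenericArrayData(micros.toArray(new Long[0]));
    System.out.println(arr.numElements()); // prints 2
  }
}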

0 comments on commit ae593b0
