Skip to content

Commit

Permalink
Support spark.sql.datetime.java8API.enabled (#1303)
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Powell <[email protected]>
  • Loading branch information
tom-s-powell and Thomas Powell authored Oct 15, 2024
1 parent 44d21d8 commit 14fa9b8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -59,6 +62,9 @@ public class SparkBigQueryUtil {

private static final String SPARK_YARN_TAGS = "spark.yarn.tags";

private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
private static final long MIN_SECONDS = Math.floorDiv(Long.MIN_VALUE, MICROS_PER_SECOND);

static final Properties BUILD_PROPERTIES = loadBuildProperties();

static final String CONNECTOR_VERSION = BUILD_PROPERTIES.getProperty("connector.version");
Expand Down Expand Up @@ -199,6 +205,20 @@ public static long sparkTimestampToBigQuery(Object sparkValue) {
if (sparkValue instanceof Long) {
return ((Number) sparkValue).longValue();
}

if (sparkValue instanceof Instant) {
Instant instant = (Instant) sparkValue;
long epochSeconds = instant.getEpochSecond();
if (epochSeconds == MIN_SECONDS) {
long us = Math.multiplyExact(epochSeconds + 1, MICROS_PER_SECOND);
return Math.addExact(
us, TimeUnit.NANOSECONDS.toMicros(instant.getNano()) - MICROS_PER_SECOND);
} else {
long us = Math.multiplyExact(epochSeconds, MICROS_PER_SECOND);
return Math.addExact(us, TimeUnit.NANOSECONDS.toMicros(instant.getNano()));
}
}

// need to return timestamp in epoch microseconds
java.sql.Timestamp timestamp = (java.sql.Timestamp) sparkValue;
long epochMillis = timestamp.getTime();
Expand All @@ -210,6 +230,12 @@ public static int sparkDateToBigQuery(Object sparkValue) {
if (sparkValue instanceof Number) {
return ((Number) sparkValue).intValue();
}

if (sparkValue instanceof LocalDate) {
LocalDate localDate = (LocalDate) sparkValue;
return Math.toIntExact(localDate.toEpochDay());
}

java.sql.Date sparkDate = (java.sql.Date) sparkValue;
return (int) sparkDate.toLocalDate().toEpochDay();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.util.TimeZone;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
Expand Down Expand Up @@ -150,4 +155,23 @@ public void testExtractJobLabels_with_labels() {
assertThat(labels).containsEntry("dataproc_job_id", "d8f27392957446dbbd7dc28df568e4eb");
assertThat(labels).containsEntry("dataproc_job_uuid", "df379ef3-eeda-3234-8941-e1a36a1959a3");
}

@Test
public void testSparkDateToBigQuery() {
assertThat(SparkBigQueryUtil.sparkDateToBigQuery(16929L)).isEqualTo(16929L);
assertThat(SparkBigQueryUtil.sparkDateToBigQuery(Date.valueOf("2016-05-08"))).isEqualTo(16929);
assertThat(SparkBigQueryUtil.sparkDateToBigQuery(LocalDate.of(2016, 5, 8))).isEqualTo(16929);
}

@Test
public void testSparkTimestampToBigQuery() {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
assertThat(SparkBigQueryUtil.sparkTimestampToBigQuery(10L)).isEqualTo(10L);
assertThat(
SparkBigQueryUtil.sparkTimestampToBigQuery(Timestamp.valueOf("2016-05-08 00:00:01.01")))
.isEqualTo(1462665601010000L);
assertThat(
SparkBigQueryUtil.sparkTimestampToBigQuery(Instant.parse("2016-05-08T00:00:01.010Z")))
.isEqualTo(1462665601010000L);
}
}

0 comments on commit 14fa9b8

Please sign in to comment.