From 2ed95b00248a54ff70f3c9b56f9fd274a0f61860 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 11 Aug 2023 19:25:42 +0300 Subject: [PATCH] spannerspark: initial code skeleton for reads (#31) This code implements the basis of the Cloud Spanner->Spark connector. Current functionality ensures that you can create a DataFrame and then invoke .printSchema() --- .github/workflows/test.yml | 5 + .mvn/wrapper/MavenWrapperDownloader.java | 117 +++++++ .mvn/wrapper/maven-wrapper.properties | 2 + README.md | 46 +++ examples/SpannerSpark.java | 39 +++ examples/SpannerSpark.py | 26 ++ mvnw | 310 ++++++++++++++++++ mvnw.cmd | 182 ++++++++++ spark-3.1-spanner-lib/pom.xml | 6 + .../cloud/spark/spanner/DefaultSource.java | 17 + .../spark/spanner/SpannerBaseRelation.java | 81 +++++ .../cloud/spark/spanner/SpannerRDD.java | 17 + .../cloud/spark/spanner/SpannerRelation.java | 83 +++++ .../spark/spanner/SpannerScanBuilder.java | 78 +++++ .../cloud/spark/spanner/SpannerScanner.java | 42 +++ .../cloud/spark/spanner/SpannerTable.java | 183 +++++++++++ .../cloud/spark/spanner/SpannerUtils.java | 138 ++++++++ .../spanner/Spark31SpannerTableProvider.java | 44 ++- .../cloud/spark/spanner/SpannerTableTest.java | 131 ++++++++ spark-3.1-spanner/pom.xml | 7 + 20 files changed, 1550 insertions(+), 4 deletions(-) create mode 100644 .mvn/wrapper/MavenWrapperDownloader.java create mode 100644 .mvn/wrapper/maven-wrapper.properties create mode 100644 examples/SpannerSpark.java create mode 100644 examples/SpannerSpark.py create mode 100755 mvnw create mode 100644 mvnw.cmd create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/DefaultSource.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerBaseRelation.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRDD.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRelation.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java create mode 100644 spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java create mode 100644 spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9970c76c..0f5775c9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,3 +26,8 @@ jobs: cache: 'maven' - name: Run the Maven verify phase run: mvn --batch-mode --update-snapshots verify + + env: + JOB_TYPE: test + SPANNER_EMULATOR_HOST: localhost:9010 + GOOGLE_CLOUD_PROJECT: spark-spanner-test diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 00000000..02d39d87 --- /dev/null +++ b/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,117 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... 
+                }
+            }
+        }
+        System.out.println("- Downloading from: " + url);
+
+        File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+        if(!outputFile.getParentFile().exists()) {
+            if(!outputFile.getParentFile().mkdirs()) {
+                System.out.println(
+                        "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+            }
+        }
+        System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+        try {
+            downloadFileFromURL(url, outputFile);
+            System.out.println("Done");
+            System.exit(0);
+        } catch (Throwable e) {
+            System.out.println("- Error downloading");
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+        if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
+            String username = System.getenv("MVNW_USERNAME");
+            char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
+            Authenticator.setDefault(new Authenticator() {
+                @Override
+                protected PasswordAuthentication getPasswordAuthentication() {
+                    return new PasswordAuthentication(username, password);
+                }
+            });
+        }
+        URL website = new URL(urlString);
+        ReadableByteChannel rbc;
+        rbc = Channels.newChannel(website.openStream());
+        FileOutputStream fos = new FileOutputStream(destination);
+        fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+        fos.close();
+        rbc.close();
+    }
+
+}
\ No newline at end of file
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 00000000..94f05e8f
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,2 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.4/apache-maven-3.8.4-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
\ No newline at end of file
diff --git a/README.md b/README.md
index df2ce007..5743e0d9 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,53 @@
 # spark-spanner-connector
+
+## Prerequisites
+- [ ] Ensure that [Databoost is enabled for your Cloud Spanner database](https://cloud.google.com/spanner/docs/databoost/databoost-applications#before_you_begin)
+
+## Running tests locally
+Please run the [Cloud Spanner Emulator](https://cloud.google.com/spanner/docs/emulator) locally.
+
+Before running `mvn`, please set the variable `SPANNER_EMULATOR_HOST=localhost:9010` (the emulator's default gRPC port).
+
+## Compiling the JAR
+To compile it against Spark 3.1, please run
+```shell
+./mvnw install -P3.1
+```
+
+## Submitting the job to Google Dataproc
+```shell
+gcloud dataproc jobs submit pyspark --cluster "spanner-spark-cluster" \
+    --jars=./spark-3.1-spanner/target/spark-3.1-spanner-0.0.1-SNAPSHOT.jar \
+    --region us-central1 examples/SpannerSpark.py
+```
+
+## Submitting the Spark job locally
+```shell
+./bin/spark-shell --jars \
+    local:/spark-spanner-connector/spark-3.1-spanner-lib/target/spark-3.1-spanner-lib-0.0.1-SNAPSHOT.jar
+```
+This pulls up a Spark shell, in which you can pass the options
+needed to create the connection to Cloud Spanner.
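+
+If you are testing against the local emulator, you can also pass the
+`emulatorHost` option (a sketch, assuming the emulator's default gRPC port of 9010):
+```scala
+val df = spark.read.format("cloud-spanner").option("table", "ATable").option("projectId", PROJECT_ID).option("instanceId", INSTANCE_ID).option("databaseId", DATABASE_ID).option("emulatorHost", "localhost:9010").load()
+```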
+
+### Variables needed
+
+Variable|Comments
+---|---
+projectId|The projectID containing the Cloud Spanner database
+instanceId|The instanceID of the Cloud Spanner database
+databaseId|The databaseID of the Cloud Spanner database
+table|The table in the Cloud Spanner database that you are reading from
+
+
+### Spark shell example
+```scala
+val df = spark.read.format("cloud-spanner").option("table", "ATable").option("projectId", PROJECT_ID).option("instanceId", INSTANCE_ID).option("databaseId", DATABASE_ID).load()
+df.show()
+df.printSchema()
+```
diff --git a/examples/SpannerSpark.java b/examples/SpannerSpark.java
new file mode 100644
index 00000000..645ac50f
--- /dev/null
+++ b/examples/SpannerSpark.java
@@ -0,0 +1,39 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner.examples;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class SpannerSpark {
+  public static void main(String[] args) {
+    SparkSession spark = SparkSession
+        .builder()
+        .appName("cloud spanner for census 2020")
+        .getOrCreate();
+
+
+    Dataset<Row> df = spark.read()
+        .format("cloud-spanner")
+        .option("table", "people")
+        .option("projectId", System.getenv("SPANNER_SPARK_PROJECT"))
+        .option("instanceId", System.getenv("SPANNER_SPARK_INSTANCE"))
+        .option("databaseId", System.getenv("SPANNER_SPARK_DATABASE"))
+        .load();
+    df.show();
+    df.printSchema();
+  }
+}
diff --git a/examples/SpannerSpark.py b/examples/SpannerSpark.py
new file mode 100644
index 00000000..ea9fdd13
--- /dev/null
+++ b/examples/SpannerSpark.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+
+# Copyright 2023 Google LLC. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from pyspark.sql import SparkSession
+
+def main():
+    spark = SparkSession.builder.appName("Query Spanner").getOrCreate()
+    df = (spark.read.format("cloud-spanner")
+          .option("projectId", os.environ["SPANNER_SPARK_PROJECT"])
+          .option("instanceId", os.environ["SPANNER_SPARK_INSTANCE"])
+          .option("databaseId", os.environ["SPANNER_SPARK_DATABASE"])
+          .option("table", "ATable")
+          .load())
+    df.show()
+    df.printSchema()
+
+if __name__ == '__main__':
+    main()
diff --git a/mvnw b/mvnw
new file mode 100755
index 00000000..e1a3c924
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! 
`expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... 
using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" \ No newline at end of file diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 00000000..8d4a214d --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. 
+@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! 
%%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Found %WRAPPER_JAR%
+    )
+) else (
+    if not "%MVNW_REPOURL%" == "" (
+        SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+    )
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Couldn't find %WRAPPER_JAR%, downloading it ...
+        echo Downloading from: %DOWNLOAD_URL%
+    )
+
+    powershell -Command "&{"^
+        "$webclient = new-object System.Net.WebClient;"^
+        "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+        "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+        "}"^
+        "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+        "}"
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Finished downloading %WRAPPER_JAR%
+    )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/spark-3.1-spanner-lib/pom.xml b/spark-3.1-spanner-lib/pom.xml
index ff1f16a2..8f57d5f1 100644
--- a/spark-3.1-spanner-lib/pom.xml
+++ b/spark-3.1-spanner-lib/pom.xml
@@ -28,5 +28,11 @@
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-spanner</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.13.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/DefaultSource.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/DefaultSource.java
new file mode 100644
index 00000000..47db4469
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/DefaultSource.java
@@ -0,0 +1,17 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+public class DefaultSource extends Spark31SpannerTableProvider {}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerBaseRelation.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerBaseRelation.java
new file mode 100644
index 00000000..b1fa4ab9
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerBaseRelation.java
@@ -0,0 +1,83 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+import java.util.HashMap;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.BaseRelation;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+
+public class SpannerBaseRelation extends BaseRelation {
+  private final SQLContext sqlContext;
+  private final SpannerScanner scan;
+  private final Dataset<Row> dataToWrite;
+
+  public SpannerBaseRelation(
+      SQLContext sqlContext,
+      SaveMode mode,
+      scala.collection.immutable.Map<String, String> parameters,
+      Dataset<Row> data) {
+    this.scan = new SpannerScanner(scalaToJavaMap(parameters));
+    this.sqlContext = sqlContext;
+    this.dataToWrite = data;
+  }
+
+  /*
+   * needConversion is a BaseRelation method that returns whether the objects
+   * in Row need conversion to the internal representation, for example:
+   * java.lang.Decimal to Decimal
+   * java.lang.String to UTF8String
+   */
+  @Override
+  public boolean needConversion() {
+    return false;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // TODO: Calculate the sizes from the schema's values.
+    // TODO: Perhaps we can quickly calculate those sizes
+    // as we construct the table from Cloud Spanner.
+    return -1;
+  }
+
+  @Override
+  public Filter[] unhandledFilters(Filter[] filters) {
+    // TODO: Implement me.
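+    // Until filter pushdown is implemented, the safe default (a sketch) is to
+    // report every filter as unhandled so Spark re-evaluates all of them itself: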
+    return filters;
+  }
+
+  @Override
+  public SQLContext sqlContext() {
+    return this.sqlContext;
+  }
+
+  @Override
+  public StructType schema() {
+    return this.scan.readSchema();
+  }
+
+  private java.util.Map<String, String> scalaToJavaMap(scala.collection.immutable.Map<String, String> map) {
+    java.util.Map<String, String> result = new HashMap<>();
+    map.foreach(entry -> result.put(entry._1(), entry._2()));
+    return result;
+  }
+}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRDD.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRDD.java
new file mode 100644
index 00000000..9198371f
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRDD.java
@@ -0,0 +1,17 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+public class SpannerRDD {}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRelation.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRelation.java
new file mode 100644
index 00000000..bbfd4c66
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerRelation.java
@@ -0,0 +1,84 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.InsertableRelation;
+import org.apache.spark.sql.sources.PrunedFilteredScan;
+import org.apache.spark.sql.sources.PrunedScan;
+import org.apache.spark.sql.sources.TableScan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SpannerRelation
+    implements InsertableRelation, TableScan, PrunedFilteredScan, PrunedScan {
+
+  private final SparkSession session;
+  private final String table;
+  private static final Logger log = LoggerFactory.getLogger(SpannerRelation.class);
+
+  public SpannerRelation(String table, SparkSession session) {
+    this.table = table;
+    this.session = session;
+  }
+
+  /*
+   * This method overrides PrunedFilteredScan.buildScan
+   */
+  @Override
+  public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
+    // TODO: Implement me.
+    // 1. Create a SQL query from the table by column names
+    //    and the conjunction of filters by an "AND".
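+    //    e.g. (hypothetical): SELECT A, B FROM ATable WHERE A > 10 AND B IS NOT NULL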
+    // SpannerTable st = new SpannerTable(this.session.getAll());
+
+    // 2. Query Cloud Spanner with the constructed SQL query.
+    // 3. Convert the results to RDD per ResultSet.Row, as we do in SpannerSpark.execute.
+    // TODO: https://github.com/GoogleCloudDataproc/spark-spanner-connector/issues/45
+    log.error("Unimplemented:: buildScan");
+    return null;
+  }
+
+  /*
+   * This method overrides PrunedScan.buildScan
+   */
+  @Override
+  public RDD<Row> buildScan(String[] requiredColumns) {
+    // TODO: Implement me.
+    return this.buildScan(requiredColumns, null);
+  }
+
+  /*
+   * This method overrides TableScan.buildScan
+   */
+  @Override
+  public RDD<Row> buildScan() {
+    return this.buildScan(null, null);
+  }
+
+  /*
+   * This method overrides InsertableRelation.insert
+   */
+  @Override
+  public void insert(Dataset<Row> data, boolean overwrite) {
+    // TODO: Implement me.
+    log.error("Unimplemented:: insert");
+  }
+}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java
new file mode 100644
index 00000000..7be02334
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java
@@ -0,0 +1,80 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/*
+ * Allows us to implement ScanBuilder.
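+ * Spark first offers filters via pushFilters, then calls build() to obtain
+ * the Scan whose Batch ultimately plans the input partitions.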
+ */
+public class SpannerScanBuilder implements Batch, ScanBuilder, SupportsPushDownFilters {
+  private CaseInsensitiveStringMap opts;
+  private Set<Filter> filters;
+  private SpannerScanner scanner;
+
+  public SpannerScanBuilder(CaseInsensitiveStringMap options) {
+    this.opts = options;
+    this.filters = new HashSet<>();
+  }
+
+  @Override
+  public Scan build() {
+    this.scanner = new SpannerScanner(this.opts);
+    return this.scanner;
+  }
+
+  public Batch toBatch() {
+    return this;
+  }
+
+  @Override
+  public Filter[] pushedFilters() {
+    return this.filters.toArray(new Filter[this.filters.size()]);
+  }
+
+  @Override
+  public Filter[] pushFilters(Filter[] filters) {
+    this.filters.addAll(Arrays.asList(filters));
+    return this.filters.toArray(new Filter[this.filters.size()]);
+  }
+
+  public StructType readSchema() {
+    return this.scanner.readSchema();
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new SpannerPartitionReaderFactory(this.opts);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    // TODO: Fill me in.
+    return null;
+  }
+}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java
new file mode 100644
index 00000000..1bf2c544
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java
@@ -0,0 +1,42 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+import java.util.Map;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.types.StructType;
+
+/*
+ * SpannerScanner implements Scan.
+ */
+public class SpannerScanner implements Scan {
+  private SpannerTable spannerTable;
+
+  public SpannerScanner(Map<String, String> opts) {
+    this.spannerTable = new SpannerTable(null, opts);
+  }
+
+  @Override
+  public StructType readSchema() {
+    return this.spannerTable.schema();
+  }
+
+  @Override
+  public Batch toBatch() {
+    // TODO: Fill me in.
+    return null;
+  }
+}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java
new file mode 100644
index 00000000..55162338
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java
@@ -0,0 +1,183 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.connection.Connection;
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * SpannerTable implements Table.
+ */
+public class SpannerTable implements Table, SupportsRead {
+  private String tableName;
+  private StructType tableSchema;
+  private static final ImmutableSet<TableCapability> tableCapabilities =
+      ImmutableSet.of(
+          TableCapability.BATCH_READ,
+          TableCapability.BATCH_WRITE,
+          TableCapability.CONTINUOUS_READ,
+          TableCapability.TRUNCATE);
+
+  private static final Logger log = LoggerFactory.getLogger(SpannerTable.class);
+
+  public SpannerTable(StructType providedSchema, Map<String, String> properties) {
+    // TODO: Use providedSchema in building the SpannerTable.
+    // 1. Create a connection to Cloud Spanner.
+    try (Connection conn = SpannerUtils.connectionFromProperties(properties)) {
+      String tableName = properties.get("table");
+      if (tableName == null) {
+        log.error("\"table\" is expected in properties");
+      }
+
+      // 2. Run an information schema query to get the type definition of the table.
+      Statement stmt =
+          Statement.newBuilder(
+                  "SELECT COLUMN_NAME, ORDINAL_POSITION, IS_NULLABLE='YES' AS ISNULLABLE, SPANNER_TYPE "
+                      + "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@tableName "
+                      + "ORDER BY ORDINAL_POSITION")
+              .bind("tableName")
+              .to(tableName)
+              .build();
+      try (final ResultSet rs = conn.executeQuery(stmt)) {
+        this.tableSchema = createSchema(tableName, rs);
+      }
+    }
+  }
+
+  public StructType createSchema(String tableName, ResultSet rs) {
+    this.tableName = tableName;
+
+    // Expecting resultset columns in the ordering:
+    // COLUMN_NAME, ORDINAL_POSITION, IS_NULLABLE, SPANNER_TYPE
+    // row1:
+    // ...
+    // rowN:
+    StructType schema = new StructType();
+    while (rs.next()) {
+      Struct row = rs.getCurrentRowAsStruct();
+      String columnName = row.getString(0);
+      // Integer ordinalPosition = column.getInt(1);
+      boolean isNullable = row.getBoolean(2);
+      DataType catalogType = SpannerTable.ofSpannerStrType(row.getString(3), isNullable);
+      schema = schema.add(columnName, catalogType, isNullable, "" /* No comments for the text */);
+    }
+    this.tableSchema = schema;
+    return schema;
+  }
+
+  public static DataType ofSpannerStrType(String spannerStrType, boolean isNullable) {
+    switch (spannerStrType) {
+      case "BOOL":
+        return DataTypes.BooleanType;
+
+      case "BYTES":
+        return DataTypes.createArrayType(DataTypes.ByteType);
+
+      case "DATE":
+        return DataTypes.DateType;
+
+      case "FLOAT64":
+        return DataTypes.DoubleType;
+
+      case "INT64":
+        return DataTypes.LongType;
+
+      case "JSON":
+        return DataTypes.createArrayType(DataTypes.StringType);
+
+      case "NUMERIC":
+        return numericToCatalogDataType;
+
+      case "STRING":
+        return DataTypes.StringType;
+
+      case "TIMESTAMP":
+        return DataTypes.TimestampType;
+    }
+
+    // STRING(MAX), STRING(10) are the correct type
+    // definitions for STRING in Cloud Spanner.
+    // Non-composite types like "STRING(N)" and "BYTES(N)"
+    // can immediately be returned by prefix matching.
+    if (spannerStrType.indexOf("STRING") == 0) {
+      return DataTypes.StringType;
+    }
+    if (spannerStrType.indexOf("BYTES") == 0) {
+      return DataTypes.createArrayType(DataTypes.ByteType);
+    }
+
+    if (spannerStrType.indexOf("ARRAY") == 0) {
+      // Sample argument: ARRAY<STRING(MAX)>
+      int si = spannerStrType.indexOf("<");
+      int se = spannerStrType.lastIndexOf(">");
+      String str = spannerStrType.substring(si + 1, se);
+      // At this point, str=STRING(MAX) for ARRAY<STRING(MAX)>, or
+      // str=ARRAY<STRING(MAX)> for a nested ARRAY<ARRAY<STRING(MAX)>>.
+      DataType innerDataType = SpannerTable.ofSpannerStrType(str, isNullable);
+      return DataTypes.createArrayType(innerDataType, isNullable);
+    }
+
+    // We are left with "STRUCT".
+    // TODO: Handle struct field traversal.
+    return DataTypes.NullType;
+  }
+
+  // Please see https://cloud.google.com/spanner/docs/storing-numeric-data#precision
+  // We are using (decimalPrecision=38, scale=9)
+  private static final DataType numericToCatalogDataType = DataTypes.createDecimalType(38, 9);
+
+  @Override
+  public StructType schema() {
+    return this.tableSchema;
+  }
+
+  /*
+   * Cloud Spanner tables support:
+   *   BATCH_READ
+   *   BATCH_WRITE
+   *   CONTINUOUS_READ
+   *   TRUNCATE
+   * as capabilities
+   */
+  @Override
+  public Set<TableCapability> capabilities() {
+    return tableCapabilities;
+  }
+
+  @Override
+  public String name() {
+    return this.tableName;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return new SpannerScanBuilder(options);
+  }
+}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java
new file mode 100644
index 00000000..fef77f40
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerUtils.java
@@ -0,0 +1,138 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.cloud.spark.spanner;
+
+import com.google.cloud.spanner.BatchClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Partition;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.connection.Connection;
+import com.google.cloud.spanner.connection.ConnectionOptions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+
+public class SpannerUtils {
+
+  public static Connection connectionFromProperties(Map<String, String> properties) {
+    String connUriPrefix = "cloudspanner:";
+    String emulatorHost = properties.get("emulatorHost");
+    if (emulatorHost != null) {
+      connUriPrefix = "cloudspanner://" + emulatorHost;
+    }
+
+    String spannerUri =
+        String.format(
+            connUriPrefix + "/projects/%s/instances/%s/databases/%s",
+            properties.get("projectId"),
+            properties.get("instanceId"),
+            properties.get("databaseId"));
+
+    ConnectionOptions.Builder builder = ConnectionOptions.newBuilder().setUri(spannerUri);
+    String gcpCredsUrl = properties.get("credentials");
+    if (gcpCredsUrl != null) {
+      builder = builder.setCredentialsUrl(gcpCredsUrl);
+    }
+    ConnectionOptions opts = builder.build();
+    return opts.getConnection();
+  }
+
+  public static BatchClient batchClientFromProperties(Map<String, String> properties) {
+    SpannerOptions options =
+        SpannerOptions.newBuilder().setProjectId(properties.get("projectId")).build();
+    Spanner spanner = options.getService();
+    return spanner.getBatchClient(
+        DatabaseId.of(
+            options.getProjectId(), properties.get("instanceId"), properties.get("databaseId")));
+  }
+
+  public static List<Row> resultSetToSparkRow(ResultSet rs) {
+    List<Row> rows = new ArrayList<>();
+    while (rs.next()) {
+      rows.add(resultSetRowToSparkRow(rs));
+    }
+    return rows;
+  }
+
+  public static Row resultSetRowToSparkRow(ResultSet rs) {
+    Struct spannerRow = rs.getCurrentRowAsStruct();
+    Integer columnCount = rs.getColumnCount();
+    List<Object> objects = new ArrayList<>();
+
+    for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+      String fieldTypeName = rs.getColumnType(columnIndex).toString();
+
+      switch (fieldTypeName) {
+        case "BOOL":
+          objects.add(spannerRow.getBoolean(columnIndex));
+          break;
+        case "DATE":
+          objects.add(spannerRow.getDate(columnIndex));
+          break;
+        case "FLOAT64":
+          objects.add(spannerRow.getDouble(columnIndex));
+          break;
+        case "INT64":
+          objects.add(spannerRow.getLong(columnIndex));
+          break;
+        case "JSON":
+          objects.add(spannerRow.getBytes(columnIndex));
+          break;
+        case "NUMERIC":
+          // TODO: Deal with the precision truncation since Cloud Spanner's precision
+          // has (precision=38, scale=9) while Apache Spark has (precision=N, scale=M)
+          objects.add(spannerRow.getBigDecimal(columnIndex));
+          break;
+        case "TIMESTAMP":
+          objects.add(spannerRow.getTimestamp(columnIndex));
+          break;
+        default: // "STRING(N)", "BYTES(N)", "ARRAY<...>", "STRUCT<...>"
+          if (fieldTypeName.indexOf("BYTES") == 0) {
+            objects.add(spannerRow.getBytes(columnIndex));
+          } else if (fieldTypeName.indexOf("STRING") == 0) {
+            objects.add(spannerRow.getString(columnIndex));
+          } else if (fieldTypeName.indexOf("ARRAY<BOOL>") == 0) {
+            objects.add(spannerRow.getBooleanArray(columnIndex));
+          } else if (fieldTypeName.indexOf("ARRAY<STRING") == 0) {
+            objects.add(spannerRow.getStringList(columnIndex));
+          }
+      }
+    }
+
+    return RowFactory.create(objects.toArray(new Object[0]));
+  }
+
+  public static Dataset<Row> datasetFromHashMap(SparkSession spark, Map<Partition, List<Row>> hm) {
+    List<Row> coalescedRows = new ArrayList<>();
+    hm.values().forEach(coalescedRows::addAll);
+    Encoder<Row> rowEncoder = Encoders.bean(Row.class);
+    return spark.createDataset(coalescedRows, rowEncoder);
+  }
+}
diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/Spark31SpannerTableProvider.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/Spark31SpannerTableProvider.java
index c277d66f..11bb5794 100644
--- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/Spark31SpannerTableProvider.java
+++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/Spark31SpannerTableProvider.java
@@ -16,33 +16,71 @@
 package com.google.cloud.spark.spanner;
 
 import java.util.Map;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.BaseRelation;
+import org.apache.spark.sql.sources.CreatableRelationProvider;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class Spark31SpannerTableProvider implements DataSourceRegister, TableProvider {
+public class Spark31SpannerTableProvider
+    implements CreatableRelationProvider, DataSourceRegister, TableProvider {
 
+  /*
+   * Infers the schema of the table identified by the given options.
+   */
   @Override
   public StructType inferSchema(CaseInsensitiveStringMap options) {
-    return null;
+    SpannerTable st = new SpannerTable(null, options);
+    return st.schema();
   }
 
+  /*
+   * Returns a Table instance with the specified table schema,
+   * partitioning and properties, to perform a read or write.
+   */
   @Override
   public Table getTable(
       StructType schema, Transform[] partitioning, Map<String, String> properties) {
-    return null;
+    return new SpannerTable(schema, properties);
   }
 
+  /*
+   * Returns true if the source can accept external table metadata
+   * when getting tables.
+   */
   @Override
   public boolean supportsExternalMetadata() {
     return false;
   }
 
+  /*
+   * Implements DataSourceRegister.shortName(). This method allows Spark to match
+   * the DataSource when spark.read(...).format("cloud-spanner") is invoked.
+   */
   @Override
   public String shortName() {
-    return "spanner";
+    return "cloud-spanner";
  }
+
+  /*
+   * Implements CreatableRelationProvider.createRelation, which saves
+   * a DataFrame to the destination using the data-source specific parameters.
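+   * Spark routes DataFrame writes (df.write.format("cloud-spanner").save())
+   * through this path; the returned relation does not yet perform the write.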
+   */
+  @Override
+  public BaseRelation createRelation(
+      SQLContext sqlContext,
+      SaveMode mode,
+      scala.collection.immutable.Map<String, String> parameters,
+      Dataset<Row> data) {
+    return new SpannerBaseRelation(sqlContext, mode, parameters, data);
+  }
 }
diff --git a/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java
new file mode 100644
index 00000000..39bf5c25
--- /dev/null
+++ b/spark-3.1-spanner-lib/src/test/java/com/google/cloud/spark/spanner/SpannerTableTest.java
@@ -0,0 +1,134 @@
+package com.google.cloud.spark.spanner;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.Instance;
+import com.google.cloud.spanner.InstanceAdminClient;
+import com.google.cloud.spanner.InstanceConfig;
+import com.google.cloud.spanner.InstanceId;
+import com.google.cloud.spanner.InstanceInfo;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
+import com.google.spanner.admin.instance.v1.CreateInstanceMetadata;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SpannerTableTest {
+
+  String databaseId = System.getenv("SPANNER_DATABASE_ID");
+  String instanceId = System.getenv("SPANNER_INSTANCE_ID");
+  String projectId = System.getenv("SPANNER_PROJECT_ID");
+  String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
+
+  DatabaseAdminClient dbAdminClient;
+  Spanner spanner;
+
+  @Before
+  public void setUp() throws Exception {
+    SpannerOptions opts = SpannerOptions.newBuilder().setEmulatorHost(emulatorHost).build();
+    spanner = opts.getService();
+    // 1. Create the Spanner instance.
+    // TODO: Skip this process if the instance already exists.
+    InstanceAdminClient insAdminClient = spanner.getInstanceAdminClient();
+    InstanceConfig config = insAdminClient.listInstanceConfigs().iterateAll().iterator().next();
+    InstanceInfo insInfo =
+        InstanceInfo.newBuilder(InstanceId.of(projectId, instanceId))
+            .setInstanceConfigId(config.getId())
+            .setNodeCount(1)
+            .setDisplayName("SparkSpanner Test")
+            .build();
+    OperationFuture<Instance, CreateInstanceMetadata> iop = insAdminClient.createInstance(insInfo);
+
+    try {
+      iop.get();
+    } catch (Exception e) {
+      if (!e.toString().contains("ALREADY_EXISTS")) {
+        throw e;
+      }
+    }
+
+    dbAdminClient = spanner.getDatabaseAdminClient();
+    // 2. Create the database.
+    // TODO: Skip this process if the database already exists.
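+    // One way to do that (sketch): probe with dbAdminClient.getDatabase(instanceId,
+    // databaseId) first and only call createDatabase when it throws NOT_FOUND.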
+    OperationFuture<Database, CreateDatabaseMetadata> dop =
+        dbAdminClient.createDatabase(
+            instanceId,
+            databaseId,
+            Arrays.asList(
+                "CREATE TABLE ATable (\n"
+                    + " A INT64 NOT NULL,\n"
+                    + " B STRING(100),\n"
+                    + " C BYTES(MAX),\n"
+                    + " D TIMESTAMP,\n"
+                    + " E NUMERIC,\n"
+                    + " F ARRAY<STRING(MAX)>\n"
+                    + ") PRIMARY KEY(A)"));
+    try {
+      dop.get();
+    } catch (Exception e) {
+      if (!e.toString().contains("ALREADY_EXISTS")) {
+        throw e;
+      }
+    }
+  }
+
+  @After
+  public void teardown() {
+    spanner.close();
+  }
+
+  private Map<String, String> connectionProperties() {
+    Map<String, String> props = new HashMap<>();
+    props.put("databaseId", databaseId);
+    props.put("instanceId", instanceId);
+    props.put("projectId", projectId);
+    if (emulatorHost != null) {
+      props.put("emulatorHost", emulatorHost);
+    }
+    props.put("table", "ATable");
+    return props;
+  }
+
+  @Test
+  public void createSchema() {
+    Map<String, String> props = connectionProperties();
+    SpannerTable st = new SpannerTable(null, props);
+    StructType actualSchema = st.schema();
+    StructType expectSchema =
+        new StructType(
+            Arrays.asList(
+                    new StructField("A", DataTypes.LongType, false, Metadata.empty()),
+                    new StructField("B", DataTypes.StringType, true, Metadata.empty()),
+                    new StructField(
+                        "C", DataTypes.createArrayType(DataTypes.ByteType, true), true, Metadata.empty()),
+                    new StructField("D", DataTypes.TimestampType, true, Metadata.empty()),
+                    new StructField("E", DataTypes.createDecimalType(38, 9), true, Metadata.empty()),
+                    new StructField(
+                        "F", DataTypes.createArrayType(DataTypes.StringType, true), true, Metadata.empty()))
+                .toArray(new StructField[0]));
+
+    // Object.equals fails for StructType with fields, so we first compare
+    // lengths, then fieldNames, then the simpleString.
+    assertEquals(expectSchema.length(), actualSchema.length());
+    assertArrayEquals(expectSchema.fieldNames(), actualSchema.fieldNames());
+    assertEquals(expectSchema.simpleString(), actualSchema.simpleString());
+  }
+}
diff --git a/spark-3.1-spanner/pom.xml b/spark-3.1-spanner/pom.xml
index fdfbb043..96cebd53 100644
--- a/spark-3.1-spanner/pom.xml
+++ b/spark-3.1-spanner/pom.xml
@@ -29,5 +29,12 @@
       <artifactId>spark-3.1-spanner-lib</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.12</artifactId>
+      <version>3.1.1</version>
+    </dependency>
+
   </dependencies>
 </project>