From 73251374bd6e24d797ebaa0b309a75dace418cd0 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 27 Jul 2023 21:21:57 -0700 Subject: [PATCH] extract and plug-in SpannerTable+ScanBuilder --- .../spark/spanner/SpannerScanBuilder.java | 11 ++-- .../cloud/spark/spanner/SpannerScanner.java | 10 ++-- .../cloud/spark/spanner/SpannerSpark.java | 53 ++++++------------- .../cloud/spark/spanner/SpannerTable.java | 38 ++++++++++++- 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java index cd73448e..d89e53cb 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java @@ -14,15 +14,20 @@ package com.google.cloud.spark.spanner; +import com.google.cloud.spanner.connection.ConnectionOptions; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class SpannerScanBuilder implements ScanBuilder { - public SpannerScanBuilder() {} + private CaseInsensitiveStringMap opts; + + public SpannerScanBuilder(CaseInsensitiveStringMap options) { + this.opts = options; + } @Override public Scan build() { - // TODO: Implement me - return new SpannerScanner(); + return new SpannerScanner(this.opts); } } diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java index 75a3acf8..e3d51658 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanner.java @@ -16,13 +16,17 @@ import 
org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class SpannerScanner implements Scan { - public SpannerScanner() {} + private SpannerTable spannerTable; + + public SpannerScanner(CaseInsensitiveStringMap opts) { + this.spannerTable = new SpannerTable(opts); + } @Override public StructType readSchema() { - // TODO: Implement me - return null; + return this.spannerTable.schema(); } } diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerSpark.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerSpark.java index 1a47a6a9..e40b5497 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerSpark.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerSpark.java @@ -26,8 +26,6 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; -import com.google.cloud.spanner.connection.Connection; -import com.google.cloud.spanner.connection.ConnectionOptions; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -42,15 +40,19 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.SupportsRead; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableProvider; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.sources.DataSourceRegister; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; -public class SpannerSpark implements DataSourceRegister, TableProvider { +public class SpannerSpark implements 
DataSourceRegister, TableProvider, SupportsRead { private BatchClient batchClient; + private Map properties; public SpannerSpark(Map properties) { SpannerOptions options = SpannerOptions.newBuilder().build(); @@ -61,6 +63,7 @@ public SpannerSpark(Map properties) { options.getProjectId(), properties.get("instanceId"), properties.get("databaseId"))); + this.properties = properties; } public Dataset execute(SparkSession spark, String sqlStmt) { @@ -178,38 +181,16 @@ public boolean supportsExternalMetadata() { @Override public Table getTable( StructType schema, Transform[] partitioning, Map properties) { - // 1. Create the DatabaseClient with the provided options: - // "instanceId": - // "projectId": - // "databaseId": - String spannerUri = - String.format( - "cloudspanner:/projects/%s/instances/%s/databases/%s", - properties.get("projectId"), - properties.get("instanceId"), - properties.get("databaseId")); - - ConnectionOptions.Builder builder = ConnectionOptions.newBuilder().setUri(spannerUri); - String gcpCredsUrl = properties.get("credentials"); - if (gcpCredsUrl != null) { - builder = builder.setCredentialsUrl(gcpCredsUrl); - } - ConnectionOptions opts = builder.build(); - - try (Connection conn = opts.getConnection()) { - String tableName = properties.get("table"); - // 3. Run an information schema query to get the type definition of the table. 
- Statement stmt = - Statement.newBuilder( - "SELECT COLUMN_NAME, ORDINAL_POSITION, IS_NULLABLE, SPANNER_TYPE " - + "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@tableName " - + "ORDER BY ORDINAL_POSITION") - .bind("tableName") - .to(tableName) - .build(); - try (final ResultSet rs = conn.executeQuery(stmt)) { - return new SpannerTable(tableName, rs); - } - } + return new SpannerTable(properties); + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + return new SpannerScanBuilder(options); + } + + @Override + public Map properties() { + return this.properties; } } diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java index 3fef7676..fba0f098 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerTable.java @@ -14,8 +14,12 @@ package com.google.cloud.spark.spanner; +import com.google.cloud.spanner.connection.Connection; +import com.google.cloud.spanner.connection.ConnectionOptions; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; +import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.spark.sql.connector.catalog.Table; @@ -28,7 +32,39 @@ public class SpannerTable implements Table { private String tableName; private StructType tableSchema; - public SpannerTable(String tableName, ResultSet rs) { + public SpannerTable(Map properties) { + String spannerUri = + String.format( + "cloudspanner:/projects/%s/instances/%s/databases/%s", + properties.get("projectId"), + properties.get("instanceId"), + properties.get("databaseId")); + + ConnectionOptions.Builder builder = ConnectionOptions.newBuilder().setUri(spannerUri); + String gcpCredsUrl = 
properties.get("credentials"); + if (gcpCredsUrl != null) { + builder = builder.setCredentialsUrl(gcpCredsUrl); + } + ConnectionOptions opts = builder.build(); + + try (Connection conn = opts.getConnection()) { + String tableName = properties.get("table"); + // 3. Run an information schema query to get the type definition of the table. + Statement stmt = + Statement.newBuilder( + "SELECT COLUMN_NAME, ORDINAL_POSITION, IS_NULLABLE, SPANNER_TYPE " + + "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@tableName " + + "ORDER BY ORDINAL_POSITION") + .bind("tableName") + .to(tableName) + .build(); + try (final ResultSet rs = conn.executeQuery(stmt)) { + this.tableSchema = createSchema(tableName, rs); + } + } + } + + public StructType createSchema(String tableName, ResultSet rs) { this.tableName = tableName; Integer columnSize = rs.getColumnCount();