Commit 74427e1: extract and plug-in SpannerTable+ScanBuilder

odeke-em committed Jul 28, 2023
1 parent b3aaa94 commit 74427e1
Showing 4 changed files with 75 additions and 42 deletions.
SpannerScanBuilder.java

@@ -14,15 +14,20 @@

 package com.google.cloud.spark.spanner;
 
+import com.google.cloud.spanner.connection.ConnectionOptions;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class SpannerScanBuilder implements ScanBuilder {
-  public SpannerScanBuilder() {}
+  private CaseInsensitiveStringMap opts;
+
+  public SpannerScanBuilder(CaseInsensitiveStringMap options) {
+    this.opts = options;
+  }
 
   @Override
   public Scan build() {
-    // TODO: Implement me
-    return new SpannerScanner();
+    return new SpannerScanner(this.opts);
   }
 }
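For orientation, a minimal sketch of how this new wiring would be exercised. Spark normally constructs the builder itself; the option keys mirror the properties.get(...) calls elsewhere in this commit, and all values are hypothetical. Note that build() plus readSchema() opens a real Spanner connection through SpannerTable, so this only runs against an actual instance:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.sql.connector.read.Scan;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    public class ScanBuilderSketch {
      public static void main(String[] args) {
        // Hypothetical connection options; keys match those read by SpannerTable.
        Map<String, String> raw = new HashMap<>();
        raw.put("projectId", "my-project");
        raw.put("instanceId", "my-instance");
        raw.put("databaseId", "my-database");
        raw.put("table", "my_table");

        // Spark hands options to a ScanBuilder as a CaseInsensitiveStringMap.
        SpannerScanBuilder builder = new SpannerScanBuilder(new CaseInsensitiveStringMap(raw));
        Scan scan = builder.build(); // a SpannerScanner carrying the same options
        System.out.println(scan.readSchema());
      }
    }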
SpannerScanner.java

@@ -14,15 +14,26 @@

 package com.google.cloud.spark.spanner;
 
+import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class SpannerScanner implements Scan {
-  public SpannerScanner() {}
+  private SpannerTable spannerTable;
+
+  public SpannerScanner(CaseInsensitiveStringMap opts) {
+    this.spannerTable = new SpannerTable(opts);
+  }
 
   @Override
   public StructType readSchema() {
-    // TODO: Implement me
-    return null;
+    return this.spannerTable.schema();
   }
+
+  @Override
+  public Batch toBatch() {
+    // TODO: Fill me in.
+    return null;
+  }
 }
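toBatch() is still a stub here. For context, a Batch in Spark's DataSource V2 API must describe its input partitions and how each one is read. The skeleton below shows only the shape of that interface; it is not the implementation this connector eventually needs, which would map Spanner BatchClient partitions to InputPartitions:

    import org.apache.spark.sql.connector.read.Batch;
    import org.apache.spark.sql.connector.read.InputPartition;
    import org.apache.spark.sql.connector.read.PartitionReaderFactory;

    // Illustrative skeleton only, not the connector's eventual implementation.
    public class SpannerBatchSketch implements Batch {
      @Override
      public InputPartition[] planInputPartitions() {
        // A real implementation would derive one InputPartition per Spanner batch partition.
        return new InputPartition[0];
      }

      @Override
      public PartitionReaderFactory createReaderFactory() {
        // A real implementation would return a factory producing PartitionReader<InternalRow>.
        throw new UnsupportedOperationException("TODO: Spanner PartitionReaderFactory");
      }
    }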
SpannerSpark.java

@@ -26,8 +26,6 @@
 import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
-import com.google.cloud.spanner.connection.Connection;
-import com.google.cloud.spanner.connection.ConnectionOptions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -42,15 +40,19 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SpannerSpark implements DataSourceRegister, TableProvider {
+public class SpannerSpark implements DataSourceRegister, TableProvider, SupportsRead {
   private BatchClient batchClient;
+  private Map<String, String> properties;
 
   public SpannerSpark(Map<String, String> properties) {
     SpannerOptions options = SpannerOptions.newBuilder().build();
@@ -61,6 +63,7 @@ public SpannerSpark(Map<String, String> properties) {
             options.getProjectId(),
             properties.get("instanceId"),
             properties.get("databaseId")));
+    this.properties = properties;
   }
 
   public Dataset<Row> execute(SparkSession spark, String sqlStmt) {
@@ -178,38 +181,16 @@ public boolean supportsExternalMetadata() {
   @Override
   public Table getTable(
       StructType schema, Transform[] partitioning, Map<String, String> properties) {
-    // 1. Create the DatabaseClient with the provided options:
-    //    "instanceId": <INSTANCE_ID>
-    //    "projectId": <PROJECT_ID>
-    //    "databaseId": <DATABASE_ID>
-    String spannerUri =
-        String.format(
-            "cloudspanner:/projects/%s/instances/%s/databases/%s",
-            properties.get("projectId"),
-            properties.get("instanceId"),
-            properties.get("databaseId"));
-
-    ConnectionOptions.Builder builder = ConnectionOptions.newBuilder().setUri(spannerUri);
-    String gcpCredsUrl = properties.get("credentials");
-    if (gcpCredsUrl != null) {
-      builder = builder.setCredentialsUrl(gcpCredsUrl);
-    }
-    ConnectionOptions opts = builder.build();
-
-    try (Connection conn = opts.getConnection()) {
-      String tableName = properties.get("table");
-      // 3. Run an information schema query to get the type definition of the table.
-      Statement stmt =
-          Statement.newBuilder(
-              "SELECT COLUMN_NAME, ORDINAL_POSITION, IS_NULLABLE, SPANNER_TYPE "
-                  + "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@tableName "
-                  + "ORDER BY ORDINAL_POSITION")
-              .bind("tableName")
-              .to(tableName)
-              .build();
-      try (final ResultSet rs = conn.executeQuery(stmt)) {
-        return new SpannerTable(tableName, rs);
-      }
-    }
+    return new SpannerTable(properties);
   }
 
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return new SpannerScanBuilder(options);
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return this.properties;
+  }
 }
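With getTable and newScanBuilder in place, the provider can in principle be driven from the Spark read path. A hedged usage sketch follows (hypothetical option values; the fully qualified class name avoids guessing this provider's unshown shortName()). Only schema resolution can succeed at this commit, since toBatch() still returns null, and DataSource V2 instantiates providers reflectively through a no-arg constructor, which this class does not yet declare:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Option keys match the properties.get(...) calls in SpannerTable.
        Dataset<Row> df =
            spark.read()
                .format("com.google.cloud.spark.spanner.SpannerSpark")
                .option("projectId", "my-project")
                .option("instanceId", "my-instance")
                .option("databaseId", "my-database")
                .option("table", "my_table")
                .load();
        df.printSchema(); // resolved via SpannerTable's INFORMATION_SCHEMA query
      }
    }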
SpannerTable.java

@@ -14,8 +14,12 @@

 package com.google.cloud.spark.spanner;
 
+import com.google.cloud.spanner.connection.Connection;
+import com.google.cloud.spanner.connection.ConnectionOptions;
 import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.connector.catalog.Table;
@@ -28,7 +32,39 @@ public class SpannerTable implements Table {
   private String tableName;
   private StructType tableSchema;
 
-  public SpannerTable(String tableName, ResultSet rs) {
+  public SpannerTable(Map<String, String> properties) {
+    String spannerUri =
+        String.format(
+            "cloudspanner:/projects/%s/instances/%s/databases/%s",
+            properties.get("projectId"),
+            properties.get("instanceId"),
+            properties.get("databaseId"));
+
+    ConnectionOptions.Builder builder = ConnectionOptions.newBuilder().setUri(spannerUri);
+    String gcpCredsUrl = properties.get("credentials");
+    if (gcpCredsUrl != null) {
+      builder = builder.setCredentialsUrl(gcpCredsUrl);
+    }
+    ConnectionOptions opts = builder.build();
+
+    try (Connection conn = opts.getConnection()) {
+      String tableName = properties.get("table");
+      // Run an information schema query to get the type definition of the table.
+      Statement stmt =
+          Statement.newBuilder(
+              "SELECT COLUMN_NAME, ORDINAL_POSITION, IS_NULLABLE, SPANNER_TYPE "
+                  + "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@tableName "
+                  + "ORDER BY ORDINAL_POSITION")
+              .bind("tableName")
+              .to(tableName)
+              .build();
+      try (final ResultSet rs = conn.executeQuery(stmt)) {
+        this.tableSchema = createSchema(tableName, rs);
+      }
+    }
+  }
+
+  public StructType createSchema(String tableName, ResultSet rs) {
     this.tableName = tableName;
 
     Integer columnSize = rs.getColumnCount();
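The rest of createSchema is truncated above. As a rough guide to what the mapping involves (folding each INFORMATION_SCHEMA.COLUMNS row into a Spark StructField), here is an illustrative sketch, not the actual implementation past the fold; the type table is deliberately minimal:

    import com.google.cloud.spanner.ResultSet;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class SchemaSketch {
      // Illustrative: one StructField per INFORMATION_SCHEMA.COLUMNS row.
      static StructType toStructType(ResultSet rs) {
        StructType schema = new StructType();
        while (rs.next()) {
          String name = rs.getString("COLUMN_NAME");
          boolean nullable = "YES".equals(rs.getString("IS_NULLABLE"));
          schema = schema.add(name, toSparkType(rs.getString("SPANNER_TYPE")), nullable);
        }
        return schema;
      }

      // Minimal SPANNER_TYPE mapping; real code must also handle ARRAY<...>, NUMERIC, etc.
      static DataType toSparkType(String spannerType) {
        if (spannerType.startsWith("STRING")) return DataTypes.StringType;
        if (spannerType.equals("INT64")) return DataTypes.LongType;
        if (spannerType.equals("FLOAT64")) return DataTypes.DoubleType;
        if (spannerType.equals("BOOL")) return DataTypes.BooleanType;
        if (spannerType.equals("TIMESTAMP")) return DataTypes.TimestampType;
        if (spannerType.equals("DATE")) return DataTypes.DateType;
        if (spannerType.startsWith("BYTES")) return DataTypes.BinaryType;
        return DataTypes.StringType; // fallback for unmapped types
      }
    }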
