
all: implement Spanner connector #59

Merged (80 commits), Sep 23, 2023
Conversation

odeke-em (Collaborator)

Allows us to build SQL from filters and required columns.

Updates #58
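
For intuition, here is a minimal sketch of the idea (the helper name and filter representation below are illustrative, not the connector's actual API):

```python
# Illustrative only: a tiny SQL builder from required columns and
# pushed-down filters, in the spirit of what buildScan does.
from typing import Iterable, Sequence, Tuple

def build_scan_sql(table: str, columns: Sequence[str],
                   filters: Iterable[Tuple[str, str, str]]) -> str:
    cols = ", ".join(columns) if columns else "*"
    sql = f"SELECT {cols} FROM {table}"
    clauses = [f"{col} {op} {value}" for col, op, value in filters]
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql

# build_scan_sql("exchange_rates",
#                ["created_at", "value", "base_cur"],
#                [("value", ">", "3720"), ("base_cur", "=", "'USD'")])
# -> "SELECT created_at, value, base_cur FROM exchange_rates
#     WHERE value > 3720 AND base_cur = 'USD'"
```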

@odeke-em force-pushed the implement-buildScan branch 4 times, most recently from 208e0ee to f44ba06 on August 25, 2023 17:37
@odeke-em force-pushed the implement-buildScan branch 5 times, most recently from 79dfb48 to a669e9c on September 5, 2023 17:22
@halio-g (Collaborator) commented Sep 6, 2023

/gcbrun

3 similar comments followed (two more on Sep 6, one on Sep 12, 2023).

@halio-g (Collaborator) left a review comment:

Can we add an integration test (with a Spark session) to test it? The last time I tested this range in a real Dataproc job, "2022-12-31" resulted in 19357, but really it should be 1672444800000000.

Sorry, I misunderstood your question; it should be 19357, since Spark receives the number of days since 1970-01-01 rather than microseconds.

@odeke-em changed the title from "ScanBuilder: implement buildScan to construct SQL" to "all: implement Spanner connector" on Sep 19, 2023
@odeke-em (Collaborator, Author)

> Can we add an integration test (with a Spark session) to test it? The last time I tested this range in a real Dataproc job, "2022-12-31" resulted in 19357, but really it should be 1672444800000000.
>
> Sorry, I misunderstood your question; it should be 19357, since Spark receives the number of days since 1970-01-01 rather than microseconds.

No worries, over the weekend I reverted to returning days since the epoch.
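
For the record, the two numbers above are consistent with each other: 19357 is the number of days from 1970-01-01 to 2022-12-31, and 1672444800000000 is that same midnight-UTC instant expressed in microseconds. A quick check:

```python
from datetime import date

EPOCH = date(1970, 1, 1)
d = date(2022, 12, 31)

days = (d - EPOCH).days          # 19357: days since the epoch, what Spark's DATE expects
micros = days * 86_400 * 10**6   # 1672444800000000: midnight UTC in microseconds
print(days, micros)
```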

@odeke-em (Collaborator, Author)

@davidrabinowitz @halio-g at the current commit, these are the results from running this Python program:

```python
from pyspark.sql import SparkSession

# Placeholder values, not part of the original snippet; substitute your own.
PROJECT_ID = "my-project"
SPANNER_INSTANCE = "my-instance"
SPANNER_DATABASE = "my-database"

def main():
    table = "exchange_rates"
    spark = SparkSession.builder.appName("Query Spanner").getOrCreate()
    df = spark.read.format('cloud-spanner') \
                .option("projectId", PROJECT_ID) \
                .option("instanceId", SPANNER_INSTANCE) \
                .option("databaseId", SPANNER_DATABASE) \
                .option("enableDataBoost", "true") \
                .option("table", table) \
                .load(table)
    df.printSchema()
    df.select("created_at", "value", "base_cur").filter((df["value"] > 3720) & (df["base_cur"] == "USD")).show()

if __name__ == "__main__":
    main()
```

which produces

```shell
root
 |-- id: string (nullable = false)
 |-- base_cur: string (nullable = false)
 |-- end_cur: string (nullable = false)
 |-- value: double (nullable = false)
 |-- data_src: string (nullable = false)
 |-- created_at: timestamp (nullable = false)
 |-- published_at: timestamp (nullable = true)

+--------------------+-----------+--------+
|          created_at|      value|base_cur|
+--------------------+-----------+--------+
|2023-08-30 20:56:...|3724.365901|     USD|
|2023-09-09 04:40:...|3738.384066|     USD|
|2023-09-07 05:44:...|3735.182945|     USD|
|2023-08-25 17:00:...|3730.606064|     USD|
|2023-09-15 10:08:...|3724.849835|     USD|
|2023-08-21 18:24:...|3727.353643|     USD|
|2023-08-20 07:52:...|3724.922281|     USD|
|2023-08-23 23:00:...|3738.342818|     USD|
|2023-08-14 10:00:...|3749.635375|     USD|
|2023-09-01 05:48:...|3730.472718|     USD|
|2023-09-15 21:40:...|3730.465981|     USD|
|2023-09-11 16:40:...|3723.400239|     USD|
|2023-09-13 13:16:...|3723.537745|     USD|
|2023-08-19 14:00:...|3724.922281|     USD|
|2023-09-06 02:56:...|3735.927727|     USD|
```
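
For reference, given the pruned columns and filters above, the connector would presumably push a query of roughly this shape down to Spanner: `SELECT created_at, value, base_cur FROM exchange_rates WHERE value > 3720 AND base_cur = 'USD'` (the exact SQL it emits is not shown in this thread).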

@halio-g (Collaborator) commented Sep 19, 2023

@odeke-em (Collaborator, Author)

@halio-g the presubmit check now passes; please take a look again!

@odeke-em dismissed davidrabinowitz's stale review on September 19, 2023 19:24

Addressed almost all the comments, please re-review.

Resolved review threads: README.md, examples/SpannerSpark.py (outdated)
```diff
@@ -12,6 +12,7 @@ CREATE TABLE games (
   winner STRING(36),
   created TIMESTAMP,
   finished TIMESTAMP,
+  max_date DATE,
 ) PRIMARY KEY(gameUUID);

 CREATE TABLE players (
```
A collaborator left a review comment:

This can be done in a follow-up PR.

Please create tables by type, for example:

```sql
CREATE TABLE IntTable (
  not_null_index INT64 NOT NULL,
  int_column INT64,
) PRIMARY KEY(not_null_index);

CREATE TABLE StringTable (
  not_null_index STRING(10) NOT NULL,
  max_string_col STRING(MAX),
  string_100_col STRING(100),
) PRIMARY KEY(not_null_index);

CREATE TABLE TimestampTable (
  not_null_index TIMESTAMP NOT NULL,
  timestamp_col TIMESTAMP,
) PRIMARY KEY(not_null_index);
```

Or create a single table with all the types, for example:

```sql
CREATE TABLE IntegrationTestTable (
  not_null_index INT64 NOT NULL,
  max_string_col STRING(MAX),
  string_100_col STRING(100),
  int_column INT64,
  timestamp_col TIMESTAMP,
) PRIMARY KEY(not_null_index);

CREATE TABLE StringIndexTable (
  string_index STRING(20) NOT NULL,
) PRIMARY KEY(string_index);
```
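
As a sketch of the requested Spark-session integration test against such a table (the table name and assertions below assume the IntegrationTestTable DDL above and the connector options from the example program earlier in this thread; none of this is taken from the actual test suite):

```python
from pyspark.sql import SparkSession

# Placeholder configuration, as in the example program above.
PROJECT_ID = "my-project"
SPANNER_INSTANCE = "my-instance"
SPANNER_DATABASE = "my-database"

def test_reads_typed_columns():
    spark = SparkSession.builder.appName("spanner-it").getOrCreate()
    df = (spark.read.format("cloud-spanner")
          .option("projectId", PROJECT_ID)
          .option("instanceId", SPANNER_INSTANCE)
          .option("databaseId", SPANNER_DATABASE)
          .option("table", "IntegrationTestTable")
          .load())
    # Check that Spanner types survive the trip into Spark's schema.
    fields = {f.name: f.dataType.simpleString() for f in df.schema.fields}
    assert fields["int_column"] == "bigint"        # INT64 -> LongType
    assert fields["timestamp_col"] == "timestamp"  # TIMESTAMP -> TimestampType
```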

Keep that functionality in SpannerScanner; while here, also add filters to SpannerScanner to allow filtering. This change implements a column selector to reduce the amount of data returned; the results were confirmed by a live query

```python
df.printSchema()
(df.select("created_at", "value", "base_cur")
   .filter((df["value"] > 3720) & (df["base_cur"] == "USD"))
   .show())
```

which produced

```shell
root
 |-- id: string (nullable = false)
 |-- base_cur: string (nullable = false)
 |-- end_cur: string (nullable = false)
 |-- value: double (nullable = false)
 |-- data_src: string (nullable = false)
 |-- created_at: timestamp (nullable = false)
 |-- published_at: timestamp (nullable = true)

+--------------------+-----------+--------+
|          created_at|      value|base_cur|
+--------------------+-----------+--------+
|2023-08-30 20:56:...|3724.365901|     USD|
|2023-09-09 04:40:...|3738.384066|     USD|
|2023-09-07 05:44:...|3735.182945|     USD|
|2023-08-25 17:00:...|3730.606064|     USD|
|2023-09-15 10:08:...|3724.849835|     USD|
|2023-08-21 18:24:...|3727.353643|     USD|
|2023-08-20 07:52:...|3724.922281|     USD|
|2023-08-23 23:00:...|3738.342818|     USD|
|2023-08-14 10:00:...|3749.635375|     USD|
|2023-09-01 05:48:...|3730.472718|     USD|
|2023-09-15 21:40:...|3730.465981|     USD|
|2023-09-11 16:40:...|3723.400239|     USD|
|2023-09-13 13:16:...|3723.537745|     USD|
|2023-08-19 14:00:...|3724.922281|     USD|
|2023-09-06 02:56:...|3735.927727|     USD|
|2023-09-11 13:04:...|3723.400239|     USD|
|2023-09-14 17:56:...|3724.849835|     USD|
|2023-09-07 20:44:...|3739.570874|     USD|
|2023-08-22 07:00:...|3729.171921|     USD|
|2023-08-22 08:20:...|3729.171921|     USD|
+--------------------+-----------+--------+
```
Helps identify the connector to Google Cloud so that usage metrics, health checks, quota updates, and optimizations can later be made trivially.
Allows Data Boost to be disabled; it is on by default given the point of this connector. However, there is something to be said for compatibility, so that by default most users who haven't enabled Data Boost can still use the connector; that is to be discussed later.

Fixes #68
While here, also fixed up the package path to be fully:

    package com.google.cloud.spark.spanner;

instead of the erroneous:

    package com.google.cloud.spark;
…ataboost

Hao reasoned that Data Boost should be opt-in because it is expensive for customers; that is a good reason to require it to be explicitly enabled by the customer.
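
A usage sketch of the opt-in (the option name is taken from the example program earlier in this thread; `spark` and the placeholder config are assumed from there as well):

```python
# Data Boost is opt-in because it incurs extra cost for customers;
# enable it explicitly, or omit the option to run without it.
df = (spark.read.format("cloud-spanner")
      .option("projectId", PROJECT_ID)
      .option("instanceId", SPANNER_INSTANCE)
      .option("databaseId", SPANNER_DATABASE)
      .option("enableDataBoost", "true")
      .option("table", "exchange_rates")
      .load())
```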
This fixes a long-standing shutdown failure due to unclosed Spanner objects.