From 3804b28848522e625ac9aa98e2f77cd56a80d814 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 7 Sep 2023 20:43:20 +0300 Subject: [PATCH] Use pushed down filters in planInputPartitions --- .../com/google/cloud/spark/spanner/SpannerScanBuilder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java index 67511d79..e029cce8 100644 --- a/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java +++ b/spark-3.1-spanner-lib/src/main/java/com/google/cloud/spark/spanner/SpannerScanBuilder.java @@ -83,6 +83,10 @@ public InputPartition[] planInputPartitions() { // TODO: Receive the columns and filters that were pushed down. BatchClientWithCloser batchClient = SpannerUtils.batchClientFromProperties(this.opts); String sqlStmt = "SELECT * FROM " + this.opts.get("table"); + Filter[] filters = this.pushedFilters(); + if (filters.length > 0) { + sqlStmt += " WHERE " + SparkFilterUtils.getCompiledFilter(true, filters); + } try (BatchReadOnlyTransaction txn = batchClient.batchClient.batchReadOnlyTransaction(TimestampBound.strong())) { String mapAsJSON = SpannerUtils.serializeMap(this.opts.asCaseSensitiveMap());