-
Notifications
You must be signed in to change notification settings - Fork 134
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Provide hybrid scan setting for consistency requirement (#1819)
* Add UT for file index

Signed-off-by: Chen Dai <[email protected]>

* Add hybrid scan config and IT

Signed-off-by: Chen Dai <[email protected]>

* Implement select file logic for hybrid scan mode

Signed-off-by: Chen Dai <[email protected]>

* Add IT

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
- Loading branch information
Showing
7 changed files
with
235 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
126 changes: 126 additions & 0 deletions
126
...src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.skipping | ||
|
||
import org.apache.hadoop.fs.{FileStatus, Path} | ||
import org.mockito.ArgumentMatchers.any | ||
import org.mockito.Mockito.when | ||
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN | ||
import org.scalatest.matchers.should.Matchers | ||
import org.scalatestplus.mockito.MockitoSugar.mock | ||
|
||
import org.apache.spark.FlintSuite | ||
import org.apache.spark.sql.{Column, DataFrame, Row} | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} | ||
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} | ||
import org.apache.spark.sql.functions.col | ||
import org.apache.spark.sql.types._ | ||
|
||
/**
 * Unit tests for [[FlintSparkSkippingFileIndex]]: verifies which source files
 * survive skipping-index filtering, both in default mode (unindexed files are
 * skipped) and in hybrid-scan mode (unindexed files are always scanned).
 */
class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {

  /** Fixture partitions: partition name mapped to the file paths it contains. */
  private val partition1 = "partition-1" -> Seq("file-1", "file-2")
  private val partition2 = "partition-2" -> Seq("file-3")

  /** Schema of the skipping-index data used by all tests below. */
  private val schema = Map(FILE_PATH_COLUMN -> StringType, "year" -> IntegerType)

  test("should keep files returned from index") {
    assertFlintFileIndex()
      .withSourceFiles(Map(partition1))
      .withIndexData(schema, Seq(Row("file-1", 2023), Row("file-2", 2022)))
      .withIndexFilter(col("year") === 2023)
      .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1")))
  }

  test("should keep files of multiple partitions returned from index") {
    assertFlintFileIndex()
      .withSourceFiles(Map(partition1, partition2))
      .withIndexData(schema, Seq(Row("file-1", 2023), Row("file-2", 2022), Row("file-3", 2023)))
      .withIndexFilter(col("year") === 2023)
      .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"), "partition-2" -> Seq("file-3")))
  }

  test("should skip unknown source files by default") {
    assertFlintFileIndex()
      .withSourceFiles(Map(partition1))
      .withIndexData(
        schema,
        Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet
      )
      .withIndexFilter(col("year") === 2023)
      .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1")))
  }

  test("should not skip unknown source files in hybrid-scan mode") {
    withHybridScanEnabled {
      assertFlintFileIndex()
        .withSourceFiles(Map(partition1))
        .withIndexData(
          schema,
          Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet
        )
        .withIndexFilter(col("year") === 2023)
        .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1", "file-2")))
    }
  }

  test("should not skip unknown source files of multiple partitions in hybrid-scan mode") {
    withHybridScanEnabled {
      assertFlintFileIndex()
        .withSourceFiles(Map(partition1, partition2))
        .withIndexData(
          schema,
          Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet
        )
        .withIndexFilter(col("year") === 2023)
        .shouldScanSourceFiles(
          Map("partition-1" -> Seq("file-1", "file-2"), "partition-2" -> Seq("file-3")))
    }
  }

  /** Entry point for the fluent assertion DSL used by each test. */
  private def assertFlintFileIndex(): AssertionHelper = new AssertionHelper

  /**
   * Fluent builder that stubs the base file index, prepares index data and a
   * pushed-down filter, then asserts the files the skipping file index lists.
   */
  private class AssertionHelper {
    // Mocked delegate FileIndex representing the raw source table listing.
    private val baseFileIndex = mock[FileIndex]
    // Skipping-index content as a DataFrame; set by withIndexData.
    private var indexScan: DataFrame = _
    // Filter condition pushed down to the index; set by withIndexFilter.
    private var indexFilter: Predicate = _

    /** Stub the base index so listFiles returns the given partition layout. */
    def withSourceFiles(partitions: Map[String, Seq[String]]): AssertionHelper = {
      val stubbed = stubPartitions(partitions)
      when(baseFileIndex.listFiles(any(), any())).thenReturn(stubbed)
      this
    }

    /** Build the index DataFrame from the given column types and rows. */
    def withIndexData(columns: Map[String, DataType], data: Seq[Row]): AssertionHelper = {
      val fields = columns.toSeq.map { case (colName, colType) =>
        StructField(colName, colType, nullable = false)
      }
      indexScan = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(fields))
      this
    }

    /** Record the predicate that the file index should apply to index data. */
    def withIndexFilter(pred: Column): AssertionHelper = {
      indexFilter = pred.expr.asInstanceOf[Predicate]
      this
    }

    /** Assert that listing files yields exactly the expected partition layout. */
    def shouldScanSourceFiles(partitions: Map[String, Seq[String]]): Unit = {
      val fileIndex = FlintSparkSkippingFileIndex(baseFileIndex, indexScan, indexFilter)
      val actual = fileIndex.listFiles(Seq.empty, Seq.empty)
      actual shouldBe stubPartitions(partitions)
    }

    /** Convert a name->paths layout into Spark PartitionDirectory entries. */
    private def stubPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] =
      partitions.toSeq.map { case (partitionName, filePaths) =>
        val statuses = filePaths.map(p => new FileStatus(0, false, 0, 0, 0, new Path(p)))
        PartitionDirectory(InternalRow(Literal(partitionName)), statuses)
      }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters