diff --git a/flint/docs/index.md b/flint/docs/index.md index 81761d63d4..0a6453d999 100644 --- a/flint/docs/index.md +++ b/flint/docs/index.md @@ -219,6 +219,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i IMMEDIATE(true), WAIT_UNTIL(wait_for)] - `spark.datasource.flint.read.scroll_size`: default value is 100. - `spark.flint.optimizer.enabled`: default is true. +- `spark.flint.index.hybridscan.enabled`: default is false. #### Data Type Mapping diff --git a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 383b9d63ba..75f71f3ead 100644 --- a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -19,7 +19,12 @@ statement ; skippingIndexStatement - : dropSkippingIndexStatement + : describeSkippingIndexStatement + | dropSkippingIndexStatement + ; + +describeSkippingIndexStatement + : (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier ; dropSkippingIndexStatement diff --git a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index e0c579e6f6..6836b7cacd 100644 --- a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -113,6 +113,8 @@ SKIPPING : 'SKIPPING'; SEMICOLON: ';'; +DESC: 'DESC'; +DESCRIBE: 'DESCRIBE'; DOT: '.'; DROP: 'DROP'; INDEX: 'INDEX'; diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index da171eb26e..efa89b1612 100644 --- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -92,6 +92,10 @@ object FlintSparkConf { val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled") .doc("Enable Flint optimizer rule for query rewrite with Flint index") .createWithDefault("true") + + val HYBRID_SCAN_ENABLED = FlintConfig("spark.flint.index.hybridscan.enabled") + .doc("Enable hybrid scan to include latest source data not refreshed to index yet") + .createWithDefault("false") } /** @@ -114,6 +118,8 @@ class FlintSparkConf(properties: JMap[String, String]) extends Serializable { def isOptimizerEnabled: Boolean = OPTIMIZER_RULE_ENABLED.readFrom(reader).toBoolean + def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean + /** * spark.sql.session.timeZone */ diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index dab01eefc3..b7ea2fa609 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -6,9 +6,9 @@ package org.opensearch.flint.spark.skipping import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} +import 
org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.{And, Predicate} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule @@ -38,7 +38,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] val index = flint.describeIndex(indexName) if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) { val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex] - val indexPred = rewriteToIndexPredicate(skippingIndex, condition) + val indexFilter = rewriteToIndexFilter(skippingIndex, condition) /* * Replace original file index with Flint skipping file index: @@ -47,9 +47,9 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] * |- HadoopFsRelation * |- FileIndex <== replaced with FlintSkippingFileIndex */ - if (indexPred.isDefined) { - val filterByIndex = buildFilterIndexQuery(skippingIndex, indexPred.get) - val fileIndex = new FlintSparkSkippingFileIndex(location, filterByIndex) + if (indexFilter.isDefined) { + val indexScan = buildIndexScan(skippingIndex) + val fileIndex = FlintSparkSkippingFileIndex(location, indexScan, indexFilter.get) val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession) filter.copy(child = relation.copy(relation = indexRelation)) } else { @@ -60,7 +60,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] } } - private def rewriteToIndexPredicate( + private def rewriteToIndexFilter( index: FlintSparkSkippingIndex, condition: Predicate): Option[Predicate] = { @@ -71,15 +71,9 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] .reduceOption(And(_, _)) } - private def buildFilterIndexQuery( - index: FlintSparkSkippingIndex, - rewrittenPredicate: Predicate): DataFrame = { - - // Get file list based on the rewritten predicates on index data + private def buildIndexScan(index: FlintSparkSkippingIndex): DataFrame = { flint.spark.read .format(FLINT_DATASOURCE) .load(index.name()) - .filter(new Column(rewrittenPredicate)) - .select(FILE_PATH_COLUMN) } } diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 7df222d8ee..ac954e7bb3 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -6,10 +6,13 @@ package org.opensearch.flint.spark.skipping import org.apache.hadoop.fs.{FileStatus, Path} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate} import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.flint.config.FlintSparkConf +import org.apache.spark.sql.functions.isnull import org.apache.spark.sql.types.StructType /** @@ -17,22 +20,29 @@ import org.apache.spark.sql.types.StructType * * @param 
baseFileIndex * original file index - * @param filterByIndex - * pushed down filtering on index data + * @param indexScan + * query skipping index DF with pushed down filters */ -case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: DataFrame) +case class FlintSparkSkippingFileIndex( + baseFileIndex: FileIndex, + indexScan: DataFrame, + indexFilter: Predicate) extends FileIndex { override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + // TODO: make this listFile call only in hybrid scan mode + val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters) val selectedFiles = - filterByIndex.collect - .map(_.getString(0)) - .toSet + if (FlintSparkConf().isHybridScanEnabled) { + selectFilesFromIndexAndSource(partitions) + } else { + selectFilesFromIndexOnly() + } - val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters) + // Keep partition files present in selected file list above partitions .map(p => p.copy(files = p.files.filter(f => isFileNotSkipped(selectedFiles, f)))) .filter(p => p.files.nonEmpty) @@ -48,6 +58,44 @@ case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: override def partitionSchema: StructType = baseFileIndex.partitionSchema + /* + * Left join source partitions and index data to keep unknown source files: + * Express the logic in SQL: + * SELECT left.file_path + * FROM partitions AS left + * LEFT JOIN indexScan AS right + * ON left.file_path = right.file_path + * WHERE right.file_path IS NULL + * OR [indexFilter] + */ + private def selectFilesFromIndexAndSource(partitions: Seq[PartitionDirectory]): Set[String] = { + val sparkSession = indexScan.sparkSession + import sparkSession.implicits._ + + partitions + .flatMap(_.files.map(f => f.getPath.toUri.toString)) + .toDF(FILE_PATH_COLUMN) + .join(indexScan, Seq(FILE_PATH_COLUMN), "left") + .filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter)) + .select(FILE_PATH_COLUMN) + .collect() + .map(_.getString(0)) + .toSet + } + + /* + * Consider file paths in index data alone. In this case, index filter can be pushed down + * to index store. 
+ */ + private def selectFilesFromIndexOnly(): Set[String] = { + indexScan + .filter(new Column(indexFilter)) + .select(FILE_PATH_COLUMN) + .collect + .map(_.getString(0)) + .toSet + } + private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = { selectedFiles.contains(f.getPath.toUri.toString) } diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index aef18051ba..4560b60b65 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -5,26 +5,47 @@ package org.opensearch.flint.spark.sql +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext} +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.Command +import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint index statement. */ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] { - override def visitDropSkippingIndexStatement( - ctx: DropSkippingIndexStatementContext): Command = { - FlintSparkSqlCommand { flint => + override def visitDescribeSkippingIndexStatement( + ctx: DescribeSkippingIndexStatementContext): Command = { + val outputSchema = Seq( + AttributeReference("indexed_col_name", StringType, nullable = false)(), + AttributeReference("data_type", StringType, nullable = false)(), + AttributeReference("skip_type", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val indexName = getSkippingIndexName(ctx.tableName.getText) + flint + .describeIndex(indexName) + .map { case index: FlintSparkSkippingIndex => + index.indexedColumns.map(strategy => + Row(strategy.columnName, strategy.columnType, strategy.kind.toString)) + } + .getOrElse(Seq.empty) + } + } + + override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => val tableName = ctx.tableName.getText // TODO: handle schema name val indexName = getSkippingIndexName(tableName) flint.deleteIndex(indexName) Seq.empty } - } override def aggregateResult(aggregate: Command, nextResult: Command): Command = - if (nextResult != null) nextResult else aggregate; + if (nextResult != null) nextResult else aggregate } diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala index ca39a293c0..19b3698681 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql import org.opensearch.flint.spark.FlintSpark import 
org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.command.LeafRunnableCommand /** @@ -19,7 +20,12 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand * @param block * code block that triggers Flint core API */ -case class FlintSparkSqlCommand(block: FlintSpark => Seq[Row]) extends LeafRunnableCommand { +case class FlintSparkSqlCommand(override val output: Seq[Attribute] = Seq.empty)( + block: FlintSpark => Seq[Row]) + extends LeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = block(new FlintSpark(sparkSession)) + + // Lazy arguments are required to specify here + override protected def otherCopyArgs: Seq[AnyRef] = block :: Nil } diff --git a/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index 451c7e5dd6..ee8a52d968 100644 --- a/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -10,6 +10,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.flint.config.FlintConfigEntry +import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -34,4 +35,13 @@ trait FlintSuite extends SharedSparkSession { protected def setFlintSparkConf[T](config: FlintConfigEntry[T], value: Any): Unit = { spark.conf.set(config.key, value.toString) } + + protected def withHybridScanEnabled(block: => Unit): Unit = { + setFlintSparkConf(HYBRID_SCAN_ENABLED, "true") + try { + block + } finally { + setFlintSparkConf(HYBRID_SCAN_ENABLED, "false") + } + } } diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala new file mode 100644 index 0000000000..d2ef72158d --- /dev/null +++ b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.when +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock + +import org.apache.spark.FlintSuite +import org.apache.spark.sql.{Column, DataFrame, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ + +class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { + + /** Test source partition data. */ + private val partition1 = "partition-1" -> Seq("file-1", "file-2") + private val partition2 = "partition-2" -> Seq("file-3") + + /** Test index data schema. 
*/ + private val schema = Map((FILE_PATH_COLUMN, StringType), ("year", IntegerType)) + + test("should keep files returned from index") { + assertFlintFileIndex() + .withSourceFiles(Map(partition1)) + .withIndexData(schema, Seq(Row("file-1", 2023), Row("file-2", 2022))) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"))) + } + + test("should keep files of multiple partitions returned from index") { + assertFlintFileIndex() + .withSourceFiles(Map(partition1, partition2)) + .withIndexData(schema, Seq(Row("file-1", 2023), Row("file-2", 2022), Row("file-3", 2023))) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"), "partition-2" -> Seq("file-3"))) + } + + test("should skip unknown source files by default") { + assertFlintFileIndex() + .withSourceFiles(Map(partition1)) + .withIndexData( + schema, + Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet + ) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"))) + } + + test("should not skip unknown source files in hybrid-scan mode") { + withHybridScanEnabled { + assertFlintFileIndex() + .withSourceFiles(Map(partition1)) + .withIndexData( + schema, + Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet + ) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1", "file-2"))) + } + } + + test("should not skip unknown source files of multiple partitions in hybrid-scan mode") { + withHybridScanEnabled { + assertFlintFileIndex() + .withSourceFiles(Map(partition1, partition2)) + .withIndexData( + schema, + Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet + ) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles( + Map("partition-1" -> Seq("file-1", "file-2"), "partition-2" -> Seq("file-3"))) + } + } + + private def assertFlintFileIndex(): AssertionHelper = { + new AssertionHelper + } + + private class AssertionHelper { + private val baseFileIndex = mock[FileIndex] + private var indexScan: DataFrame = _ + private var indexFilter: Predicate = _ + + def withSourceFiles(partitions: Map[String, Seq[String]]): AssertionHelper = { + when(baseFileIndex.listFiles(any(), any())) + .thenReturn(mockPartitions(partitions)) + this + } + + def withIndexData(columns: Map[String, DataType], data: Seq[Row]): AssertionHelper = { + val schema = StructType(columns.map { case (colName, colType) => + StructField(colName, colType, nullable = false) + }.toSeq) + indexScan = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + this + } + + def withIndexFilter(pred: Column): AssertionHelper = { + indexFilter = pred.expr.asInstanceOf[Predicate] + this + } + + def shouldScanSourceFiles(partitions: Map[String, Seq[String]]): Unit = { + val fileIndex = FlintSparkSkippingFileIndex(baseFileIndex, indexScan, indexFilter) + fileIndex.listFiles(Seq.empty, Seq.empty) shouldBe mockPartitions(partitions) + } + + private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = { + partitions.map { case (partitionName, filePaths) => + val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path))) + PartitionDirectory(InternalRow(Literal(partitionName)), files) + }.toSeq + } + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 
af7605d956..068100e814 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -303,6 +303,32 @@ class FlintSparkSkippingIndexITSuite hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25)) } + test("should rewrite applicable query to scan latest source files in hybrid scan mode") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("month") + .create() + flint.refreshIndex(testIndex, FULL) + + // Generate a new source file which is not in index data + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + withHybridScanEnabled { + val query = sql(s""" + | SELECT address + | FROM $testTable + | WHERE month = 4 + |""".stripMargin) + + checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver"))) + } + } + test("should return empty if describe index not exist") { flint.describeIndex("non-exist") shouldBe empty } @@ -335,7 +361,7 @@ class FlintSparkSkippingIndexITSuite // Custom matcher to check if FlintSparkSkippingFileIndex has expected filter condition def hasIndexFilter(expect: Column): Matcher[FlintSparkSkippingFileIndex] = { Matcher { (fileIndex: FlintSparkSkippingFileIndex) => - val plan = fileIndex.filterByIndex.queryExecution.logical + val plan = fileIndex.indexScan.queryExecution.logical val hasExpectedFilter = plan.find { case Filter(actual, _) => actual.semanticEquals(expect.expr) diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala index fffe1fe295..f45be95325 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala @@ -12,7 +12,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT} class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite { @@ -25,16 +25,16 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite } /** Test table and index name */ - private val testTable = "test" + private val testTable = "flint_sql_test" private val testIndex = getSkippingIndexName(testTable) override def beforeAll(): Unit = { super.beforeAll() - sql(s""" | CREATE TABLE $testTable | ( - | name STRING + | name STRING, + | age INT | ) | USING CSV | OPTIONS ( @@ -48,13 +48,41 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite |""".stripMargin) } - test("drop skipping index") { + protected override def beforeEach(): Unit = { + super.beforeEach() flint .skippingIndex() .onTable(testTable) .addPartitions("year") + .addValueSet("name") + .addMinMax("age") .create() + } + + protected override def afterEach(): Unit = { + super.afterEach() + flint.deleteIndex(testIndex) + } + + test("describe skipping index") { + val result = sql(s"DESC SKIPPING INDEX ON $testTable") + checkAnswer( + result, + Seq( + Row("year", "int", "Partition"), + Row("name", "string", "ValuesSet"), + Row("age", "int", "MinMax"))) + } + + 
test("should return empty if no skipping index to describe") { + flint.deleteIndex(testIndex) + + val result = sql(s"DESC SKIPPING INDEX ON $testTable") + checkAnswer(result, Seq.empty) + } + + test("drop skipping index") { sql(s"DROP SKIPPING INDEX ON $testTable") flint.describeIndex(testIndex) shouldBe empty