Skip to content

Commit

Permalink
Add Flint describe index SQL support (#1785)
Browse files Browse the repository at this point in the history
* Add empty impl and IT for desc skipping index

Signed-off-by: Chen Dai <[email protected]>

* Implement describe index and IT

Signed-off-by: Chen Dai <[email protected]>

* Add skip type in desc output

Signed-off-by: Chen Dai <[email protected]>

* Rename test table to avoid conflict

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Jul 5, 2023
1 parent 102c8c8 commit 91b2a06
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ statement
;

skippingIndexStatement
: dropSkippingIndexStatement
: describeSkippingIndexStatement
| dropSkippingIndexStatement
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier
;

dropSkippingIndexStatement
Expand Down
2 changes: 2 additions & 0 deletions flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ SKIPPING : 'SKIPPING';

SEMICOLON: ';';

DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DOT: '.';
DROP: 'DROP';
INDEX: 'INDEX';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,47 @@

package org.opensearch.flint.spark.sql

import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint index statement.
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {

override def visitDropSkippingIndexStatement(
ctx: DropSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand { flint =>
override def visitDescribeSkippingIndexStatement(
ctx: DescribeSkippingIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("indexed_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("skip_type", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val indexName = getSkippingIndexName(ctx.tableName.getText)
flint
.describeIndex(indexName)
.map { case index: FlintSparkSkippingIndex =>
index.indexedColumns.map(strategy =>
Row(strategy.columnName, strategy.columnType, strategy.kind.toString))
}
.getOrElse(Seq.empty)
}
}

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val tableName = ctx.tableName.getText // TODO: handle schema name
val indexName = getSkippingIndexName(tableName)
flint.deleteIndex(indexName)
Seq.empty
}
}

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate;
if (nextResult != null) nextResult else aggregate
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
import org.opensearch.flint.spark.FlintSpark

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command.LeafRunnableCommand

/**
Expand All @@ -19,7 +20,12 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
* @param block
* code block that triggers Flint core API
*/
case class FlintSparkSqlCommand(block: FlintSpark => Seq[Row]) extends LeafRunnableCommand {
case class FlintSparkSqlCommand(override val output: Seq[Attribute] = Seq.empty)(
block: FlintSpark => Seq[Row])
extends LeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = block(new FlintSpark(sparkSession))

// Lazy arguments are required to specify here
override protected def otherCopyArgs: Seq[AnyRef] = block :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT}

class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite {
Expand All @@ -25,16 +25,16 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
}

/** Test table and index name */
private val testTable = "test"
private val testTable = "flint_sql_test"
private val testIndex = getSkippingIndexName(testTable)

override def beforeAll(): Unit = {
super.beforeAll()

sql(s"""
| CREATE TABLE $testTable
| (
| name STRING
| name STRING,
| age INT
| )
| USING CSV
| OPTIONS (
Expand All @@ -48,13 +48,41 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
|""".stripMargin)
}

test("drop skipping index") {
protected override def beforeEach(): Unit = {
super.beforeEach()
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
.addValueSet("name")
.addMinMax("age")
.create()
}

protected override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testIndex)
}

test("describe skipping index") {
val result = sql(s"DESC SKIPPING INDEX ON $testTable")

checkAnswer(
result,
Seq(
Row("year", "int", "Partition"),
Row("name", "string", "ValuesSet"),
Row("age", "int", "MinMax")))
}

test("should return empty if no skipping index to describe") {
flint.deleteIndex(testIndex)

val result = sql(s"DESC SKIPPING INDEX ON $testTable")
checkAnswer(result, Seq.empty)
}

test("drop skipping index") {
sql(s"DROP SKIPPING INDEX ON $testTable")

flint.describeIndex(testIndex) shouldBe empty
Expand Down

0 comments on commit 91b2a06

Please sign in to comment.