Skip to content

Commit

Permalink
Datasource V2 data lake read support
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Nov 24, 2023
1 parent f456a7e commit 8a17fb5
Show file tree
Hide file tree
Showing 66 changed files with 906 additions and 389 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def needOutputSchemaForPlan(): Boolean = true

override def allowDecimalArithmetic: Boolean = !SQLConf.get.decimalOperationsAllowPrecisionLoss

override def requiredInputFilePaths(): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class CHFilterExecTransformer(condition: Expression, child: SparkPlan)
private def getLeftCondition: Expression = {
val scanFilters = child match {
// Get the filters including the manually pushed down ones.
case basicScanTransformer: BasicScanExecTransformer =>
case basicScanTransformer: BaseScanTransformer =>
basicScanTransformer.filterExprs()
// In ColumnarGuardRules, the child is still row-based. Need to get the original filters.
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class GlutenClickHouseDSV2Suite extends GlutenClickHouseTPCHAbstractSuite {
runTPCHQuery(1) {
df =>
val scanExec = df.queryExecution.executedPlan.collect {
case scanExec: BasicScanExecTransformer => scanExec
case scanExec: BaseScanTransformer => scanExec
}
assert(scanExec.size == 1)
}
Expand All @@ -48,7 +48,7 @@ class GlutenClickHouseDSV2Suite extends GlutenClickHouseTPCHAbstractSuite {
runTPCHQuery(2) {
df =>
val scanExec = df.queryExecution.executedPlan.collect {
case scanExec: BasicScanExecTransformer => scanExec
case scanExec: BaseScanTransformer => scanExec
}
assert(scanExec.size == 9)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class GlutenClickHouseFileFormatSuite
sql,
df => {
val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
}
Expand All @@ -189,7 +189,7 @@ class GlutenClickHouseFileFormatSuite
sql,
df => {
val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
}
Expand All @@ -211,7 +211,7 @@ class GlutenClickHouseFileFormatSuite
sql,
df => {
val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
},
Expand Down Expand Up @@ -449,7 +449,7 @@ class GlutenClickHouseFileFormatSuite

val result = df.collect()
val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}

assert(csvFileScan.size == 1)
Expand Down Expand Up @@ -483,7 +483,7 @@ class GlutenClickHouseFileFormatSuite
.toDF()

val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
assert(df.collect().length == 12)
Expand Down Expand Up @@ -745,7 +745,7 @@ class GlutenClickHouseFileFormatSuite
checkAnswer(df, expectedAnswer)

val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
}
Expand Down Expand Up @@ -780,7 +780,7 @@ class GlutenClickHouseFileFormatSuite
checkAnswer(df1, expectedAnswer)

var csvFileScan = collect(df1.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)

Expand All @@ -805,7 +805,7 @@ class GlutenClickHouseFileFormatSuite
checkAnswer(df2, expectedAnswer)

csvFileScan = collect(df2.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)

Expand Down Expand Up @@ -879,7 +879,7 @@ class GlutenClickHouseFileFormatSuite
checkAnswer(df, expectedAnswer)

val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
}
Expand Down Expand Up @@ -913,7 +913,7 @@ class GlutenClickHouseFileFormatSuite
checkAnswer(df, expectedAnswer)

val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
}
Expand Down Expand Up @@ -945,7 +945,7 @@ class GlutenClickHouseFileFormatSuite
checkAnswer(df, expectedAnswer)

val csvFileScan = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(csvFileScan.size == 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,17 @@ class GlutenClickHouseTPCDSParquetAQESuite
true,
df => {
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(foundDynamicPruningExpr.size == 2)
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.selectedPartitions
.size == 1823)
}
Expand All @@ -137,7 +137,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer if f.partitionFilters.exists {
case f: FileSourceScanTransformer if f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
} =>
Expand All @@ -160,7 +160,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer if f.partitionFilters.exists {
case f: FileSourceScanTransformer if f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
} =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
true,
df => {
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(foundDynamicPruningExpr.size == 2)
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.selectedPartitions
.size == 1823)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer if f.partitionFilters.exists {
case f: FileSourceScanTransformer if f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
} =>
Expand All @@ -164,7 +164,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer if f.partitionFilters.exists {
case f: FileSourceScanTransformer if f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
} =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,17 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
true,
df => {
val foundDynamicPruningExpr = df.queryExecution.executedPlan.collect {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(foundDynamicPruningExpr.size == 2)
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.selectedPartitions
.size == 1823)
}
Expand Down Expand Up @@ -138,7 +138,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
runTPCDSQuery("q21") {
df =>
val foundDynamicPruningExpr = df.queryExecution.executedPlan.find {
case f: FileSourceScanExecTransformer =>
case f: FileSourceScanTransformer =>
f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
Expand All @@ -162,7 +162,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
runTPCDSQuery("q21") {
df =>
val foundDynamicPruningExpr = df.queryExecution.executedPlan.find {
case f: FileSourceScanExecTransformer =>
case f: FileSourceScanTransformer =>
f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,17 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
true,
df => {
val foundDynamicPruningExpr = df.queryExecution.executedPlan.collect {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(foundDynamicPruningExpr.size == 2)
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.selectedPartitions
.size == 1823)
}
Expand All @@ -141,7 +141,7 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
runTPCDSQuery("q21") {
df =>
val foundDynamicPruningExpr = df.queryExecution.executedPlan.find {
case f: FileSourceScanExecTransformer =>
case f: FileSourceScanTransformer =>
f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
Expand All @@ -164,7 +164,7 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
runTPCDSQuery("q21") {
df =>
val foundDynamicPruningExpr = df.queryExecution.executedPlan.find {
case f: FileSourceScanExecTransformer =>
case f: FileSourceScanTransformer =>
f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,17 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
true,
df => {
val foundDynamicPruningExpr = df.queryExecution.executedPlan.collect {
case f: FileSourceScanExecTransformer => f
case f: FileSourceScanTransformer => f
}
assert(foundDynamicPruningExpr.size == 2)
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
assert(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.asInstanceOf[FileSourceScanTransformer]
.selectedPartitions
.size == 1823)
}
Expand All @@ -260,7 +260,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
runTPCDSQuery("q21") {
df =>
val foundDynamicPruningExpr = df.queryExecution.executedPlan.find {
case f: FileSourceScanExecTransformer =>
case f: FileSourceScanTransformer =>
f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
Expand All @@ -284,7 +284,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
runTPCDSQuery("q21") {
df =>
val foundDynamicPruningExpr = df.queryExecution.executedPlan.find {
case f: FileSourceScanExecTransformer =>
case f: FileSourceScanTransformer =>
f.partitionFilters.exists {
case _: DynamicPruningExpression => true
case _ => false
Expand Down
Loading

0 comments on commit 8a17fb5

Please sign in to comment.