Skip to content

Commit

Permalink
[GLUTEN-3553][CH] Support bucket scan for ch backend (#3618)
Browse files Browse the repository at this point in the history
* [GLUTEN-3553][CH] Support bucket scan for ch backend

Support bucket scan for the CH backend, covering both the Parquet format and the MergeTree format.

Closes #3553.

* revert velox_be.yml
  • Loading branch information
zzcclp authored Nov 8, 2023
1 parent 4a72871 commit 3ec7bed
Show file tree
Hide file tree
Showing 398 changed files with 7,499 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,30 @@ class CHTransformerApi extends TransformerApi with Logging {
relation: HadoopFsRelation,
selectedPartitions: Array[PartitionDirectory],
output: Seq[Attribute],
bucketedScan: Boolean,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean): Seq[InputPartition] = {
if (relation.location.isInstanceOf[ClickHouseFileIndex]) {
// Generate NativeMergeTreePartition for MergeTree
relation.location.asInstanceOf[ClickHouseFileIndex].partsPartitions
relation.location
.asInstanceOf[ClickHouseFileIndex]
.partsPartitions(
relation,
selectedPartitions,
output,
bucketedScan,
optionalBucketSet,
optionalNumCoalescedBuckets,
disableBucketedScan
)
} else {
// Generate FilePartition for Parquet
CHInputPartitionsUtil(
relation,
selectedPartitions,
output,
bucketedScan,
optionalBucketSet,
optionalNumCoalescedBuckets,
disableBucketedScan).genInputPartitionSeq()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
}

object BatchScanMetricsUpdater {
  // Processor names whose time is counted as "real" scan work; anything not
  // matching these prefixes contributes to extra time instead.
  // In the MergeTree format the processor name is rendered as
  // `MergeTreeSelect(pool: XXX, algorithm: XXX)`, so only the stable prefix
  // `MergeTreeSelect(pool` is listed here and callers match with startsWith.
  val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool", "SubstraitFileSource")
  // Processor-name prefixes that correspond to this plan node, used to
  // attribute output/input row and byte metrics to the batch scan node.
  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
}

object FileSourceScanMetricsUpdater {
  // Processor names whose time is counted as "real" scan work; anything not
  // matching these prefixes contributes to extra time instead.
  // In the MergeTree format the processor name is rendered as
  // `MergeTreeSelect(pool: XXX, algorithm: XXX)`, so only the stable prefix
  // `MergeTreeSelect(pool` is listed here and callers match with startsWith.
  val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool", "SubstraitFileSource")
  // Processor-name prefixes that correspond to this plan node, used to
  // attribute output/input row and byte metrics to the file source scan node.
  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric

import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList, Map => JMap}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -143,11 +143,12 @@ object MetricsUtil extends Logging {
relMap
.get(operatorIdx)
.forEach(
_ => {
nodeMetricsList.add(metrics.metricsDataList.get(curMetricsIdx))
idx => {
nodeMetricsList.add(metrics.metricsDataList.get(idx.toInt))
curMetricsIdx -= 1
})

JCollections.reverse(nodeMetricsList)
val operatorMetrics = new OperatorMetrics(
nodeMetricsList,
joinParamsMap.getOrDefault(operatorIdx, null),
Expand Down Expand Up @@ -195,10 +196,10 @@ object MetricsUtil extends Logging {
val processors = MetricsUtil.getAllProcessorList(metricData)
processors.foreach(
processor => {
if (!includingMetrics.contains(processor.name)) {
if (!includingMetrics.exists(processor.name.startsWith(_))) {
extraTime += (processor.time / 1000L).toLong
}
if (planNodeNames.contains(processor.name)) {
if (planNodeNames.exists(processor.name.startsWith(_))) {
outputRows += processor.outputRows
outputBytes += processor.outputBytes
inputRows += processor.inputRows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,12 @@ case class CHInputPartitionsUtil(
relation: HadoopFsRelation,
selectedPartitions: Array[PartitionDirectory],
output: Seq[Attribute],
bucketedScan: Boolean,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean)
extends Logging {

private val bucketedScan: Boolean = {
if (
relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
&& !disableBucketedScan
) {
val spec = relation.bucketSpec.get
val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
bucketColumns.size == spec.bucketColumnNames.size
} else {
false
}
}

def genInputPartitionSeq(): Seq[InputPartition] = {
if (bucketedScan) {
genBucketedInputPartitionSeq()
Expand Down
Loading

0 comments on commit 3ec7bed

Please sign in to comment.