[GLUTEN-3553][CH] Support bucket scan for ch backend
Support bucket scan for the CH backend, covering both the Parquet and MergeTree formats.

Closes #3553.
zzcclp committed Nov 7, 2023
1 parent a14baf3 commit 83ddc20
Showing 399 changed files with 7,499 additions and 101 deletions.
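For context, this is the kind of workload the change targets (illustrative only, not part of the commit; the table and column names are hypothetical, and a SparkSession `spark` with the Gluten ClickHouse backend enabled is assumed):

    // Write a Parquet table bucketed by `key` into 4 buckets.
    spark
      .range(0, 1000)
      .selectExpr("id", "id % 10 AS key")
      .write
      .format("parquet")
      .bucketBy(4, "key")
      .saveAsTable("bucketed_tbl")

    // With spark.sql.sources.bucketing.enabled=true (the default), the scan
    // below can now be planned as a bucketed scan and offloaded to the CH
    // backend, avoiding a shuffle before the aggregation.
    spark.sql("SELECT key, count(*) FROM bucketed_tbl GROUP BY key").show()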
19 changes: 0 additions & 19 deletions .github/workflows/velox_be.yml
@@ -17,25 +17,6 @@ name: Velox backend
 
 on:
   pull_request:
-    paths:
-      - '.github/**'
-      - 'pom.xml'
-      - 'backends-velox/**'
-      - 'gluten-celeborn/**'
-      - 'gluten-core/**'
-      - 'gluten-data/**'
-      - 'gluten-ut/**'
-      - 'shims/**'
-      - 'tools/gluten-it/**'
-      - 'tools/gluten-te/**'
-      - 'ep/build-arrow/**'
-      - 'ep/build-velox/**'
-      - 'cpp/*'
-      - 'cpp/CMake/**'
-      - 'cpp/velox/**'
-      - 'cpp/core/**'
-      - 'dev**'
-#      - 'substrait/substrait-spark/**'
 
 
 concurrency:
@@ -84,18 +84,30 @@ class CHTransformerApi extends TransformerApi with Logging {
       relation: HadoopFsRelation,
       selectedPartitions: Array[PartitionDirectory],
       output: Seq[Attribute],
+      bucketedScan: Boolean,
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean): Seq[InputPartition] = {
     if (relation.location.isInstanceOf[ClickHouseFileIndex]) {
       // Generate NativeMergeTreePartition for MergeTree
-      relation.location.asInstanceOf[ClickHouseFileIndex].partsPartitions
+      relation.location
+        .asInstanceOf[ClickHouseFileIndex]
+        .partsPartitions(
+          relation,
+          selectedPartitions,
+          output,
+          bucketedScan,
+          optionalBucketSet,
+          optionalNumCoalescedBuckets,
+          disableBucketedScan
+        )
     } else {
       // Generate FilePartition for Parquet
       CHInputPartitionsUtil(
         relation,
         selectedPartitions,
         output,
+        bucketedScan,
         optionalBucketSet,
         optionalNumCoalescedBuckets,
         disableBucketedScan).genInputPartitionSeq()
@@ -63,6 +63,7 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
 }
 
 object BatchScanMetricsUpdater {
-  val INCLUDING_PROCESSORS = Array("MergeTreeInOrder", "SubstraitFileSource")
-  val CH_PLAN_NODE_NAME = Array("MergeTreeInOrder", "SubstraitFileSource")
+  // in mergetree format, the processor name is `MergeTreeSelect(pool: XXX, algorithm: XXX)`
+  val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool", "SubstraitFileSource")
+  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
 }
@@ -67,6 +67,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
 }
 
 object FileSourceScanMetricsUpdater {
-  val INCLUDING_PROCESSORS = Array("MergeTreeInOrder", "SubstraitFileSource")
-  val CH_PLAN_NODE_NAME = Array("MergeTreeInOrder", "SubstraitFileSource")
+  // in mergetree format, the processor name is `MergeTreeSelect(pool: XXX, algorithm: XXX)`
+  val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool", "SubstraitFileSource")
+  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
 }
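Both metrics updaters get the same fix: in the MergeTree format the processor name now embeds runtime parameters, e.g. MergeTreeSelect(pool: XXX, algorithm: XXX), so MetricsUtil switches from exact lookup to prefix matching against these arrays (see the MetricsUtil hunk below). A minimal sketch of the difference, with an illustrative processor name:

    // Why exact matching no longer works for MergeTree processors:
    val includingProcessors = Array("MergeTreeSelect(pool", "SubstraitFileSource")
    val name = "MergeTreeSelect(pool: default, algorithm: Thread)" // illustrative

    val exactHit = includingProcessors.contains(name)              // false
    val prefixHit = includingProcessors.exists(name.startsWith(_)) // true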
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 
 import java.lang.{Long => JLong}
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 
@@ -143,11 +143,12 @@
       relMap
         .get(operatorIdx)
         .forEach(
-          _ => {
-            nodeMetricsList.add(metrics.metricsDataList.get(curMetricsIdx))
+          idx => {
+            nodeMetricsList.add(metrics.metricsDataList.get(idx.toInt))
             curMetricsIdx -= 1
           })
 
+      JCollections.reverse(nodeMetricsList)
       val operatorMetrics = new OperatorMetrics(
         nodeMetricsList,
         joinParamsMap.getOrDefault(operatorIdx, null),
@@ -195,10 +196,10 @@
     val processors = MetricsUtil.getAllProcessorList(metricData)
     processors.foreach(
       processor => {
-        if (!includingMetrics.contains(processor.name)) {
+        if (!includingMetrics.exists(processor.name.startsWith(_))) {
           extraTime += (processor.time / 1000L).toLong
         }
-        if (planNodeNames.contains(processor.name)) {
+        if (planNodeNames.exists(processor.name.startsWith(_))) {
           outputRows += processor.outputRows
           outputBytes += processor.outputBytes
           inputRows += processor.inputRows
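The forEach callback now consumes the index it is handed instead of reusing the shared curMetricsIdx cursor, and the collected list is reversed afterwards, presumably because relMap yields metrics indices in reverse plan-node order. A toy sketch of what the reverse accomplishes (data illustrative):

    import java.util.{ArrayList => JArrayList, Collections => JCollections}

    // Entries are appended back-to-front, so a final reverse restores
    // ascending plan-node order.
    val nodeMetricsList = new JArrayList[String]()
    Seq(2, 1, 0).foreach(idx => nodeMetricsList.add(s"metrics-$idx"))
    JCollections.reverse(nodeMetricsList) // [metrics-0, metrics-1, metrics-2]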
@@ -33,24 +33,12 @@ case class CHInputPartitionsUtil(
     relation: HadoopFsRelation,
     selectedPartitions: Array[PartitionDirectory],
     output: Seq[Attribute],
+    bucketedScan: Boolean,
     optionalBucketSet: Option[BitSet],
     optionalNumCoalescedBuckets: Option[Int],
     disableBucketedScan: Boolean)
   extends Logging {
 
-  private val bucketedScan: Boolean = {
-    if (
-      relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
-      && !disableBucketedScan
-    ) {
-      val spec = relation.bucketSpec.get
-      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
-      bucketColumns.size == spec.bucketColumnNames.size
-    } else {
-      false
-    }
-  }
-
   def genInputPartitionSeq(): Seq[InputPartition] = {
     if (bucketedScan) {
       genBucketedInputPartitionSeq()
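CHInputPartitionsUtil no longer decides for itself whether a scan is bucketed; the flag is threaded in from the caller, matching the CHTransformerApi signature above. A hedged sketch of the decision a caller is now expected to make, repackaging the deleted logic as a helper (the helper name and the enclosing transformer are not shown in this excerpt and are assumptions):

    // Hedged sketch (not from the commit): a bucketed scan requires bucketing
    // to be enabled, a bucket spec on the relation, the scan not explicitly
    // disabled, and every bucket column resolving to an output attribute.
    def isBucketedScan(
        relation: HadoopFsRelation,
        disableBucketedScan: Boolean,
        toAttribute: String => Option[Attribute]): Boolean = {
      if (
        relation.sparkSession.sessionState.conf.bucketingEnabled &&
        relation.bucketSpec.isDefined && !disableBucketedScan
      ) {
        val spec = relation.bucketSpec.get
        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
        bucketColumns.size == spec.bucketColumnNames.size
      } else false
    }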