Skip to content

Commit

Permalink
[VL] Add metric to indicate aggregation pushdown (#7729)
Browse files Browse the repository at this point in the history
 [VL] add metric to indicate aggregation pushdown

Related Velox code:
https://github.com/facebookincubator/velox/blob/main/velox/vector/LazyVector.cpp#L72


Co-authored-by: zhli1142015 <[email protected]>
  • Loading branch information
zhli1142015 and zhli1142015 authored Oct 31, 2024
1 parent b24a067 commit 0737113
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class Metrics implements IMetrics {
public long[] numDynamicFiltersAccepted;
public long[] numReplacedWithDynamicFilterRows;
public long[] flushRowCount;
public long[] loadedToValueHook;
public long[] skippedSplits;
public long[] processedSplits;
public long[] skippedStrides;
Expand Down Expand Up @@ -79,6 +80,7 @@ public Metrics(
long[] numDynamicFiltersAccepted,
long[] numReplacedWithDynamicFilterRows,
long[] flushRowCount,
long[] loadedToValueHook,
long[] scanTime,
long[] skippedSplits,
long[] processedSplits,
Expand Down Expand Up @@ -113,6 +115,7 @@ public Metrics(
this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.skippedSplits = skippedSplits;
this.processedSplits = processedSplits;
this.skippedStrides = skippedStrides;
Expand Down Expand Up @@ -152,6 +155,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
numDynamicFiltersAccepted[index],
numReplacedWithDynamicFilterRows[index],
flushRowCount[index],
loadedToValueHook[index],
scanTime[index],
skippedSplits[index],
processedSplits[index],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long numDynamicFiltersAccepted;
public long numReplacedWithDynamicFilterRows;
public long flushRowCount;
public long loadedToValueHook;
public long skippedSplits;
public long processedSplits;
public long skippedStrides;
Expand Down Expand Up @@ -74,6 +75,7 @@ public OperatorMetrics(
long numDynamicFiltersAccepted,
long numReplacedWithDynamicFilterRows,
long flushRowCount,
long loadedToValueHook,
long scanTime,
long skippedSplits,
long processedSplits,
Expand Down Expand Up @@ -107,6 +109,7 @@ public OperatorMetrics(
this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.skippedSplits = skippedSplits;
this.processedSplits = processedSplits;
this.skippedStrides = skippedStrides;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of spilled partitions"),
"aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of spilled files"),
"flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of flushed rows"),
"loadedToValueHook" -> SQLMetrics.createMetric(
sparkContext,
"number of pushdown aggregations"),
"rowConstructionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"rowConstruction cpu wall time count"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
val aggSpilledPartitions: SQLMetric = metrics("aggSpilledPartitions")
val aggSpilledFiles: SQLMetric = metrics("aggSpilledFiles")
val flushRowCount: SQLMetric = metrics("flushRowCount")
val loadedToValueHook: SQLMetric = metrics("loadedToValueHook")

val rowConstructionCpuCount: SQLMetric = metrics("rowConstructionCpuCount")
val rowConstructionWallNanos: SQLMetric = metrics("rowConstructionWallNanos")
Expand Down Expand Up @@ -76,6 +77,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
aggSpilledPartitions += aggMetrics.spilledPartitions
aggSpilledFiles += aggMetrics.spilledFiles
flushRowCount += aggMetrics.flushRowCount
loadedToValueHook += aggMetrics.loadedToValueHook
idx += 1

if (aggParams.rowConstructionNeeded) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ object MetricsUtil extends Logging {
var numDynamicFiltersAccepted: Long = 0
var numReplacedWithDynamicFilterRows: Long = 0
var flushRowCount: Long = 0
var loadedToValueHook: Long = 0
var scanTime: Long = 0
var skippedSplits: Long = 0
var processedSplits: Long = 0
Expand All @@ -141,6 +142,7 @@ object MetricsUtil extends Logging {
numDynamicFiltersAccepted += metrics.numDynamicFiltersAccepted
numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows
flushRowCount += metrics.flushRowCount
loadedToValueHook += metrics.loadedToValueHook
scanTime += metrics.scanTime
skippedSplits += metrics.skippedSplits
processedSplits += metrics.processedSplits
Expand Down Expand Up @@ -174,6 +176,7 @@ object MetricsUtil extends Logging {
numDynamicFiltersAccepted,
numReplacedWithDynamicFilterRows,
flushRowCount,
loadedToValueHook,
scanTime,
skippedSplits,
processedSplits,
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");

metricsBuilderConstructor = getMethodIdOrError(
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -499,6 +499,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kNumDynamicFiltersAccepted],
longArray[Metrics::kNumReplacedWithDynamicFilterRows],
longArray[Metrics::kFlushRowCount],
longArray[Metrics::kLoadedToValueHook],
longArray[Metrics::kScanTime],
longArray[Metrics::kSkippedSplits],
longArray[Metrics::kProcessedSplits],
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct Metrics {
kNumDynamicFiltersAccepted,
kNumReplacedWithDynamicFilterRows,
kFlushRowCount,
kLoadedToValueHook,
kScanTime,
kSkippedSplits,
kProcessedSplits,
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const std::string kDynamicFiltersProduced = "dynamicFiltersProduced";
const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows";
const std::string kFlushRowCount = "flushRowCount";
const std::string kLoadedToValueHook = "loadedToValueHook";
const std::string kTotalScanTime = "totalScanTime";
const std::string kSkippedSplits = "skippedSplits";
const std::string kProcessedSplits = "processedSplits";
Expand Down Expand Up @@ -389,6 +390,8 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] =
runtimeMetric("sum", second->customStats, kReplacedWithDynamicFilterRows);
metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount);
metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
runtimeMetric("sum", second->customStats, kLoadedToValueHook);
metrics_->get(Metrics::kScanTime)[metricIndex] = runtimeMetric("sum", second->customStats, kTotalScanTime);
metrics_->get(Metrics::kSkippedSplits)[metricIndex] = runtimeMetric("sum", second->customStats, kSkippedSplits);
metrics_->get(Metrics::kProcessedSplits)[metricIndex] =
Expand Down

0 comments on commit 0737113

Please sign in to comment.