Skip to content

Commit

Permalink
[GLUTEN-3851][VL] Add remaining filter time metric (#3852)
Browse files Browse the repository at this point in the history
[VL] Add remaining filter time metric
  • Loading branch information
zhli1142015 authored Nov 28, 2023
1 parent 8518680 commit e0f197f
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class MetricsApiImpl extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups")
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time")
)

override def genBatchScanTransformerMetricsUpdater(
Expand Down Expand Up @@ -96,7 +99,10 @@ class MetricsApiImpl extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups")
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time")
)

override def genHiveTableScanTransformerMetricsUpdater(
Expand Down Expand Up @@ -132,7 +138,10 @@ class MetricsApiImpl extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups")
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time")
)

override def genFileSourceScanTransformerMetricsUpdater(
Expand Down
7 changes: 4 additions & 3 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {

metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lio/glutenproject/metrics/Metrics;");

metricsBuilderConstructor =
getMethodIdOrError(env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
metricsBuilderConstructor = getMethodIdOrError(
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");

serializedColumnarBatchIteratorClass =
createGlobalClassReferenceOrError(env, "Lio/glutenproject/vectorized/ColumnarBatchInIterator;");
Expand Down Expand Up @@ -485,7 +485,8 @@ JNIEXPORT jobject JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutItera
longArray[Metrics::kSkippedSplits],
longArray[Metrics::kProcessedSplits],
longArray[Metrics::kSkippedStrides],
longArray[Metrics::kProcessedStrides]);
longArray[Metrics::kProcessedStrides],
longArray[Metrics::kRemainingFilterTime]);

JNI_METHOD_END(nullptr)
}
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct Metrics {
kProcessedSplits,
kSkippedStrides,
kProcessedStrides,
kRemainingFilterTime,

// The end of enum items.
kEnd,
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const std::string kSkippedSplits = "skippedSplits";
const std::string kProcessedSplits = "processedSplits";
const std::string kSkippedStrides = "skippedStrides";
const std::string kProcessedStrides = "processedStrides";
const std::string kRemainingFilterTime = "totalRemainingFilterTime";

// others
const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
Expand Down Expand Up @@ -284,6 +285,8 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kSkippedStrides)[metricIndex] = runtimeMetric("sum", second->customStats, kSkippedStrides);
metrics_->get(Metrics::kProcessedStrides)[metricIndex] =
runtimeMetric("sum", second->customStats, kProcessedStrides);
metrics_->get(Metrics::kRemainingFilterTime)[metricIndex] =
runtimeMetric("sum", second->customStats, kRemainingFilterTime);
metricIndex += 1;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class Metrics implements IMetrics {
public long[] processedSplits;
public long[] skippedStrides;
public long[] processedStrides;
public long[] remainingFilterTime;
public SingleMetric singleMetric = new SingleMetric();

/** Create an instance for native metrics. */
Expand Down Expand Up @@ -73,7 +74,8 @@ public Metrics(
long[] skippedSplits,
long[] processedSplits,
long[] skippedStrides,
long[] processedStrides) {
long[] processedStrides,
long[] remainingFilterTime) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
this.inputBytes = inputBytes;
Expand All @@ -100,6 +102,7 @@ public Metrics(
this.processedSplits = processedSplits;
this.skippedStrides = skippedStrides;
this.processedStrides = processedStrides;
this.remainingFilterTime = remainingFilterTime;
}

public OperatorMetrics getOperatorMetrics(int index) {
Expand Down Expand Up @@ -132,7 +135,8 @@ public OperatorMetrics getOperatorMetrics(int index) {
skippedSplits[index],
processedSplits[index],
skippedStrides[index],
processedStrides[index]);
processedStrides[index],
remainingFilterTime[index]);
}

public SingleMetric getSingleMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long processedSplits;
public long skippedStrides;
public long processedStrides;
public long remainingFilterTime;

/** Create an instance for operator metrics. */
public OperatorMetrics(
Expand Down Expand Up @@ -69,7 +70,8 @@ public OperatorMetrics(
long skippedSplits,
long processedSplits,
long skippedStrides,
long processedStrides) {
long processedStrides,
long remainingFilterTime) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
this.inputBytes = inputBytes;
Expand All @@ -95,5 +97,6 @@ public OperatorMetrics(
this.processedSplits = processedSplits;
this.skippedStrides = skippedStrides;
this.processedStrides = processedStrides;
this.remainingFilterTime = remainingFilterTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
metrics("processedSplits") += operatorMetrics.processedSplits
metrics("skippedStrides") += operatorMetrics.skippedStrides
metrics("processedStrides") += operatorMetrics.processedStrides
metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val processedSplits: SQLMetric = metrics("processedSplits")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
Expand All @@ -68,6 +69,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
processedSplits += operatorMetrics.processedSplits
skippedStrides += operatorMetrics.skippedStrides
processedStrides += operatorMetrics.processedStrides
remainingFilterTime += operatorMetrics.remainingFilterTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
val processedSplits: SQLMetric = metrics("processedSplits")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
Expand All @@ -63,6 +64,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
processedSplits += operatorMetrics.processedSplits
skippedStrides += operatorMetrics.skippedStrides
processedStrides += operatorMetrics.processedStrides
remainingFilterTime += operatorMetrics.remainingFilterTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object MetricsUtil extends Logging {
var processedSplits: Long = 0
var skippedStrides: Long = 0
var processedStrides: Long = 0
var remainingFilterTime: Long = 0

val metricsIterator = operatorMetrics.iterator()
while (metricsIterator.hasNext) {
Expand All @@ -130,6 +131,7 @@ object MetricsUtil extends Logging {
processedSplits += metrics.processedSplits
skippedStrides += metrics.skippedStrides
processedStrides += metrics.processedStrides
remainingFilterTime += metrics.remainingFilterTime
}

new OperatorMetrics(
Expand Down Expand Up @@ -157,7 +159,8 @@ object MetricsUtil extends Logging {
skippedSplits,
processedSplits,
skippedStrides,
processedStrides
processedStrides,
remainingFilterTime
)
}

Expand Down

0 comments on commit e0f197f

Please sign in to comment.