From 944c92623ae07430d48b7569ae52ec14eefcfb0a Mon Sep 17 00:00:00 2001
From: Nicholas Jiang
Date: Sun, 4 Aug 2024 14:37:03 +0800
Subject: [PATCH] [GLUTEN-6695][CH] Introduce shuffleWallTime in CHMetricsApi
 to calculate the overall shuffle write time (#6696)

---
 .../apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala | 1 +
 .../spark/shuffle/CHCelebornColumnarShuffleWriter.scala     | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 85b298fa4835..0ff53e1c5817 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -222,6 +222,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to spill"),
       "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"),
       "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to prepare"),
+      "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"),
       "avgReadBatchNumRows" -> SQLMetrics
         .createAverageMetric(sparkContext, "avg read batch num rows"),
       "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index ae22a0890819..c7d7957c15b6 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -63,7 +63,9 @@ class CHCelebornColumnarShuffleWriter[K, V](
       } else {
         initShuffleWriter(cb)
         val col = cb.column(0).asInstanceOf[CHColumnVector]
+        val startTime = System.nanoTime()
         jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
+        dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(cb.numRows)
         dep.metrics("inputBatches").add(1)
         // This metric is important, AQE use it to decide if EliminateLimit
@@ -77,8 +79,10 @@
       return
     }
 
+    val startTime = System.nanoTime()
     splitResult = jniWrapper.stop(nativeShuffleWriter)
 
+    dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
     dep.metrics("splitTime").add(splitResult.getSplitTime)
     dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
     dep.metrics("serializeTime").add(splitResult.getSerializationTime)
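
Note on the measurement pattern (editorial illustration, not part of the patch): both hunks record System.nanoTime() before a native call and add the elapsed nanoseconds to the shuffleWallTime nano-timing metric afterwards, so the metric accumulates the wall-clock time spent inside jniWrapper.split and jniWrapper.stop. Below is a minimal, self-contained Scala sketch of that pattern; ShuffleWallTimeSketch, timed, and doNativeWork are hypothetical names introduced only for this example, while SQLMetrics.createNanoTimingMetric and the metric name mirror what the patch registers in CHMetricsApi.

// Editorial sketch (not code from the Gluten repo): demonstrates wrapping a
// call with System.nanoTime() and accumulating the elapsed time into a metric.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object ShuffleWallTimeSketch {

  // Measure the wall-clock time of any block and add it to a metric,
  // mirroring the startTime bookkeeping the patch places around the JNI calls.
  def timed[T](metric: SQLMetric)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally metric.add(System.nanoTime() - start)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("wall-time-sketch"))

    // Same kind of nano-timing metric the patch adds in CHMetricsApi.
    val shuffleWallTime = SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time")

    // Stand-in for the native work (jniWrapper.split / jniWrapper.stop).
    def doNativeWork(): Unit = Thread.sleep(5)

    timed(shuffleWallTime)(doNativeWork())
    println(s"accumulated shuffle wall time (ns): ${shuffleWallTime.value}")
    sc.stop()
  }
}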