Skip to content

Commit

Permalink
[CELEBORN-1266] Improve log of current failed workers for WorkerStatu…
Browse files Browse the repository at this point in the history
…sTracker

### What changes were proposed in this pull request?

Improve the log of current failed workers in `WorkerStatusTracker#recordWorkerFailure` and `WorkerStatusTracker#handleHeartbeatResponse`.

### Why are the changes needed?

The log of current failed workers emitted by `recordWorkerFailure` and `handleHeartbeatResponse` of `WorkerStatusTracker` should be improved. In addition, the current failed workers should be logged at the warn level.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes apache#2290 from SteNicholas/CELEBORN-1266.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
SteNicholas committed Feb 18, 2024
1 parent 67d8236 commit 3f5b1de
Showing 1 changed file with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,15 @@ class WorkerStatusTracker(

def recordWorkerFailure(failures: ShuffleFailedWorkers): Unit = {
if (!failures.isEmpty) {
val failedWorker = new ShuffleFailedWorkers(failures)
val failedWorkerMsg = failedWorker.asScala.map { case (worker, (status, time)) =>
val failedWorkers = new ShuffleFailedWorkers(failures)
val failedWorkersMsg = failedWorkers.asScala.map { case (worker, (status, time)) =>
s"${worker.readableAddress()} ${status.name()} ${Utils.formatTimestamp(time)}"
}.mkString("\n")
val excludedWorkerMsg = excludedWorkers.asScala.map { case (worker, (status, time)) =>
s"${worker.readableAddress()} ${status.name()} ${Utils.formatTimestamp(time)}"
}.mkString("\n")
val shuttingDownMsg = shuttingWorkers.asScala.map(_.readableAddress()).mkString("\n")
logInfo(
logWarning(
s"""
|Reporting failed worker:
|$failedWorkerMsg
|Current excluded worker:
|$excludedWorkerMsg
|Current shutting down worker:
|$shuttingDownMsg""".stripMargin)
failedWorker.asScala.foreach {
|Reporting failed workers:
|$failedWorkersMsg$currentFailedWorkers""".stripMargin)
failedWorkers.asScala.foreach {
case (worker, (StatusCode.WORKER_SHUTDOWN, _)) =>
shuttingWorkers.add(worker)
case (worker, (statusCode, registerTime)) if !excludedWorkers.containsKey(worker) =>
Expand Down Expand Up @@ -213,16 +205,41 @@ class WorkerStatusTracker(
}
}
if (statusChanged) {
logWarning("Worker status changed from application heartbeat response")
logInfo(
s"""
|Current excluded workers:
|${excludedWorkers.asScala.mkString("\n")}
|
|Current shutting down workers:
|${shuttingWorkers.asScala.map(_.readableAddress()).mkString("\n")}
|""".stripMargin)
logWarning(
s"Worker status changed from application heartbeat response.$currentFailedWorkers")
}
}
}

/**
 * Renders the currently tracked failed workers as a multi-line log fragment.
 *
 * Up to three sections are produced, in this order: excluded workers (entries of
 * `excludedWorkers` recorded with `StatusCode.WORKER_EXCLUDED`), unknown workers (entries
 * recorded with `StatusCode.WORKER_UNKNOWN`) and shutdown workers (all of `shuttingWorkers`).
 * Entries of `excludedWorkers` recorded with any other status are not rendered.
 *
 * Every section starts with a line break so the result can be appended directly to a
 * preceding message.
 *
 * @return the concatenated sections, or an empty string when no section applies
 */
private def currentFailedWorkers: String = {
  // One "<address> <timestamp>" line per worker, keyed by the status it was recorded with.
  val linesByStatus = excludedWorkers.asScala
    .groupBy { case (_, (status, _)) => status }
    .map { case (status, entries) =>
      val lines = entries.map { case (worker, (_, time)) =>
        s"${worker.readableAddress()} ${Utils.formatTimestamp(time)}"
      }
      status -> lines.mkString("\n")
    }
  val shuttingLines = shuttingWorkers.asScala.map(_.readableAddress()).mkString("\n")

  val excludedSection = linesByStatus.get(StatusCode.WORKER_EXCLUDED).map { lines =>
    s"""
       |Current excluded workers:
       |$lines""".stripMargin
  }
  val unknownSection = linesByStatus.get(StatusCode.WORKER_UNKNOWN).map { lines =>
    s"""
       |Current unknown workers:
       |$lines""".stripMargin
  }
  // The shutdown section is dropped entirely when there is nothing to list.
  val shutdownSection =
    if (shuttingLines.nonEmpty) {
      Some(
        s"""
           |Current shutdown workers:
           |$shuttingLines""".stripMargin)
    } else {
      None
    }

  // Absent sections vanish; present ones are joined without any separator.
  Seq(excludedSection, unknownSection, shutdownSection).flatten.mkString
}
}

0 comments on commit 3f5b1de

Please sign in to comment.