Skip to content

Commit

Permalink
[GLUTEN-7775][CORE] Make sure the softaffinity hash executor list is …
Browse files Browse the repository at this point in the history
…in order

Before this pr, the order of the soft affinity hash executor list is random, will lead to cache data missing after restarting the spark applications. Now make sure the softaffinity hash executor list is in order, after restarting the spark application, the computed file preferred location are the same as the ones before restarting.

Close #7775.
  • Loading branch information
zzcclp committed Nov 1, 2024
1 parent 527ab16 commit 1091c43
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE

// (execId, host) list
val fixedIdForExecutors = new mutable.ListBuffer[Option[(String, String)]]()
private val idForExecutors = new mutable.ListBuffer[(String, String)]()
var sortedIdForExecutors = new mutable.ListBuffer[(String, String)]()
// host list
val nodesExecutorsMap = new mutable.HashMap[String, mutable.HashSet[String]]()

Expand Down Expand Up @@ -96,27 +97,23 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
try {
// first, check whether the execId exists
if (
!fixedIdForExecutors.exists(
!idForExecutors.exists(
exec => {
exec.isDefined && exec.get._1.equals(execHostId._1)
exec._1.equals(execHostId._1)
})
) {
val executorsSet =
nodesExecutorsMap.getOrElseUpdate(execHostId._2, new mutable.HashSet[String]())
executorsSet.add(execHostId._1)
if (fixedIdForExecutors.exists(_.isEmpty)) {
// replace the executor which was removed
val replaceIdx = fixedIdForExecutors.indexWhere(_.isEmpty)
fixedIdForExecutors(replaceIdx) = Option(execHostId)
} else {
fixedIdForExecutors += Option(execHostId)
}
idForExecutors += execHostId
sortedIdForExecutors = idForExecutors.sortBy(_._2)
totalRegisteredExecutors.addAndGet(1)
}
logOnLevel(
GlutenConfig.getConf.softAffinityLogLevel,
s"After adding executor ${execHostId._1} on host ${execHostId._2}, " +
s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
s"idForExecutors is ${idForExecutors.mkString(",")}, " +
s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
s"actual executors count is ${totalRegisteredExecutors.intValue()}."
)
Expand All @@ -128,29 +125,27 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
def handleExecutorRemoved(execId: String): Unit = {
resourceRWLock.writeLock().lock()
try {
val execIdx = fixedIdForExecutors.indexWhere(
val execIdx = idForExecutors.indexWhere(
execHost => {
if (execHost.isDefined) {
execHost.get._1.equals(execId)
} else {
false
}
execHost._1.equals(execId)
})
if (execIdx != -1) {
val findedExecId = fixedIdForExecutors(execIdx)
fixedIdForExecutors(execIdx) = None
val nodeExecs = nodesExecutorsMap(findedExecId.get._2)
nodeExecs -= findedExecId.get._1
val findedExecId = idForExecutors(execIdx)
idForExecutors.remove(execIdx)
val nodeExecs = nodesExecutorsMap(findedExecId._2)
nodeExecs -= findedExecId._1
if (nodeExecs.isEmpty) {
// there is no executor on this host, remove
nodesExecutorsMap.remove(findedExecId.get._2)
nodesExecutorsMap.remove(findedExecId._2)
}
sortedIdForExecutors = idForExecutors.sortBy(_._2)
totalRegisteredExecutors.addAndGet(-1)
}
logOnLevel(
GlutenConfig.getConf.softAffinityLogLevel,
s"After removing executor $execId, " +
s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
s"idForExecutors is ${idForExecutors.mkString(",")}, " +
s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
s"actual executors count is ${totalRegisteredExecutors.intValue()}."
)
Expand Down Expand Up @@ -242,7 +237,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
if (nodesExecutorsMap.size < 1) {
Array.empty
} else {
softAffinityAllocation.allocateExecs(file, fixedIdForExecutors)
softAffinityAllocation.allocateExecs(file, sortedIdForExecutors)
}
} finally {
resourceRWLock.readLock().unlock()
Expand All @@ -252,11 +247,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
def askExecutors(f: FilePartition): Array[(String, String)] = {
resourceRWLock.readLock().lock()
try {
if (fixedIdForExecutors.size < 1) {
if (sortedIdForExecutors.size < 1) {
Array.empty
} else {
val result = getDuplicateReadingLocation(f)
result.filter(r => fixedIdForExecutors.exists(s => s.isDefined && s.get._1 == r._1)).toArray
result.filter(r => sortedIdForExecutors.exists(s => s._1 == r._1)).toArray
}
} finally {
resourceRWLock.readLock().unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,5 @@ trait SoftAffinityAllocationTrait {
)

/** allocate target executors for file */
def allocateExecs(
file: String,
candidates: ListBuffer[Option[(String, String)]]): Array[(String, String)]
def allocateExecs(file: String, candidates: ListBuffer[(String, String)]): Array[(String, String)]
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging {
/** allocate target executors for file */
override def allocateExecs(
file: String,
candidates: ListBuffer[Option[(String, String)]]): Array[(String, String)] = {
candidates: ListBuffer[(String, String)]): Array[(String, String)] = {
if (candidates.size < 1) {
Array.empty
} else {
Expand All @@ -37,15 +37,10 @@ class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging {
// TODO: try to use ConsistentHash
val mod = file.hashCode % candidatesSize
val c1 = if (mod < 0) (mod + candidatesSize) else mod
// check whether the executor with index c1 is down
if (candidates(c1).isDefined) {
resultSet.add(candidates(c1).get)
}
resultSet.add(candidates(c1))
for (i <- 1 until softAffinityReplicationNum) {
val c2 = (c1 + halfCandidatesSize + i) % candidatesSize
if (candidates(c2).isDefined) {
resultSet.add(candidates(c2).get)
}
resultSet.add(candidates(c2))
}
resultSet.toArray
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.test.SharedSparkSession

import scala.collection.mutable.ListBuffer

class SoftAffinitySuite extends QueryTest with SharedSparkSession with PredicateHelper {

override protected def sparkConf: SparkConf = super.sparkConf
Expand Down Expand Up @@ -87,7 +89,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
"fakePath1",
0,
200,
Array("host-4", "host-5")
Array("192.168.22.1", "host-5")
)
).toArray
)
Expand All @@ -99,9 +101,9 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)

val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
Set("host-1", "host-4", "host-5")
Set("host-1", "host-5", "192.168.22.1")
} else if (scalaVersion.startsWith("2.13")) {
Set("host-5", "host-4", "host-2")
Set("192.168.22.1", "host-5", "host-2")
}

assertResult(affinityResultSet) {
Expand Down Expand Up @@ -136,30 +138,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate

val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)

assertResult(Set("executor_host-2_2", "executor_host-1_0")) {
nativePartition.preferredLocations().toSet
}
}

def generateNativePartition4(): Unit = {
val partition = FilePartition(
0,
Seq(
SparkShimLoader.getSparkShims.generatePartitionedFile(
InternalRow.empty,
"fakePath_0",
0,
100)
).toArray
)

val locations = SoftAffinity.getFilePartitionLocations(
partition.files.map(_.filePath.toString),
partition.preferredLocations())

val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)

assertResult(Set("executor_host-1_1")) {
assertResult(Set("executor_192.168.22.1_1", "executor_10.1.1.33_6")) {
nativePartition.preferredLocations().toSet
}
}
Expand Down Expand Up @@ -206,11 +185,11 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val addEvent0 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"0",
new ExecutorInfo("host-1", 3, null))
new ExecutorInfo("192.168.22.1", 3, null))
val addEvent1 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"1",
new ExecutorInfo("host-1", 3, null))
new ExecutorInfo("192.168.22.1", 3, null))
val addEvent2 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"2",
Expand All @@ -234,7 +213,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val addEvent6 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"6",
new ExecutorInfo("host-4", 3, null))
new ExecutorInfo("10.1.1.33", 3, null))

val removedEvent0 = SparkListenerExecutorRemoved(System.currentTimeMillis(), "0", "")
val removedEvent1 = SparkListenerExecutorRemoved(System.currentTimeMillis(), "1", "")
Expand All @@ -256,23 +235,35 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
executorsListListener.onExecutorAdded(addEvent3_1)

assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
assert(SoftAffinityManager.fixedIdForExecutors.size == 4)
assert(SoftAffinityManager.sortedIdForExecutors.size == 4)

executorsListListener.onExecutorRemoved(removedEvent3)
// test removing executor repeatedly
executorsListListener.onExecutorRemoved(removedEvent3_1)

assert(SoftAffinityManager.nodesExecutorsMap.size == 2)
assert(SoftAffinityManager.fixedIdForExecutors.size == 4)
assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
assert(SoftAffinityManager.sortedIdForExecutors.size == 3)
assert(
SoftAffinityManager.sortedIdForExecutors.equals(
ListBuffer[(String, String)](("0", "192.168.22.1"), ("1", "192.168.22.1"), ("2", "host-2"))
))

executorsListListener.onExecutorAdded(addEvent4)
executorsListListener.onExecutorAdded(addEvent5)
executorsListListener.onExecutorAdded(addEvent6)

assert(SoftAffinityManager.nodesExecutorsMap.size == 4)
assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
assert(!SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
assert(SoftAffinityManager.sortedIdForExecutors.size == 6)
assert(
SoftAffinityManager.sortedIdForExecutors.equals(
ListBuffer[(String, String)](
("6", "10.1.1.33"),
("0", "192.168.22.1"),
("1", "192.168.22.1"),
("2", "host-2"),
("5", "host-2"),
("4", "host-3"))
))

// all target hosts exist in computing hosts list, return the original hosts list
generateNativePartition1()
Expand All @@ -286,19 +277,21 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
executorsListListener.onExecutorRemoved(removedEvent4)

assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
assert(
SoftAffinityManager.sortedIdForExecutors.equals(
ListBuffer[(String, String)](
("6", "10.1.1.33"),
("0", "192.168.22.1"),
("1", "192.168.22.1"),
("5", "host-2"))
))

executorsListListener.onExecutorRemoved(removedEvent2)
executorsListListener.onExecutorRemoved(removedEvent4)

assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))

// there are only one target host existing in computing hosts list,
// but the hash executors were removed, so return the original hosts list
generateNativePartition4()
assert(SoftAffinityManager.sortedIdForExecutors.size == 4)

executorsListListener.onExecutorRemoved(removedEvent0)
executorsListListener.onExecutorRemoved(removedEvent1)
Expand All @@ -307,8 +300,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
executorsListListener.onExecutorRemoved(removedEvent7)

assert(SoftAffinityManager.nodesExecutorsMap.isEmpty)
assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
assert(SoftAffinityManager.sortedIdForExecutors.isEmpty)

// all executors were removed, return the original hosts list
generateNativePartition5()
Expand Down

0 comments on commit 1091c43

Please sign in to comment.