Skip to content

Commit

Permalink
[YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic (#918)
Browse files Browse the repository at this point in the history
Closes: #918

Signed-off-by: Peter Bacsko <[email protected]>
  • Loading branch information
pbacsko committed Oct 16, 2024
1 parent e9b05eb commit 753e4f8
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 211 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
66 changes: 25 additions & 41 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,23 +365,26 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
podStatusBefore = string(oldPod.Status.Phase)
}

// conditions for allocate:
// 1. pod was previously assigned
// 2. pod is now assigned
// 3. pod is not in terminated state
// 4. pod references a known node
if oldPod == nil && utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
// conditions for allocate/update:
// 1. pod is now assigned
// 2. pod is not in terminated state
// 3. pod references a known node
if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
if ctx.schedulerCache.UpdatePod(pod) {
// pod was accepted by a real node
log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.AddOccupiedResource)
allocReq := common.CreateAllocationForForeignPod(pod)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil {
log.Log(log.ShimContext).Error("failed to add foreign allocation to the core",
zap.Error(err))
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for assigned orphaned pod",
log.Log(log.ShimContext).Info("skipping updating allocation for assigned orphaned pod",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
Expand All @@ -401,9 +404,13 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
// remove from the scheduler cache and create release request to remove foreign allocation from the core
ctx.schedulerCache.RemovePod(pod)
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod",
Expand Down Expand Up @@ -453,40 +460,17 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name))
return
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))
}

// conditions for release:
// 1. pod is already assigned to a node
// 2. pod was not in a terminal state before
// 3. pod references a known node
if !utils.IsPodTerminated(oldPod) {
if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", string(oldPod.Status.Phase)),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for removed orphaned pod",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
}
ctx.schedulerCache.RemovePod(pod)
}
log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
}

//nolint:unused
func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {
if common.IsZero(resource) {
return
Expand Down Expand Up @@ -1613,7 +1597,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
}

func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, nil)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

Expand Down
Loading

0 comments on commit 753e4f8

Please sign in to comment.