Skip to content

Commit

Permalink
[YUNIKORN-2933] Don't add duplicated taskGroup to app
Browse files Browse the repository at this point in the history
  • Loading branch information
XbaoWu authored and wuxiaobao committed Oct 17, 2024
1 parent 753e4f8 commit c71a339
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
27 changes: 21 additions & 6 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Application struct {
groups []string
taskMap map[string]*Task
tags map[string]string
taskGroups []TaskGroup
taskGroups map[string]*TaskGroup
taskGroupsDefinition string
schedulingParamsDefinition string
placeholderOwnerReferences []metav1.OwnerReference
Expand All @@ -71,6 +71,7 @@ func (app *Application) String() string {

func NewApplication(appID, queueName, user string, groups []string, tags map[string]string, scheduler api.SchedulerAPI) *Application {
taskMap := make(map[string]*Task)
taskGroups := make(map[string]*TaskGroup)
app := &Application{
applicationID: appID,
queue: queueName,
Expand All @@ -80,7 +81,7 @@ func NewApplication(appID, queueName, user string, groups []string, tags map[str
taskMap: taskMap,
tags: tags,
sm: newAppState(),
taskGroups: make([]TaskGroup, 0),
taskGroups: taskGroups,
lock: &locking.RWMutex{},
schedulerAPI: scheduler,
placeholderTimeoutInSec: 0,
Expand Down Expand Up @@ -166,9 +167,16 @@ func (app *Application) GetSchedulingParamsDefinition() string {
func (app *Application) setTaskGroups(taskGroups []TaskGroup) {
app.lock.Lock()
defer app.lock.Unlock()
app.taskGroups = taskGroups
for _, taskGroup := range app.taskGroups {
app.placeholderAsk = common.Add(app.placeholderAsk, common.GetTGResource(taskGroup.MinResource, int64(taskGroup.MinMember)))
for _, tg := range taskGroups {
taskGroup := tg
if _, exists := app.taskGroups[taskGroup.Name]; exists {
log.Log(log.ShimCacheApplication).Warn("duplicate task-group within the task-groups",
zap.String("appID", app.applicationID),
zap.String("groupName", taskGroup.Name))

Check warning on line 175 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L173-L175

Added lines #L173 - L175 were not covered by tests
} else {
app.taskGroups[taskGroup.Name] = &taskGroup
app.placeholderAsk = common.Add(app.placeholderAsk, common.GetTGResource(taskGroup.MinResource, int64(taskGroup.MinMember)))
}
}
}

Expand All @@ -181,7 +189,14 @@ func (app *Application) getPlaceholderAsk() *si.Resource {
func (app *Application) getTaskGroups() []TaskGroup {
app.lock.RLock()
defer app.lock.RUnlock()
return app.taskGroups

taskGroups := make([]TaskGroup, 0)
if len(app.taskGroups) > 0 {
for _, taskGroup := range app.taskGroups {
taskGroups = append(taskGroups, *taskGroup)
}
}
return taskGroups
}

func (app *Application) setPlaceholderOwnerReferences(ref []metav1.OwnerReference) {
Expand Down
32 changes: 21 additions & 11 deletions pkg/cache/placeholder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, app.placeholderAsk.Resources[siCommon.Memory].Value, int64(10*1024*1000*1000))
assert.Equal(t, app.placeholderAsk.Resources["pods"].Value, int64(10))

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, holder.appID, appID)
assert.Equal(t, holder.taskGroupName, app.taskGroups[0].Name)
assert.Equal(t, holder.taskGroupName, tgs[0].Name)
assert.Equal(t, holder.pod.Spec.SchedulerName, constants.SchedulerName)
assert.Equal(t, holder.pod.Name, "ph-name")
assert.Equal(t, holder.pod.Namespace, namespace)
Expand All @@ -132,7 +133,7 @@ func TestNewPlaceholder(t *testing.T) {
"labelKey1": "labelKeyValue1",
})
assert.Equal(t, len(holder.pod.Annotations), 7, "unexpected number of annotations")
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name)
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], tgs[0].Name)
assert.Equal(t, holder.pod.Annotations[constants.AnnotationPlaceholderFlag], constants.True)
assert.Equal(t, holder.pod.Annotations["annotationKey0"], "annotationValue0")
assert.Equal(t, holder.pod.Annotations["annotationKey1"], "annotationValue1")
Expand Down Expand Up @@ -163,7 +164,8 @@ func TestNewPlaceholderWithNodeSelectors(t *testing.T) {
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, len(holder.pod.Spec.NodeSelector), 2)
assert.Equal(t, holder.pod.Spec.NodeSelector["nodeType"], "test")
assert.Equal(t, holder.pod.Spec.NodeSelector["nodeState"], "healthy")
Expand All @@ -178,7 +180,8 @@ func TestNewPlaceholderWithTolerations(t *testing.T) {
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, len(holder.pod.Spec.Tolerations), 1)
tlr := holder.pod.Spec.Tolerations[0]
assert.Equal(t, tlr.Key, "key1")
Expand All @@ -196,7 +199,8 @@ func TestNewPlaceholderWithAffinity(t *testing.T) {
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, len(holder.pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution), 1)
term := holder.pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
assert.Equal(t, term[0].TopologyKey, "topologyKey")
Expand All @@ -215,14 +219,16 @@ func TestNewPlaceholderTaskGroupsDefinition(t *testing.T) {
app := NewApplication(appID, queue,
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)
holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, "", holder.pod.Annotations[constants.AnnotationTaskGroups])

app = NewApplication(appID, queue,
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)
app.setTaskGroupsDefinition("taskGroupsDef")
holder = newPlaceholder("ph-name", app, app.taskGroups[0])
tgs = app.getTaskGroups()
holder = newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, "taskGroupsDef", holder.pod.Annotations[constants.AnnotationTaskGroups])
var priority *int32
assert.Equal(t, priority, holder.pod.Spec.Priority)
Expand All @@ -234,7 +240,9 @@ func TestNewPlaceholderExtendedResources(t *testing.T) {
app := NewApplication(appID, queue,
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)
holder := newPlaceholder("ph-name", app, app.taskGroups[0])

tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, len(holder.pod.Spec.Containers[0].Resources.Requests), 5, "expected requests not found")
assert.Equal(t, len(holder.pod.Spec.Containers[0].Resources.Limits), 5, "expected limits not found")
assert.Equal(t, holder.pod.Spec.Containers[0].Resources.Limits[gpu], holder.pod.Spec.Containers[0].Resources.Requests[gpu], "gpu: expected same value for request and limit")
Expand Down Expand Up @@ -271,7 +279,8 @@ func TestNewPlaceholderWithPriorityClassName(t *testing.T) {
app.taskMap[taskID1] = task1
app.setOriginatingTask(task1)

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, len(holder.pod.Spec.Containers[0].Resources.Requests), 5, "expected requests not found")
assert.Equal(t, len(holder.pod.Spec.Containers[0].Resources.Limits), 5, "expected limits not found")
assert.Equal(t, holder.pod.Spec.Containers[0].Resources.Limits[gpu], holder.pod.Spec.Containers[0].Resources.Requests[gpu], "gpu: expected same value for request and limit")
Expand All @@ -287,7 +296,8 @@ func TestNewPlaceholderWithTopologySpreadConstraints(t *testing.T) {
"bob", testGroups, map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups(taskGroups)

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
tgs := app.getTaskGroups()
holder := newPlaceholder("ph-name", app, tgs[0])
assert.Equal(t, len(holder.pod.Spec.TopologySpreadConstraints), 1)
assert.Equal(t, holder.pod.Spec.TopologySpreadConstraints[0].MaxSkew, int32(1))
assert.Equal(t, holder.pod.Spec.TopologySpreadConstraints[0].TopologyKey, v1.LabelTopologyZone)
Expand Down

0 comments on commit c71a339

Please sign in to comment.