Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SharedLister for NodeInfo #115

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ examples/advanced/main.wasm: examples/advanced/main.go
@(cd $(@D); tinygo build -o main.wasm -scheduler=none --no-debug -target=wasi .)

.PHONY: build-tinygo
build-tinygo: examples/nodenumber/main.wasm examples/advanced/main.wasm guest/testdata/cyclestate/main.wasm guest/testdata/filter/main.wasm guest/testdata/score/main.wasm \
build-tinygo: examples/nodenumber/main.wasm examples/advanced/main.wasm examples/imagelocality/main.wasm guest/testdata/cyclestate/main.wasm guest/testdata/filter/main.wasm guest/testdata/score/main.wasm \
guest/testdata/bind/main.wasm guest/testdata/reserve/main.wasm guest/testdata/handle/main.wasm guest/testdata/permit/main.wasm \
internal/e2e/scheduler_perf/wasm/nodenumber/main.wasm

Expand Down
Binary file modified examples/advanced/main.wasm
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/advanced/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (pl *NodeNumber) EventsToRegister() []api.ClusterEvent {
}

// PreScore implements api.PreScorePlugin
func (pl *NodeNumber) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
func (pl *NodeNumber) PreScore(state api.CycleState, pod proto.Pod, _ api.NodeInfoList) *api.Status {
pl.klog.InfoS("execute PreScore on NodeNumber plugin", "pod", klog.KObj(pod))

podnum, ok := lastNumber(pod.Spec().GetNodeName())
Expand Down
3 changes: 3 additions & 0 deletions examples/imagelocality/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Wasm ImageLocality

This is the [ImageLocality plugin](https://github.com/kubernetes/kubernetes/tree/master/pkg/scheduler/framework/plugins/imagelocality) implemented with the wasm extension.
18 changes: 18 additions & 0 deletions examples/imagelocality/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module sigs.k8s.io/kube-scheduler-wasm-extension/examples/imagelocality

go 1.20

require (
github.com/wasilibs/nottinygc v0.7.1
sigs.k8s.io/kube-scheduler-wasm-extension/guest v0.0.0-00010101000000-000000000000
)

require (
github.com/magefile/mage v1.14.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto v0.0.0-00010101000000-000000000000 // indirect
)

replace sigs.k8s.io/kube-scheduler-wasm-extension/guest => ./../../guest

replace sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto => ./../../kubernetes/proto
11 changes: 11 additions & 0 deletions examples/imagelocality/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo=
github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/wasilibs/nottinygc v0.7.1 h1:rKu19+SFniRNuSo5NX7/wxpSpXmMUmkcyt/YiWLJg8w=
github.com/wasilibs/nottinygc v0.7.1/go.mod h1:oDcIotskuYNMpqMF23l7Z8uzD4TC0WXHK8jetlB3HIo=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
99 changes: 99 additions & 0 deletions examples/imagelocality/image_locality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"fmt"
"strings"

guestapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/handle/sharedlister/api"
)

// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
sanposhiho marked this conversation as resolved.
Show resolved Hide resolved
const (
// mb is 1024 * 1024, i.e. one mebibyte when sizes are counted in bytes.
mb int64 = 1024 * 1024
// minThreshold (23 * mb) is the lower bound that calculatePriority clamps the summed image score to.
minThreshold int64 = 23 * mb
// maxContainerThreshold (1000 * mb) is the per-container upper bound; calculatePriority multiplies it
// by the number of containers to obtain the overall upper bound.
maxContainerThreshold int64 = 1000 * mb
)

// imageLocality is a score plugin that favors nodes that already have requested pod container's images.
type imageLocality struct {
// sharedLister is what Score uses to fetch the node being scored and to list all nodes.
sharedLister api.SharedLister
}

// Score is invoked at the score extension point. It looks up the named node,
// counts all known nodes, and converts the summed image scores for the pod's
// containers into a node score.
//
// It returns an error status when the node cannot be fetched or when the node
// list is nil.
func (pl *imageLocality) Score(state guestapi.CycleState, pod proto.Pod, nodeName string) (int32, *guestapi.Status) {
	target := pl.sharedLister.NodeInfos().Get(nodeName)
	if target == nil {
		reason := fmt.Sprintf("failed to get node %q", nodeName)
		return 0, &guestapi.Status{Code: guestapi.StatusCodeError, Reason: reason}
	}

	allNodes := pl.sharedLister.NodeInfos().List()
	if allNodes == nil {
		return 0, &guestapi.Status{Code: guestapi.StatusCodeError, Reason: "failed to list nodes"}
	}

	// The total node count feeds the "spread" factor applied to each image.
	rawScore := sumImageScores(target, pod, len(allNodes))

	// Init containers and regular containers both count toward the upper bound.
	numContainers := len(pod.Spec().InitContainers) + len(pod.Spec().Containers)

	return int32(calculatePriority(rawScore, numContainers)), nil
}

const (
// maxNodeScore is the maximum score a Score plugin is expected to return.
maxNodeScore int64 = 100
sanposhiho marked this conversation as resolved.
Show resolved Hide resolved
)

// calculatePriority maps the summed image scores of a node onto the node score
// range. The sum is first clamped to [minThreshold, maxContainerThreshold *
// numContainers]; the result is maxNodeScore scaled by where the clamped sum
// falls inside that range (integer arithmetic).
func calculatePriority(sumScores int64, numContainers int) int64 {
	upper := maxContainerThreshold * int64(numContainers)

	clamped := sumScores
	switch {
	case clamped < minThreshold:
		clamped = minThreshold
	case clamped > upper:
		clamped = upper
	}

	span := upper - minThreshold
	return maxNodeScore * (clamped - minThreshold) / span
}

// sumImageScores returns the sum of image scores of all the pod's containers
// (init containers and regular containers) whose image is already on the node.
// Each matching image contributes the value returned by scaledImageScore; the
// raw sum is later turned into the final score by calculatePriority.
//
// Image names are read through the generated GetImage getter rather than by
// dereferencing the Image pointer, so a container without an image set does not
// cause a nil-pointer panic; it simply fails to match any image on the node.
func sumImageScores(nodeInfo guestapi.NodeInfo, pod proto.Pod, totalNumNodes int) int64 {
	// Fetch the image states once instead of once per container; the map is
	// only read in this function.
	imageStates := nodeInfo.ImageStates()

	var sum int64
	for _, container := range pod.Spec().InitContainers {
		if state, ok := imageStates[normalizedImageName(container.GetImage())]; ok {
			sum += scaledImageScore(state, totalNumNodes)
		}
	}
	for _, container := range pod.Spec().Containers {
		if state, ok := imageStates[normalizedImageName(container.GetImage())]; ok {
			sum += scaledImageScore(state, totalNumNodes)
		}
	}
	return sum
}

// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how many nodes the image has
// "spread" to (imageState.NumNodes / totalNumNodes).
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
//
// It returns 0 when imageState is nil or totalNumNodes is not positive, so the
// division below is never performed with a zero denominator (which would yield
// Inf/NaN and an implementation-dependent int64 conversion).
func scaledImageScore(imageState *guestapi.ImageStateSummary, totalNumNodes int) int64 {
	if imageState == nil || totalNumNodes <= 0 {
		return 0
	}
	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
	return int64(float64(imageState.Size) * spread)
}

// normalizedImageName returns the CRI compliant name for a given image: when
// the last path segment carries no ":" (no tag or digest separator after the
// final "/"), ":latest" is appended; otherwise the name is returned unchanged.
// TODO: cover the corner cases of missed matches, e.g,
// 1. Using Docker as runtime and docker.io/library/test:tag in pod spec, but only test:tag will present in node status
// 2. Using the implicit registry, i.e., test:tag or library/test:tag in pod spec but only docker.io/library/test:tag
// in node status; note that if users consistently use one registry format, this should not happen.
func normalizedImageName(name string) string {
	lastColon := strings.LastIndexByte(name, ':')
	lastSlash := strings.LastIndexByte(name, '/')
	// A colon that appears after the final slash is the tag/digest separator.
	if lastColon > lastSlash {
		return name
	}
	return name + ":latest"
}
39 changes: 39 additions & 0 deletions examples/imagelocality/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/handle/sharedlister"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
)

// main is compiled to an exported Wasm function named "_start", called by the
// Wasm scheduler plugin during initialization.
func main() {
// The plugin package uses only normal Go code, which allows it to be
// unit testable via `tinygo test -target=wasi` as well normal `go test`.
//
// The real implementations, such as `config.Get()` use Wasm host functions
// (go:wasmimport), which cannot be tested with `tinygo test -target=wasi`.
plugin := &imageLocality{
sharedLister: sharedlister.Get(),
}
// Instead of using `plugin.Set`, this configures only the interfaces
// implemented by the plugin. The Wasm host only calls functions imported,
// so this prevents additional overhead.
score.SetPlugin(plugin)
}
Binary file added examples/imagelocality/main.wasm
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/nodenumber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (pl *NodeNumber) EventsToRegister() []api.ClusterEvent {
}

// PreScore implements api.PreScorePlugin
func (pl *NodeNumber) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
func (pl *NodeNumber) PreScore(state api.CycleState, pod proto.Pod, _ api.NodeInfoList) *api.Status {
recorder := eventrecorder.Get()

klog.InfoS("execute PreScore on NodeNumber plugin", "pod", klog.KObj(pod))
Expand Down
Binary file modified examples/nodenumber/main.wasm
Binary file not shown.
9 changes: 9 additions & 0 deletions guest/api/framework.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package api

// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor comment:
Nodes field is not necessary for now?
How about leaving the comment why this field is not implemented?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nodes field is not necessary for now?

Yes, we don't use it (actually nothing uses it in upstream too).

How about leaving the comment why this field is not implemented?

I don't want to leave comments on everything unsupported. (We have too many unsupported things.)

// Size of the image
Size int64
// Used to track how many nodes have this image
NumNodes int
sanposhiho marked this conversation as resolved.
Show resolved Hide resolved
}
19 changes: 18 additions & 1 deletion guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type EnqueueExtensions interface {
type PreScorePlugin interface {
Plugin

PreScore(state CycleState, pod proto.Pod, nodeList proto.NodeList) *Status
PreScore(state CycleState, pod proto.Pod, nodeList NodeInfoList) *Status
}

// ScorePlugin is a WebAssembly implementation of framework.ScorePlugin.
Expand Down Expand Up @@ -161,11 +161,28 @@ type PostBindPlugin interface {
PostBind(state CycleState, pod proto.Pod, nodeName string)
}

// NodeInfoList gives access to a collection of NodeInfo, either as a whole
// (List) or one entry at a time by name (Get).
type NodeInfoList interface {
// List lists all NodeInfo in this list.
List() []NodeInfo
// Get returns the NodeInfo with the given name.
// It returns nil if this list doesn't have the NodeInfo with the given name.
Get(name string) NodeInfo
}

// NodeInfo is the view of a node handed to plugins: its metadata, the
// underlying proto.Node, and per-image state summaries.
type NodeInfo interface {
// Metadata is a convenience that triggers Get.
proto.Metadata

// Node returns the proto.Node behind this NodeInfo.
Node() proto.Node
// ImageStates returns image state summaries keyed by string.
// NOTE(review): the imagelocality example looks entries up by normalized
// image name (e.g. "name:tag") — confirm the exact key format.
ImageStates() map[string]*ImageStateSummary

// ... we'll support more fields of NodeInfo of the scheduling framework.
}

// PodInfoList provides lookup of a PodInfo by name.
type PodInfoList interface {
// Get returns the PodInfo with the given name.
// It returns nil if this list doesn't have the PodInfo with the given name.
Get(name string) PodInfo
}

type PodInfo interface {
Expand Down
2 changes: 1 addition & 1 deletion guest/bind/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func _bind() uint32 { //nolint
return 0
}

nodeName := imports.NodeName()
nodeName := imports.CurrentNodeName()
// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
s := bind.Bind(cyclestate.Values, cyclestate.Pod, nodeName)
Expand Down
107 changes: 6 additions & 101 deletions guest/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ package filter

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/handle/sharedlister"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
internalproto "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/proto"
protoapi "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/api"
)

// filter is the current plugin assigned with SetPlugin.
Expand Down Expand Up @@ -67,105 +65,12 @@ func _filter() uint32 { //nolint
return 0
}

s := filter.Filter(cyclestate.Values, cyclestate.Pod, &NodeInfo{})

return imports.StatusToCode(s)
}

var _ api.NodeInfo = (*NodeInfo)(nil)

// nodeInfo is lazy so that a plugin which doesn't read fields avoids a
// relatively expensive unmarshal penalty.
//
// Note: Unlike proto.Pod, this is not special cased for the scheduling cycle.
type NodeInfo struct {
node proto.Node
}

func (n *NodeInfo) GetUid() string {
return n.lazyNode().GetUid()
}

func (n *NodeInfo) GetName() string {
return n.lazyNode().GetName()
}

func (n *NodeInfo) GetNamespace() string {
return n.lazyNode().GetNamespace()
}

func (n *NodeInfo) GetResourceVersion() string {
return n.lazyNode().GetResourceVersion()
}

func (n *NodeInfo) Node() proto.Node {
return n.lazyNode()
}

// lazyNode lazy initializes node from imports.Node.
func (n *NodeInfo) lazyNode() proto.Node {
if node := n.node; node != nil {
return node
}

var msg protoapi.Node
if err := imports.Node(msg.UnmarshalVT); err != nil {
panic(err.Error())
nodename := imports.CurrentNodeName()
if nodename == "" {
return imports.StatusToCode(&api.Status{Code: api.StatusCodeError, Reason: "could not get current node name"})
}
n.node = &internalproto.Node{Msg: &msg}
return n.node
}

type PodInfo struct {
pod proto.Pod
}

func (p *PodInfo) GetApiVersion() string {
return p.lazyPod().GetApiVersion()
}

func (p *PodInfo) GetKind() string {
return p.lazyPod().GetKind()
}

func (p *PodInfo) GetName() string {
return p.lazyPod().GetName()
}

func (p *PodInfo) GetNamespace() string {
return p.lazyPod().GetNamespace()
}

// GetResourceVersion returns the resource version of the lazily loaded pod.
// (It previously returned the pod's namespace due to a copy-paste slip.)
func (p *PodInfo) GetResourceVersion() string {
	return p.lazyPod().GetResourceVersion()
}

func (p *PodInfo) GetUid() string {
return p.lazyPod().GetUid()
}

func (p *PodInfo) Pod() proto.Pod {
return p.lazyPod()
}
s := filter.Filter(cyclestate.Values, cyclestate.Pod, sharedlister.NodeInfos().Get(nodename))

func (p *PodInfo) Spec() *protoapi.PodSpec {
return p.lazyPod().Spec()
}

func (p *PodInfo) Status() *protoapi.PodStatus {
return p.lazyPod().Status()
}

// lazyPod lazy initializes pod from imports.Pod.
func (p *PodInfo) lazyPod() proto.Pod {
if pod := p.pod; pod != nil {
return pod
}

var msg protoapi.Pod
if err := imports.Pod(msg.UnmarshalVT); err != nil {
panic(err.Error())
}
p.pod = &internalproto.Pod{Msg: &msg}
return p.pod
return imports.StatusToCode(s)
}
2 changes: 0 additions & 2 deletions guest/handle/eventrecorder/eventrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func EventfFn(msg internal.EventMessage) {
}

// Eventf is a convenience that calls the same method documented on api.Eventf.
// It forwards every argument unchanged to eventRecorderInstance.
func Eventf(regarding proto.KObject, related proto.KObject, eventtype, reason, action, note string) {
eventRecorderInstance.Eventf(regarding, related, eventtype, reason, action, note)
}
Loading
Loading