Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#64 from codefromthecrypt/config
Browse files Browse the repository at this point in the history
Add guest config and update NodeNumber plugin to use it
  • Loading branch information
k8s-ci-robot authored Jul 26, 2023
2 parents 2087d20 + 51f0d53 commit 09b43c4
Show file tree
Hide file tree
Showing 23 changed files with 285 additions and 18 deletions.
6 changes: 5 additions & 1 deletion examples/nodenumber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ import (

"sigs.k8s.io/kube-scheduler-wasm-extension/examples/nodenumber/plugin"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/host"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
)

// main is compiled to a WebAssembly function named "_start", called by the
// wasm scheduler plugin during initialization.
func main() {
plugin := &plugin.NodeNumber{}
plugin, err := plugin.New(host.Get())
if err != nil {
panic(err)
}
// Below is like `var _ api.EnqueueExtensions = plugin`, except it also
// wires up functions the host should provide (go:wasmimport).
enqueue.SetPlugin(plugin)
Expand Down
Binary file modified examples/nodenumber/main.wasm
Binary file not shown.
19 changes: 18 additions & 1 deletion examples/nodenumber/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
// - Doesn't return an error if state has the wrong type, as it is
// impossible: this panics instead with the default message.
// - TODO: logging
// - TODO: config
//
// See https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/simulator/v0.1.0/simulator/docs/sample/nodenumber/plugin.go
//
// Note: This is intentionally separate from the main package, for testing.
package plugin

import (
"encoding/json"
"fmt"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
)
Expand All @@ -50,6 +52,21 @@ type NodeNumber struct {
reverse bool
}

// New creates a new NodeNumber plugin for the given host or returns an error.
func New(host api.Host) (*NodeNumber, error) {
var args nodeNumberArgs
if config := host.GetConfig(); config != nil {
if err := json.Unmarshal(config, &args); err != nil {
return nil, fmt.Errorf("decode arg into NodeNumberArgs: %w", err)
}
}
return &NodeNumber{reverse: args.Reverse}, nil
}

type nodeNumberArgs struct {
Reverse bool `json:"reverse"`
}

const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "NodeNumber"
Expand Down
12 changes: 12 additions & 0 deletions guest/api/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package api

// Host is the WebAssembly host side of the scheduler. Specifically, this
// scheduler framework.Plugin written in Go, which runs the Plugin this SDK
// compiles to Wasm.
type Host interface {
// GetConfig reads any configuration set by the host.
//
// Note: This is not updated dynamically.
GetConfig() []byte
// ^-- Note: This is a []byte, not a string, for json.Unmarshaler.
}
57 changes: 57 additions & 0 deletions guest/host/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package host imports an api.Host from the WebAssembly host.
package host

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
)

// Get can be called at any time, to use features exported by the host.
//
// For example:
//
// func main() {
// config := host.Get().GetConfig()
// // decode yaml
// }
func Get() api.Host {
return currentHost
}

var currentHost api.Host = &host{}

type host struct {
config []byte
}

func (n *host) GetConfig() []byte {
return n.lazyConfig()
}

func (n *host) lazyConfig() []byte {
if config := n.config; config != nil {
return config
}

// Wrap to avoid TinyGo 0.28: cannot use an exported function as value
n.config = mem.GetBytes(func(ptr uint32, limit mem.BufLimit) (len uint32) {
return getConfig(ptr, limit)
})
return n.config
}
24 changes: 24 additions & 0 deletions guest/host/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package host

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"

//go:wasmimport k8s.io/scheduler get_config
func getConfig(ptr uint32, limit mem.BufLimit) (len uint32)
24 changes: 24 additions & 0 deletions guest/host/imports_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build !tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package host

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"

// getConfig is stubbed for compilation outside TinyGo.
func getConfig(ptr uint32, limit mem.BufLimit) (len uint32) { return }
21 changes: 21 additions & 0 deletions guest/internal/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ func Update(
return updater(readBuf)
}

func GetBytes(fn func(ptr uint32, limit BufLimit) (len uint32)) []byte {
size := fn(uint32(readBufPtr), readBufLimit)
if size == 0 {
return nil
}

buf := make([]byte, size)

// If the function result fit in our read buffer, copy it out.
if size <= readBufLimit {
copy(buf, readBuf[:size])
return buf
}

// If the size returned from the function was larger than our read buffer,
// we need to execute it again.
ptr := unsafe.Pointer(&buf[0])
_ = fn(uint32(uintptr(ptr)), size)
return buf
}

func GetString(fn func(ptr uint32, limit BufLimit) (len uint32)) string {
size := fn(uint32(readBufPtr), readBufLimit)
if size == 0 {
Expand Down
25 changes: 20 additions & 5 deletions internal/e2e/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package scheduler_test

import (
"context"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -52,7 +53,7 @@ func TestCycleStateCoherence(t *testing.T) {

func TestExample_NodeNumber(t *testing.T) {
ctx := context.Background()
plugin := newPlugin(ctx, t)
plugin := newNodeNumberPlugin(ctx, t, false)
defer plugin.(io.Closer).Close()

pod := &v1.Pod{Spec: v1.PodSpec{NodeName: "happy8"}}
Expand All @@ -74,18 +75,29 @@ func TestExample_NodeNumber(t *testing.T) {
t.Fatalf("unexpected score: want %v, have %v", want, have)
}
})

t.Run("Reverse means score zero on match", func(t *testing.T) {
// This proves we can read configuration.
reversed := newNodeNumberPlugin(ctx, t, true)
defer reversed.(io.Closer).Close()

score := e2e.RunAll(ctx, t, reversed, pod, nodeInfoWithName("glad8"))
if want, have := int64(0), score; want != have {
t.Fatalf("unexpected score: want %v, have %v", want, have)
}
})
}

func BenchmarkExample_NodeNumber(b *testing.B) {
ctx := context.Background()
b.Run("New", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
newPlugin(ctx, b).(io.Closer).Close()
newNodeNumberPlugin(ctx, b, false).(io.Closer).Close()
}
})

plugin := newPlugin(ctx, b)
plugin := newNodeNumberPlugin(ctx, b, false)
defer plugin.(io.Closer).Close()

pod := *test.PodReal // copy
Expand All @@ -102,8 +114,11 @@ func BenchmarkExample_NodeNumber(b *testing.B) {
})
}

func newPlugin(ctx context.Context, t e2e.Testing) framework.Plugin {
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: test.PathExampleNodeNumber})
func newNodeNumberPlugin(ctx context.Context, t e2e.Testing, reverse bool) framework.Plugin {
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{
GuestPath: test.PathExampleNodeNumber,
GuestConfig: fmt.Sprintf(`{"reverse": %v}`, reverse),
})
if err != nil {
t.Fatalf("failed to create plugin: %v", err)
}
Expand Down
3 changes: 3 additions & 0 deletions scheduler/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type PluginConfig struct {
// GuestPath is the path to the guest wasm.
GuestPath string `json:"guestPath"`

// GuestConfig is any configuration to give to the guest.
GuestConfig string `json:"guestConfig"`

// Args are the os.Args the guest will receive, exposed for tests.
Args []string
}
20 changes: 19 additions & 1 deletion scheduler/plugin/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
k8sApiNodeName = "nodeName"
k8sApiPod = "pod"
k8sScheduler = "k8s.io/scheduler"
k8sSchedulerGetConfig = "get_config"
k8sSchedulerResultClusterEvents = "result.cluster_events"
k8sSchedulerResultNodeNames = "result.node_names"
k8sSchedulerResultStatusReason = "result.status_reason"
Expand All @@ -56,8 +57,12 @@ func instantiateHostApi(ctx context.Context, runtime wazero.Runtime) (wazeroapi.
Instantiate(ctx)
}

func instantiateHostScheduler(ctx context.Context, runtime wazero.Runtime) (wazeroapi.Module, error) {
func instantiateHostScheduler(ctx context.Context, runtime wazero.Runtime, guestConfig string) (wazeroapi.Module, error) {
host := &host{guestConfig: guestConfig}
return runtime.NewHostModuleBuilder(k8sScheduler).
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(host.k8sSchedulerGetConfigFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{i32}).
WithParameterNames("buf", "buf_limit").Export(k8sSchedulerGetConfig).
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sSchedulerResultClusterEventsFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerResultClusterEvents).
Expand Down Expand Up @@ -154,6 +159,19 @@ func k8sApiPodFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
stack[0] = uint64(marshalIfUnderLimit(mod.Memory(), pod, buf, bufLimit))
}

type host struct {
guestConfig string
}

func (h host) k8sSchedulerGetConfigFn(_ context.Context, mod wazeroapi.Module, stack []uint64) {
buf := uint32(stack[0])
bufLimit := bufLimit(stack[1])

config := h.guestConfig

stack[0] = uint64(writeStringIfUnderLimit(mod.Memory(), config, buf, bufLimit))
}

// k8sSchedulerResultClusterEventsFn is a function used by the wasm guest to set the
// cluster events result from guestExportEnqueue.
func k8sSchedulerResultClusterEventsFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin,
return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestPath, err)
}

runtime, guestModule, err := prepareRuntime(ctx, guestBin)
runtime, guestModule, err := prepareRuntime(ctx, guestBin, config.GuestConfig)
if err != nil {
return nil, err
}
Expand Down
23 changes: 22 additions & 1 deletion scheduler/plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func TestPreFilter(t *testing.T) {
tests := []struct {
name string
guestPath string
guestConfig string
args []string
globals map[string]int32
pod *v1.Pod
Expand Down Expand Up @@ -391,6 +392,26 @@ wasm error: unreachable
wasm stack trace:
panic_on_prefilter.$1() i32`,
},
{
name: "panic no guestConfig",
guestPath: test.PathErrorPanicOnGetConfig,
pod: test.PodSmall,
expectedStatusCode: framework.Error,
expectedStatusMessage: `wasm: prefilter error: wasm error: unreachable
wasm stack trace:
panic_on_get_config.$2() i32`,
},
{ // This only tests that configuration gets assigned
name: "panic guestConfig",
guestPath: test.PathErrorPanicOnGetConfig,
guestConfig: "hello",
pod: test.PodSmall,
expectedStatusCode: framework.Error,
expectedStatusMessage: `wasm: prefilter error: hello
wasm error: unreachable
wasm stack trace:
panic_on_get_config.$2() i32`,
},
}

for _, tc := range tests {
Expand All @@ -400,7 +421,7 @@ wasm stack trace:
guestPath = test.PathTestFilter
}

p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args})
p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args, GuestConfig: tc.guestConfig})
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/plugin/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// prepareRuntime compiles the guest and instantiates any host modules it needs.
func prepareRuntime(ctx context.Context, guestBin []byte) (runtime wazero.Runtime, guest wazero.CompiledModule, err error) {
func prepareRuntime(ctx context.Context, guestBin []byte, guestConfig string) (runtime wazero.Runtime, guest wazero.CompiledModule, err error) {
// Create the runtime, which when closed releases any resources associated with it.
runtime = wazero.NewRuntimeWithConfig(ctx, wazero.NewRuntimeConfig().
// Here are settings required by the wasm profiler wzprof:
Expand Down Expand Up @@ -63,7 +63,7 @@ func prepareRuntime(ctx context.Context, guestBin []byte) (runtime wazero.Runtim
}
fallthrough // proceed to more imports
case imports&importK8sScheduler != 0:
if _, err = instantiateHostScheduler(ctx, runtime); err != nil {
if _, err = instantiateHostScheduler(ctx, runtime, guestConfig); err != nil {
err = fmt.Errorf("wasm: error instantiating scheduler host functions: %w", err)
return
}
Expand Down
Loading

0 comments on commit 09b43c4

Please sign in to comment.