Skip to content

Commit

Permalink
config: add support to file:// and http(s):// URIs
Browse files Browse the repository at this point in the history
Extends the current plugin config to use instead a URI. In the case
of `file://` the behavior is the same as it is currently.
In the case of `http(s)://` it will fetch the URI and try to
evaluate it as a wasm payload.

This PR is based on earlier work on `dapr/component-contrib`.
See: dapr/components-contrib#3005

Signed-off-by: Edoardo Vacchi <[email protected]>
  • Loading branch information
evacchi committed Jul 28, 2023
1 parent ce8959d commit 4da9ede
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 101 deletions.
2 changes: 1 addition & 1 deletion internal/e2e/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
)

// Pass the profiling context to the plugin.
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath})
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: "file://" + guestPath})
if err != nil {
log.Panicln("failed to create plugin:", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/e2e/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
func TestCycleStateCoherence(t *testing.T) {
ctx := context.Background()

plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: test.PathTestCycleState})
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: test.URLTestCycleState})
if err != nil {
t.Fatalf("failed to create plugin: %v", err)
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func BenchmarkExample_NodeNumber(b *testing.B) {

func newNodeNumberPlugin(ctx context.Context, t e2e.Testing, reverse bool) framework.Plugin {
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{
GuestPath: test.PathExampleNodeNumber,
GuestURL: test.URLExampleNodeNumber,
GuestConfig: fmt.Sprintf(`{"reverse": %v}`, reverse),
})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ replace (

require (
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.1
github.com/tetratelabs/wazero v1.3.1
k8s.io/api v0.27.3
k8s.io/apimachinery v0.27.3
Expand Down Expand Up @@ -90,6 +91,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/selinux v1.10.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions scheduler/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package wasm

type PluginConfig struct {
// GuestPath is the path to the guest wasm.
GuestPath string `json:"guestPath"`
// GuestURL is the URL to the guest wasm.
// Valid schemes are file:// for a local file or http[s]:// for one
// retrieved via HTTP.
GuestURL string `json:"guestURL"`

// GuestConfig is any configuration to give to the guest.
GuestConfig string `json:"guestConfig"`
Expand Down
65 changes: 65 additions & 0 deletions scheduler/plugin/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package wasm

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
)

// httpClient decorates an http.Client with convenience methods.
type httpClient struct {
c http.Client
}

// newHTTPFetcher is a constructor for httpFetcher.
//
// It is possible to plug a custom http.RoundTripper to handle other concerns (e.g. retries)
// Compression is handled transparently and automatically by http.Client.
func newHTTPCLient(transport http.RoundTripper) *httpClient {
return &httpClient{
c: http.Client{Transport: transport},
}
}

// fetch returns a byte slice of the wasm module found at the given URL, or an error otherwise.
func (f *httpClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
resp, err := f.c.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body) //nolint
resp.Body.Close()
return nil, fmt.Errorf("received %v status code from %q", resp.StatusCode, u)
}

bytes, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
return bytes, nil
}
89 changes: 89 additions & 0 deletions scheduler/plugin/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package wasm

import (
"compress/gzip"
"context"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"
)

var wasmMagicNumber = []byte{0x00, 0x61, 0x73, 0x6d}

func TestWasmHTTPFetch(t *testing.T) {
wasmBinary := wasmMagicNumber
wasmBinary = append(wasmBinary, 0x00, 0x00, 0x00, 0x00)
cases := []struct {
name string
handler http.HandlerFunc
expectedError string
}{
{
name: "plain wasm binary",
handler: func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(wasmBinary)
},
},
// Compressed payloads are handled automatically by http.Client.
{
name: "compressed payload",
handler: func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Encoding", "gzip")

gw := gzip.NewWriter(w)
defer gw.Close()
_, _ = gw.Write(wasmBinary)
},
},
{
name: "http error",
handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
},
expectedError: "received 500 status code",
},
}

for _, proto := range []string{"http", "https"} {
t.Run(proto, func(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ts := httptest.NewServer(tc.handler)
defer ts.Close()
c := newHTTPCLient(http.DefaultTransport)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
parse, err := url.Parse(ts.URL)
require.NoError(t, err)
_, err = c.get(ctx, parse)
if tc.expectedError != "" {
require.ErrorContains(t, err, tc.expectedError)
return
}
require.NoError(t, err, "Wasm download got an unexpected error: %v", err)
})
}
})
}
}
22 changes: 20 additions & 2 deletions scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package wasm
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -51,9 +53,9 @@ func New(configuration runtime.Object, frameworkHandle framework.Handle) (framew
// NewFromConfig is like New, except it allows us to explicitly provide the
// context and configuration of the plugin. This allows flexibility in tests.
func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin, error) {
guestBin, err := os.ReadFile(config.GuestPath)
guestBin, err := readFromURI(ctx, config.GuestURL)
if err != nil {
return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestPath, err)
return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestURL, err)
}

runtime, guestModule, err := prepareRuntime(ctx, guestBin, config.GuestConfig)
Expand All @@ -77,6 +79,22 @@ func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin,
}
}

func readFromURI(ctx context.Context, u string) ([]byte, error) {
uri, err := url.ParseRequestURI(u)
if err != nil {
return nil, err
}
switch uri.Scheme {
case "file":
return os.ReadFile(uri.Path)
case "http", "https":
c := newHTTPCLient(http.DefaultTransport)
return c.get(ctx, uri)
default:
return nil, fmt.Errorf("unsupported URL scheme: %s", uri.Scheme)
}
}

// newWasmPlugin is extracted to prevent small bugs: The caller must close the
// wazero.Runtime to avoid leaking mmapped files.
func newWasmPlugin(ctx context.Context, runtime wazero.Runtime, guestModule wazero.CompiledModule, config PluginConfig) (*wasmPlugin, error) {
Expand Down
Loading

0 comments on commit 4da9ede

Please sign in to comment.