From f522508563cc8b641003fd5a9317beadfdac8f8b Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 20 Feb 2024 13:55:56 -0500 Subject: [PATCH] flowtest: introduce script-based testing framework for Flow This commit introduces a new package, flow/flowtest, which can be used for running script-based test using Flow components. flowtest accepts a txtar archive, where the comment of the txtar archive denotes the script to run. Files in the txtar archive are unpacked to a temporary directory as part of the test; that temporary directory is also used as the working directory of the Flow controller. See for more information about the scripting language and available default commands. In addition to the default commands, the following Flow-specific commands are made available to scripts: - `controller.load [file]`: Load a file into the Flow controller. - `controller.start`: Start the Flow controller. - `assert.health [component] [health]`: Assert the health of a specific component. Note that `controller.start` should almost always be run as a background command to allow the controller to run for the duration of the test. Users of flowtest can provide custom commands on top of these by passing `WithExtraCommands` to `TestScript`. The following is an example script using these commands: # This file performs a basic test of loading a component and asserting its # health. controller.load main.river controller.start & assert.health local.file.example healthy -- main.river -- local.file "example" { filename = "./hello.txt" } -- hello.txt -- Hello, world! See the testdata/ directory for other test examples. --- go.mod | 5 +- go.sum | 2 + pkg/flow/flowtest/commands.go | 136 ++++++++++++ pkg/flow/flowtest/flowtest.go | 195 ++++++++++++++++++ pkg/flow/flowtest/flowtest_test.go | 22 ++ pkg/flow/flowtest/testdata/basic.txtar | 15 ++ .../flowtest/testdata/basic_failure.txtar | 14 ++ 7 files changed, 387 insertions(+), 2 deletions(-) create mode 100644 pkg/flow/flowtest/commands.go create mode 100644 pkg/flow/flowtest/flowtest.go create mode 100644 pkg/flow/flowtest/flowtest_test.go create mode 100644 pkg/flow/flowtest/testdata/basic.txtar create mode 100644 pkg/flow/flowtest/testdata/basic_failure.txtar diff --git a/go.mod b/go.mod index 7eaab13badd0..d87429cb7110 100644 --- a/go.mod +++ b/go.mod @@ -583,7 +583,7 @@ require ( golang.org/x/mod v0.14.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/term v0.16.0 // indirect - golang.org/x/tools v0.15.0 // indirect + golang.org/x/tools v0.15.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect gonum.org/v1/gonum v0.14.0 // indirect @@ -603,7 +603,7 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) -require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab +require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect require ( connectrpc.com/connect v1.14.0 @@ -618,6 +618,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.42.0 golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91 k8s.io/apimachinery v0.28.3 + rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3 ) require ( diff --git a/go.sum b/go.sum index 2665dfe3c18e..1146275120e4 100644 --- a/go.sum +++ b/go.sum @@ -3201,6 +3201,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8 rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3 h1:2vM6uMBq2/Dou/Wzu2p+yUFkuI3lgMbX0UYfVnzh0ck= +rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3/go.mod h1:cKBjCtFBBeZ0cbYFRXkRoxP+xGqhArPa9t3VWhtXfzU= sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU= sigs.k8s.io/controller-runtime v0.16.2/go.mod h1:vpMu3LpI5sYWtujJOa2uPK61nB5rbwlN7BAB8aSLvGU= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/pkg/flow/flowtest/commands.go b/pkg/flow/flowtest/commands.go new file mode 100644 index 000000000000..09c08b43a983 --- /dev/null +++ b/pkg/flow/flowtest/commands.go @@ -0,0 +1,136 @@ +package flowtest + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/dskit/backoff" + "rsc.io/script" +) + +func createCommands(f *flow.Flow, opts *options) map[string]script.Cmd { + cmds := script.DefaultCmds() + registerControllerCommands(f, cmds) + + for name, cmd := range opts.cmds { + cmds[name] = cmd + } + + return cmds +} + +func registerControllerCommands(f *flow.Flow, cmds map[string]script.Cmd) { + cmds["controller.load"] = script.Command( + script.CmdUsage{ + Summary: "load a file into the Flow controller", + Args: "file", + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) != 1 { + return nil, script.ErrUsage + } + + bb, err := os.ReadFile(makeAbsolute(s.Getwd(), args[0])) + if err != nil { + return nil, err + } + src, err := flow.ParseSource(args[0], bb) + if err != nil { + return nil, fmt.Errorf("parsing %q: %w", args[0], err) + } + return nil, f.LoadSource(src, nil) + }, + ) + + cmds["controller.start"] = script.Command( + script.CmdUsage{ + Summary: "start the Flow controller", + Async: true, + Detail: []string{ + "This command should almost always be run in the background by appending a &, otherwise the controller will exit immediately after the command finishes'", + }, + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) != 0 { + return nil, script.ErrUsage + } + + var wg sync.WaitGroup + wg.Add(1) + + ctx, cancel := context.WithCancel(s.Context()) + go func() { + defer wg.Done() + f.Run(ctx) + }() + + return script.WaitFunc(func(s *script.State) (stdout string, stderr string, err error) { + cancel() + wg.Wait() + + return "", "", nil + }), nil + }, + ) + + cmds["assert.health"] = script.Command( + script.CmdUsage{ + Summary: "assert a health of a component", + Args: "component expected_health", + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) != 2 { + return nil, script.ErrUsage + } + + // TODO(rfratto): allow configuring via flags. + bc := backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 1 * time.Second, + MaxRetries: 10, + } + bo := backoff.New(s.Context(), bc) + + check := func() error { + var expectedHealth component.HealthType + if err := expectedHealth.UnmarshalText([]byte(strings.ToLower(args[1]))); err != nil { + return err + } + + info, err := f.GetComponent(component.ParseID(args[0]), component.InfoOptions{GetHealth: true}) + if err != nil { + return err + } + + if info.Health.Health != expectedHealth { + return fmt.Errorf("expected %q, got %q", expectedHealth, info.Health.Health) + } + + return nil + } + + for bo.Ongoing() { + if err := check(); err == nil { + break + } + bo.Wait() + } + + return nil, bo.Err() + }, + ) +} + +func makeAbsolute(wd, path string) string { + if filepath.IsAbs(path) { + return path + } + return filepath.Join(wd, path) +} diff --git a/pkg/flow/flowtest/flowtest.go b/pkg/flow/flowtest/flowtest.go new file mode 100644 index 000000000000..105cf23f0170 --- /dev/null +++ b/pkg/flow/flowtest/flowtest.go @@ -0,0 +1,195 @@ +// Package flowtest provides a script-based testing framework for Flow. +// +// [TestScript] accepts a path to a txtar file, where the comment of the txtar +// file denotes the script to run. Files in the txtar archive are unpacked to a +// temporary directory as part of the test. +// +// See [rsc.io/script] for more information about the scripting language and +// available default commands. +// +// # Flow commands +// +// In addition to the default commands provided by [rsc.io/script], the +// following Flow-specific commands are available: +// +// - `controller.load [file]`: Load a file into the Flow controller. +// - `controller.start`: Start the Flow controller. +// - `assert.health [component] [health]`: Assert the health of a specific component. +// +// Note that `controller.start` should almost always be run in the background +// by appending a & to the end of the command, otherwise the controller will be +// terminated immediately after the command exits. +// +// Custom commands can be provided by passing [WithExtraCommands] to +// [TestScript]. +// +// # Example script +// +// The following is an example script which uses local.file: +// +// # This file performs a basic test of loading a component and asserting its +// # health. +// +// controller.load main.river +// controller.start & +// +// assert.health local.file.example healthy +// +// -- main.river -- +// local.file "example" { +// filename = "./hello.txt" +// } +// +// -- hello.txt -- +// Hello, world! +// +// See the testdata directory for more basic examples. +package flowtest + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/logging" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/tools/txtar" + "rsc.io/script" +) + +// TestScript loads the file at filename and executes it as a script. A +// temporary working directory is created for running the script, and the +// working directory of the process is changed to the temporary directory for +// the duration of the call. +func TestScript(filename string, opts ...TestOption) error { + var o options + for _, opt := range opts { + opt(&o) + } + + // Create a temporary directory for the scope of our test to operate in. + tmpDir, err := os.MkdirTemp("", "flowtest-*") + if err != nil { + return fmt.Errorf("creating temporary directory: %w", err) + } + defer os.RemoveAll(tmpDir) + + archive, err := txtar.ParseFile(filename) + if err != nil { + return fmt.Errorf("loading script: %w", err) + } + + // Create a test controller for the script to interact with. The test + // controller's logs are buffered and printed to stderr on exit. This + // prevents log mangling when both the script engine and the Flow controller + // are writing logs at the same time (as the script engine will write partial + // lines). + var controllerLogs bytes.Buffer + defer func() { + fmt.Fprintln(os.Stderr, "[controller logs]") + io.Copy(os.Stderr, &controllerLogs) + }() + f, err := newTestController(&controllerLogs, filepath.Join(tmpDir, "data")) + if err != nil { + return fmt.Errorf("creating test controller: %w", err) + } + + // Create a state for the duration of our test, using it to unpack the txtar + // archive into our working directory. + // + // Because scripts are frequently asynchronous (such as controller.start), we + // need to close the state in a defer to make sure everything gets cleaned up + // properly. + s, err := script.NewState(context.Background(), tmpDir, nil) + if err != nil { + return fmt.Errorf("creating state: %w", err) + } + if err := s.ExtractFiles(archive); err != nil { + return fmt.Errorf("extracting archive: %w", err) + } + defer s.CloseAndWait(os.Stderr) + + // Finally, prepare for running the script: + // + // 1. Change our working directory to the temporary directory. This allows + // components which rely on the working directory to work properly. The + // previous working directory is restored on exit. This step *MUST* happen + // here at the end before executing the engine, otherwise calls to + // TestScript relying on the working directory of tests will fail. + // + // 2. Create a new engine and register commands to scripts to use. + // + // 3. Execute the script. + initWD, err := os.Getwd() + if err != nil { + return fmt.Errorf("getting working directory: %w", err) + } + if err := os.Chdir(tmpDir); err != nil { + return fmt.Errorf("changing working directory: %w", err) + } + defer os.Chdir(initWD) + + e := script.NewEngine() + e.Cmds = createCommands(f, &o) + return e.Execute(s, filename, bufio.NewReader(bytes.NewReader(archive.Comment)), os.Stderr) +} + +func newTestController(out io.Writer, dataDir string) (*flow.Flow, error) { + logger, err := logging.New(out, logging.Options{ + Level: logging.LevelDebug, + Format: logging.FormatDefault, + }) + if err != nil { + return nil, fmt.Errorf("creating logger: %w", err) + } + + tracer, err := tracing.New(tracing.Options{ + SamplingFraction: 0, + }) + if err != nil { + return nil, fmt.Errorf("creating tracer: %w", err) + } + + if err := os.MkdirAll(dataDir, 0700); err != nil { + return nil, fmt.Errorf("creating data directory: %w", err) + } + + f := flow.New(flow.Options{ + Logger: logger, + Tracer: tracer, + Reg: prometheus.NewRegistry(), + + DataPath: dataDir, + // TODO(rfratto): services? + }) + return f, nil +} + +// TestOption modifies default behavior of testing functions. +type TestOption func(*options) + +type options struct { + cmds map[string]script.Cmd +} + +// WithExtraCommands provides a list of extra commands available for scripts. +// If a key in cmds matches the name of an existing command, the existing +// command is shadowed by the command. +// +// WithExtraCommands can be passed multiple times. +func WithExtraCommands(cmds map[string]script.Cmd) TestOption { + return func(o *options) { + if o.cmds == nil { + o.cmds = make(map[string]script.Cmd) + } + for k, cmd := range cmds { + o.cmds[k] = cmd + } + } +} diff --git a/pkg/flow/flowtest/flowtest_test.go b/pkg/flow/flowtest/flowtest_test.go new file mode 100644 index 000000000000..3c1386f6db79 --- /dev/null +++ b/pkg/flow/flowtest/flowtest_test.go @@ -0,0 +1,22 @@ +package flowtest_test + +import ( + "io/fs" + "path/filepath" + "testing" + + "github.com/grafana/agent/pkg/flow/flowtest" + "github.com/stretchr/testify/require" +) + +func Test(t *testing.T) { + filepath.WalkDir("testdata/", func(path string, d fs.DirEntry, err error) error { + if filepath.Ext(path) != ".txtar" { + return nil + } + t.Run(d.Name(), func(t *testing.T) { + require.NoError(t, flowtest.TestScript(path)) + }) + return nil + }) +} diff --git a/pkg/flow/flowtest/testdata/basic.txtar b/pkg/flow/flowtest/testdata/basic.txtar new file mode 100644 index 000000000000..f725e77d63c5 --- /dev/null +++ b/pkg/flow/flowtest/testdata/basic.txtar @@ -0,0 +1,15 @@ +# This file performs a basic test of loading a component and asserting its +# health. + +controller.load main.river +controller.start & + +assert.health local.file.example healthy + +-- main.river -- +local.file "example" { + filename = "./hello.txt" +} + +-- hello.txt -- +Hello, world! diff --git a/pkg/flow/flowtest/testdata/basic_failure.txtar b/pkg/flow/flowtest/testdata/basic_failure.txtar new file mode 100644 index 000000000000..34775a18e76b --- /dev/null +++ b/pkg/flow/flowtest/testdata/basic_failure.txtar @@ -0,0 +1,14 @@ +# This file performs a basic test of loading a component which fails to +# evaluate. +# +# We start the controller first to allow us to probe for component health. + +controller.start & +! controller.load main.river + +assert.health local.file.example unhealthy + +-- main.river -- +local.file "example" { + filename = "does-not-exist.txt" +}