diff --git a/go.mod b/go.mod index 7eaab13badd0..d87429cb7110 100644 --- a/go.mod +++ b/go.mod @@ -583,7 +583,7 @@ require ( golang.org/x/mod v0.14.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/term v0.16.0 // indirect - golang.org/x/tools v0.15.0 // indirect + golang.org/x/tools v0.15.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect gonum.org/v1/gonum v0.14.0 // indirect @@ -603,7 +603,7 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) -require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab +require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect require ( connectrpc.com/connect v1.14.0 @@ -618,6 +618,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.42.0 golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91 k8s.io/apimachinery v0.28.3 + rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3 ) require ( diff --git a/go.sum b/go.sum index 2665dfe3c18e..1146275120e4 100644 --- a/go.sum +++ b/go.sum @@ -3201,6 +3201,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8 rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3 h1:2vM6uMBq2/Dou/Wzu2p+yUFkuI3lgMbX0UYfVnzh0ck= +rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3/go.mod h1:cKBjCtFBBeZ0cbYFRXkRoxP+xGqhArPa9t3VWhtXfzU= sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU= sigs.k8s.io/controller-runtime v0.16.2/go.mod h1:vpMu3LpI5sYWtujJOa2uPK61nB5rbwlN7BAB8aSLvGU= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/pkg/flow/flowtest/commands.go b/pkg/flow/flowtest/commands.go new file mode 100644 index 000000000000..09c08b43a983 --- /dev/null +++ b/pkg/flow/flowtest/commands.go @@ -0,0 +1,136 @@ +package flowtest + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/dskit/backoff" + "rsc.io/script" +) + +func createCommands(f *flow.Flow, opts *options) map[string]script.Cmd { + cmds := script.DefaultCmds() + registerControllerCommands(f, cmds) + + for name, cmd := range opts.cmds { + cmds[name] = cmd + } + + return cmds +} + +func registerControllerCommands(f *flow.Flow, cmds map[string]script.Cmd) { + cmds["controller.load"] = script.Command( + script.CmdUsage{ + Summary: "load a file into the Flow controller", + Args: "file", + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) != 1 { + return nil, script.ErrUsage + } + + bb, err := os.ReadFile(makeAbsolute(s.Getwd(), args[0])) + if err != nil { + return nil, err + } + src, err := flow.ParseSource(args[0], bb) + if err != nil { + return nil, fmt.Errorf("parsing %q: %w", args[0], err) + } + return nil, f.LoadSource(src, nil) + }, + ) + + cmds["controller.start"] = script.Command( + script.CmdUsage{ + Summary: "start the Flow controller", + Async: true, + Detail: []string{ + "This command should almost always be run in the background by appending a &, otherwise the controller will exit immediately after the command finishes'", + }, + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) != 0 { + return nil, script.ErrUsage + } + + var wg sync.WaitGroup + wg.Add(1) + + ctx, cancel := context.WithCancel(s.Context()) + go func() { + defer wg.Done() + f.Run(ctx) + }() + + return script.WaitFunc(func(s *script.State) (stdout string, stderr string, err error) { + cancel() + wg.Wait() + + return "", "", nil + }), nil + }, + ) + + cmds["assert.health"] = script.Command( + script.CmdUsage{ + Summary: "assert a health of a component", + Args: "component expected_health", + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) != 2 { + return nil, script.ErrUsage + } + + // TODO(rfratto): allow configuring via flags. + bc := backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 1 * time.Second, + MaxRetries: 10, + } + bo := backoff.New(s.Context(), bc) + + check := func() error { + var expectedHealth component.HealthType + if err := expectedHealth.UnmarshalText([]byte(strings.ToLower(args[1]))); err != nil { + return err + } + + info, err := f.GetComponent(component.ParseID(args[0]), component.InfoOptions{GetHealth: true}) + if err != nil { + return err + } + + if info.Health.Health != expectedHealth { + return fmt.Errorf("expected %q, got %q", expectedHealth, info.Health.Health) + } + + return nil + } + + for bo.Ongoing() { + if err := check(); err == nil { + break + } + bo.Wait() + } + + return nil, bo.Err() + }, + ) +} + +func makeAbsolute(wd, path string) string { + if filepath.IsAbs(path) { + return path + } + return filepath.Join(wd, path) +} diff --git a/pkg/flow/flowtest/flowtest.go b/pkg/flow/flowtest/flowtest.go new file mode 100644 index 000000000000..105cf23f0170 --- /dev/null +++ b/pkg/flow/flowtest/flowtest.go @@ -0,0 +1,195 @@ +// Package flowtest provides a script-based testing framework for Flow. +// +// [TestScript] accepts a path to a txtar file, where the comment of the txtar +// file denotes the script to run. Files in the txtar archive are unpacked to a +// temporary directory as part of the test. +// +// See [rsc.io/script] for more information about the scripting language and +// available default commands. +// +// # Flow commands +// +// In addition to the default commands provided by [rsc.io/script], the +// following Flow-specific commands are available: +// +// - `controller.load [file]`: Load a file into the Flow controller. +// - `controller.start`: Start the Flow controller. +// - `assert.health [component] [health]`: Assert the health of a specific component. +// +// Note that `controller.start` should almost always be run in the background +// by appending a & to the end of the command, otherwise the controller will be +// terminated immediately after the command exits. +// +// Custom commands can be provided by passing [WithExtraCommands] to +// [TestScript]. +// +// # Example script +// +// The following is an example script which uses local.file: +// +// # This file performs a basic test of loading a component and asserting its +// # health. +// +// controller.load main.river +// controller.start & +// +// assert.health local.file.example healthy +// +// -- main.river -- +// local.file "example" { +// filename = "./hello.txt" +// } +// +// -- hello.txt -- +// Hello, world! +// +// See the testdata directory for more basic examples. +package flowtest + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/logging" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/tools/txtar" + "rsc.io/script" +) + +// TestScript loads the file at filename and executes it as a script. A +// temporary working directory is created for running the script, and the +// working directory of the process is changed to the temporary directory for +// the duration of the call. +func TestScript(filename string, opts ...TestOption) error { + var o options + for _, opt := range opts { + opt(&o) + } + + // Create a temporary directory for the scope of our test to operate in. + tmpDir, err := os.MkdirTemp("", "flowtest-*") + if err != nil { + return fmt.Errorf("creating temporary directory: %w", err) + } + defer os.RemoveAll(tmpDir) + + archive, err := txtar.ParseFile(filename) + if err != nil { + return fmt.Errorf("loading script: %w", err) + } + + // Create a test controller for the script to interact with. The test + // controller's logs are buffered and printed to stderr on exit. This + // prevents log mangling when both the script engine and the Flow controller + // are writing logs at the same time (as the script engine will write partial + // lines). + var controllerLogs bytes.Buffer + defer func() { + fmt.Fprintln(os.Stderr, "[controller logs]") + io.Copy(os.Stderr, &controllerLogs) + }() + f, err := newTestController(&controllerLogs, filepath.Join(tmpDir, "data")) + if err != nil { + return fmt.Errorf("creating test controller: %w", err) + } + + // Create a state for the duration of our test, using it to unpack the txtar + // archive into our working directory. + // + // Because scripts are frequently asynchronous (such as controller.start), we + // need to close the state in a defer to make sure everything gets cleaned up + // properly. + s, err := script.NewState(context.Background(), tmpDir, nil) + if err != nil { + return fmt.Errorf("creating state: %w", err) + } + if err := s.ExtractFiles(archive); err != nil { + return fmt.Errorf("extracting archive: %w", err) + } + defer s.CloseAndWait(os.Stderr) + + // Finally, prepare for running the script: + // + // 1. Change our working directory to the temporary directory. This allows + // components which rely on the working directory to work properly. The + // previous working directory is restored on exit. This step *MUST* happen + // here at the end before executing the engine, otherwise calls to + // TestScript relying on the working directory of tests will fail. + // + // 2. Create a new engine and register commands to scripts to use. + // + // 3. Execute the script. + initWD, err := os.Getwd() + if err != nil { + return fmt.Errorf("getting working directory: %w", err) + } + if err := os.Chdir(tmpDir); err != nil { + return fmt.Errorf("changing working directory: %w", err) + } + defer os.Chdir(initWD) + + e := script.NewEngine() + e.Cmds = createCommands(f, &o) + return e.Execute(s, filename, bufio.NewReader(bytes.NewReader(archive.Comment)), os.Stderr) +} + +func newTestController(out io.Writer, dataDir string) (*flow.Flow, error) { + logger, err := logging.New(out, logging.Options{ + Level: logging.LevelDebug, + Format: logging.FormatDefault, + }) + if err != nil { + return nil, fmt.Errorf("creating logger: %w", err) + } + + tracer, err := tracing.New(tracing.Options{ + SamplingFraction: 0, + }) + if err != nil { + return nil, fmt.Errorf("creating tracer: %w", err) + } + + if err := os.MkdirAll(dataDir, 0700); err != nil { + return nil, fmt.Errorf("creating data directory: %w", err) + } + + f := flow.New(flow.Options{ + Logger: logger, + Tracer: tracer, + Reg: prometheus.NewRegistry(), + + DataPath: dataDir, + // TODO(rfratto): services? + }) + return f, nil +} + +// TestOption modifies default behavior of testing functions. +type TestOption func(*options) + +type options struct { + cmds map[string]script.Cmd +} + +// WithExtraCommands provides a list of extra commands available for scripts. +// If a key in cmds matches the name of an existing command, the existing +// command is shadowed by the command. +// +// WithExtraCommands can be passed multiple times. +func WithExtraCommands(cmds map[string]script.Cmd) TestOption { + return func(o *options) { + if o.cmds == nil { + o.cmds = make(map[string]script.Cmd) + } + for k, cmd := range cmds { + o.cmds[k] = cmd + } + } +} diff --git a/pkg/flow/flowtest/flowtest_test.go b/pkg/flow/flowtest/flowtest_test.go new file mode 100644 index 000000000000..3c1386f6db79 --- /dev/null +++ b/pkg/flow/flowtest/flowtest_test.go @@ -0,0 +1,22 @@ +package flowtest_test + +import ( + "io/fs" + "path/filepath" + "testing" + + "github.com/grafana/agent/pkg/flow/flowtest" + "github.com/stretchr/testify/require" +) + +func Test(t *testing.T) { + filepath.WalkDir("testdata/", func(path string, d fs.DirEntry, err error) error { + if filepath.Ext(path) != ".txtar" { + return nil + } + t.Run(d.Name(), func(t *testing.T) { + require.NoError(t, flowtest.TestScript(path)) + }) + return nil + }) +} diff --git a/pkg/flow/flowtest/testdata/basic.txtar b/pkg/flow/flowtest/testdata/basic.txtar new file mode 100644 index 000000000000..f725e77d63c5 --- /dev/null +++ b/pkg/flow/flowtest/testdata/basic.txtar @@ -0,0 +1,15 @@ +# This file performs a basic test of loading a component and asserting its +# health. + +controller.load main.river +controller.start & + +assert.health local.file.example healthy + +-- main.river -- +local.file "example" { + filename = "./hello.txt" +} + +-- hello.txt -- +Hello, world! diff --git a/pkg/flow/flowtest/testdata/basic_failure.txtar b/pkg/flow/flowtest/testdata/basic_failure.txtar new file mode 100644 index 000000000000..34775a18e76b --- /dev/null +++ b/pkg/flow/flowtest/testdata/basic_failure.txtar @@ -0,0 +1,14 @@ +# This file performs a basic test of loading a component which fails to +# evaluate. +# +# We start the controller first to allow us to probe for component health. + +controller.start & +! controller.load main.river + +assert.health local.file.example unhealthy + +-- main.river -- +local.file "example" { + filename = "does-not-exist.txt" +}