Skip to content

Commit

Permalink
flowtest: introduce script-based testing framework for Flow
Browse files Browse the repository at this point in the history
This commit introduces a new package, flow/flowtest, which can be used
for running script-based test using Flow components.

flowtest accepts a txtar archive, where the comment of the txtar archive
denotes the script to run. Files in the txtar archive are unpacked to a
temporary directory as part of the test; that temporary directory is
also used as the working directory of the Flow controller.

See <rsc.io/script> for more information about the scripting language
and available default commands.

In addition to the default commands, the following Flow-specific
commands are made available to scripts:

- `controller.load [file]`: Load a file into the Flow controller.
- `controller.start`: Start the Flow controller.
- `assert.health [component] [health]`: Assert the health of a specific
  component.

Note that `controller.start` should almost always be run as a background
command to allow the controller to run for the duration of the test.

Users of flowtest can provide custom commands on top of these by passing
`WithExtraCommands` to `TestScript`.

The following is an example script using these commands:

  # This file performs a basic test of loading a component and asserting its
  # health.

  controller.load main.river
  controller.start &

  assert.health local.file.example healthy

  -- main.river --
  local.file "example" {
    filename = "./hello.txt"
  }

  -- hello.txt --
  Hello, world!

See the testdata/ directory for other test examples.
  • Loading branch information
rfratto committed Feb 20, 2024
1 parent 781dfe8 commit f522508
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 2 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ require (
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/tools v0.15.0 // indirect
golang.org/x/tools v0.15.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
gonum.org/v1/gonum v0.14.0 // indirect
Expand All @@ -603,7 +603,7 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
)

require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab
require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect

require (
connectrpc.com/connect v1.14.0
Expand All @@ -618,6 +618,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.42.0
golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91
k8s.io/apimachinery v0.28.3
rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3201,6 +3201,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3 h1:2vM6uMBq2/Dou/Wzu2p+yUFkuI3lgMbX0UYfVnzh0ck=
rsc.io/script v0.0.2-0.20231205190631-334f6c18cff3/go.mod h1:cKBjCtFBBeZ0cbYFRXkRoxP+xGqhArPa9t3VWhtXfzU=
sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU=
sigs.k8s.io/controller-runtime v0.16.2/go.mod h1:vpMu3LpI5sYWtujJOa2uPK61nB5rbwlN7BAB8aSLvGU=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
Expand Down
136 changes: 136 additions & 0 deletions pkg/flow/flowtest/commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package flowtest

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow"
"github.com/grafana/dskit/backoff"
"rsc.io/script"
)

func createCommands(f *flow.Flow, opts *options) map[string]script.Cmd {
cmds := script.DefaultCmds()
registerControllerCommands(f, cmds)

for name, cmd := range opts.cmds {
cmds[name] = cmd
}

return cmds
}

func registerControllerCommands(f *flow.Flow, cmds map[string]script.Cmd) {
cmds["controller.load"] = script.Command(
script.CmdUsage{
Summary: "load a file into the Flow controller",
Args: "file",
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
if len(args) != 1 {
return nil, script.ErrUsage
}

bb, err := os.ReadFile(makeAbsolute(s.Getwd(), args[0]))
if err != nil {
return nil, err
}
src, err := flow.ParseSource(args[0], bb)
if err != nil {
return nil, fmt.Errorf("parsing %q: %w", args[0], err)
}
return nil, f.LoadSource(src, nil)
},
)

cmds["controller.start"] = script.Command(
script.CmdUsage{
Summary: "start the Flow controller",
Async: true,
Detail: []string{
"This command should almost always be run in the background by appending a &, otherwise the controller will exit immediately after the command finishes'",
},
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
if len(args) != 0 {
return nil, script.ErrUsage
}

var wg sync.WaitGroup
wg.Add(1)

ctx, cancel := context.WithCancel(s.Context())
go func() {
defer wg.Done()
f.Run(ctx)
}()

return script.WaitFunc(func(s *script.State) (stdout string, stderr string, err error) {
cancel()
wg.Wait()

return "", "", nil
}), nil
},
)

cmds["assert.health"] = script.Command(
script.CmdUsage{
Summary: "assert a health of a component",
Args: "component expected_health",
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
if len(args) != 2 {
return nil, script.ErrUsage
}

// TODO(rfratto): allow configuring via flags.
bc := backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 1 * time.Second,
MaxRetries: 10,
}
bo := backoff.New(s.Context(), bc)

check := func() error {
var expectedHealth component.HealthType
if err := expectedHealth.UnmarshalText([]byte(strings.ToLower(args[1]))); err != nil {
return err
}

info, err := f.GetComponent(component.ParseID(args[0]), component.InfoOptions{GetHealth: true})
if err != nil {
return err
}

if info.Health.Health != expectedHealth {
return fmt.Errorf("expected %q, got %q", expectedHealth, info.Health.Health)
}

return nil
}

for bo.Ongoing() {
if err := check(); err == nil {
break
}
bo.Wait()
}

return nil, bo.Err()
},
)
}

func makeAbsolute(wd, path string) string {
if filepath.IsAbs(path) {
return path
}
return filepath.Join(wd, path)
}
195 changes: 195 additions & 0 deletions pkg/flow/flowtest/flowtest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Package flowtest provides a script-based testing framework for Flow.
//
// [TestScript] accepts a path to a txtar file, where the comment of the txtar
// file denotes the script to run. Files in the txtar archive are unpacked to a
// temporary directory as part of the test.
//
// See [rsc.io/script] for more information about the scripting language and
// available default commands.
//
// # Flow commands
//
// In addition to the default commands provided by [rsc.io/script], the
// following Flow-specific commands are available:
//
// - `controller.load [file]`: Load a file into the Flow controller.
// - `controller.start`: Start the Flow controller.
// - `assert.health [component] [health]`: Assert the health of a specific component.
//
// Note that `controller.start` should almost always be run in the background
// by appending a & to the end of the command, otherwise the controller will be
// terminated immediately after the command exits.
//
// Custom commands can be provided by passing [WithExtraCommands] to
// [TestScript].
//
// # Example script
//
// The following is an example script which uses local.file:
//
// # This file performs a basic test of loading a component and asserting its
// # health.
//
// controller.load main.river
// controller.start &
//
// assert.health local.file.example healthy
//
// -- main.river --
// local.file "example" {
// filename = "./hello.txt"
// }
//
// -- hello.txt --
// Hello, world!
//
// See the testdata directory for more basic examples.
package flowtest

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"

"github.com/grafana/agent/pkg/flow"
"github.com/grafana/agent/pkg/flow/logging"
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/tools/txtar"
"rsc.io/script"
)

// TestScript loads the file at filename and executes it as a script. A
// temporary working directory is created for running the script, and the
// working directory of the process is changed to the temporary directory for
// the duration of the call.
func TestScript(filename string, opts ...TestOption) error {
var o options
for _, opt := range opts {
opt(&o)
}

// Create a temporary directory for the scope of our test to operate in.
tmpDir, err := os.MkdirTemp("", "flowtest-*")
if err != nil {
return fmt.Errorf("creating temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)

archive, err := txtar.ParseFile(filename)
if err != nil {
return fmt.Errorf("loading script: %w", err)
}

// Create a test controller for the script to interact with. The test
// controller's logs are buffered and printed to stderr on exit. This
// prevents log mangling when both the script engine and the Flow controller
// are writing logs at the same time (as the script engine will write partial
// lines).
var controllerLogs bytes.Buffer
defer func() {
fmt.Fprintln(os.Stderr, "[controller logs]")
io.Copy(os.Stderr, &controllerLogs)
}()
f, err := newTestController(&controllerLogs, filepath.Join(tmpDir, "data"))
if err != nil {
return fmt.Errorf("creating test controller: %w", err)
}

// Create a state for the duration of our test, using it to unpack the txtar
// archive into our working directory.
//
// Because scripts are frequently asynchronous (such as controller.start), we
// need to close the state in a defer to make sure everything gets cleaned up
// properly.
s, err := script.NewState(context.Background(), tmpDir, nil)
if err != nil {
return fmt.Errorf("creating state: %w", err)
}
if err := s.ExtractFiles(archive); err != nil {
return fmt.Errorf("extracting archive: %w", err)
}
defer s.CloseAndWait(os.Stderr)

// Finally, prepare for running the script:
//
// 1. Change our working directory to the temporary directory. This allows
// components which rely on the working directory to work properly. The
// previous working directory is restored on exit. This step *MUST* happen
// here at the end before executing the engine, otherwise calls to
// TestScript relying on the working directory of tests will fail.
//
// 2. Create a new engine and register commands to scripts to use.
//
// 3. Execute the script.
initWD, err := os.Getwd()
if err != nil {
return fmt.Errorf("getting working directory: %w", err)
}
if err := os.Chdir(tmpDir); err != nil {
return fmt.Errorf("changing working directory: %w", err)
}
defer os.Chdir(initWD)

e := script.NewEngine()
e.Cmds = createCommands(f, &o)
return e.Execute(s, filename, bufio.NewReader(bytes.NewReader(archive.Comment)), os.Stderr)
}

func newTestController(out io.Writer, dataDir string) (*flow.Flow, error) {
logger, err := logging.New(out, logging.Options{
Level: logging.LevelDebug,
Format: logging.FormatDefault,
})
if err != nil {
return nil, fmt.Errorf("creating logger: %w", err)
}

tracer, err := tracing.New(tracing.Options{
SamplingFraction: 0,
})
if err != nil {
return nil, fmt.Errorf("creating tracer: %w", err)
}

if err := os.MkdirAll(dataDir, 0700); err != nil {
return nil, fmt.Errorf("creating data directory: %w", err)
}

f := flow.New(flow.Options{
Logger: logger,
Tracer: tracer,
Reg: prometheus.NewRegistry(),

DataPath: dataDir,
// TODO(rfratto): services?
})
return f, nil
}

// TestOption modifies default behavior of testing functions.
type TestOption func(*options)

type options struct {
cmds map[string]script.Cmd
}

// WithExtraCommands provides a list of extra commands available for scripts.
// If a key in cmds matches the name of an existing command, the existing
// command is shadowed by the command.
//
// WithExtraCommands can be passed multiple times.
func WithExtraCommands(cmds map[string]script.Cmd) TestOption {
return func(o *options) {
if o.cmds == nil {
o.cmds = make(map[string]script.Cmd)
}
for k, cmd := range cmds {
o.cmds[k] = cmd
}
}
}
22 changes: 22 additions & 0 deletions pkg/flow/flowtest/flowtest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package flowtest_test

import (
"io/fs"
"path/filepath"
"testing"

"github.com/grafana/agent/pkg/flow/flowtest"
"github.com/stretchr/testify/require"
)

func Test(t *testing.T) {
filepath.WalkDir("testdata/", func(path string, d fs.DirEntry, err error) error {
if filepath.Ext(path) != ".txtar" {
return nil
}
t.Run(d.Name(), func(t *testing.T) {
require.NoError(t, flowtest.TestScript(path))
})
return nil
})
}
Loading

0 comments on commit f522508

Please sign in to comment.