Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import - new modules #6350

Merged
merged 23 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
78b370a
(pkg/flow/internal): add importsource package
wildum Feb 9, 2024
7c68059
Add ImportConfigNode
wildum Feb 9, 2024
ad2257d
Update flow controller to use the ImportConfigNode.
wildum Feb 9, 2024
ade13d8
add import test
wildum Feb 9, 2024
b943ce6
add another test to check config reload
wildum Feb 9, 2024
b7bd0f5
use runner package (goroutines leaking)
wildum Feb 12, 2024
a213e8b
stop runner to avoid goroutines leak
wildum Feb 13, 2024
a33a241
improve error handling / code quality following review
wildum Feb 13, 2024
b17dd73
prometheus metrics and health status can be collected from the import…
wildum Feb 13, 2024
153010d
extend test
wildum Feb 13, 2024
29d83af
refactor import tests
wildum Feb 14, 2024
5e6ef3c
add two more tests and increase robustness
wildum Feb 15, 2024
35abc3c
merge main
wildum Feb 16, 2024
e543e32
protect registerImport with mutex
wildum Feb 16, 2024
1221f5d
use custom args instead of local.file args for import.file
wildum Feb 16, 2024
77effec
lint
wildum Feb 16, 2024
9184077
update import nodes to be unhealthy if a nested import stops running …
wildum Feb 19, 2024
a2a655f
use %q instead of %s when logging strings
wildum Feb 19, 2024
a2f1a17
prevent runner from reusing previous tasks
wildum Feb 19, 2024
cd65c5d
refactor ExtractImportAndDeclare
wildum Feb 19, 2024
d103d18
refactor tests in multiple txtar files
wildum Feb 19, 2024
96d9d0b
equals in import config node runner should compare pointers
wildum Feb 20, 2024
f63d560
Merge branch 'main' into import-new-modules-2
wildum Feb 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,18 @@ func (f *Flow) Run(ctx context.Context) {
var (
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services))
runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
for _, c := range components {
runnables = append(runnables, c)
}

for _, i := range imports {
runnables = append(runnables, i)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
Expand Down
239 changes: 239 additions & 0 deletions pkg/flow/import_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package flow_test
wildum marked this conversation as resolved.
Show resolved Hide resolved
wildum marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"io/fs"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/grafana/agent/pkg/flow"
"github.com/grafana/agent/pkg/flow/internal/testcomponents"
"github.com/grafana/agent/pkg/flow/logging"
"github.com/grafana/agent/service"
"github.com/stretchr/testify/require"
"golang.org/x/tools/txtar"

_ "github.com/grafana/agent/component/module/string"
)

// The tests are using the .txtar files stored in the testdata folder.

type testImportFile struct {
description string // description at the top of the txtar file
main string // root config that the controller should load
module string // module imported by the root config
nestedModule string // nested module that can be imported by the module
reloadConfig string // root config that the controller should apply on reload
otherNestedModule string // another nested module
update *updateFile // update can be used to update the content of a file at runtime
}

type updateFile struct {
name string // name of the file which should be updated
updateConfig string // new module config which should be used
}

func buildTestImportFile(t *testing.T, filename string) testImportFile {
archive, err := txtar.ParseFile(filename)
require.NoError(t, err)
var tc testImportFile
tc.description = string(archive.Comment)
for _, riverConfig := range archive.Files {
switch riverConfig.Name {
case "main.river":
tc.main = string(riverConfig.Data)
case "module.river":
tc.module = string(riverConfig.Data)
case "nested_module.river":
tc.nestedModule = string(riverConfig.Data)
case "update/module.river":
require.Nil(t, tc.update)
tc.update = &updateFile{
name: "module.river",
updateConfig: string(riverConfig.Data),
}
case "update/nested_module.river":
require.Nil(t, tc.update)
tc.update = &updateFile{
name: "nested_module.river",
updateConfig: string(riverConfig.Data),
}
case "reload_config.river":
tc.reloadConfig = string(riverConfig.Data)
case "other_nested_module.river":
tc.otherNestedModule = string(riverConfig.Data)
}
}
return tc
}

func TestImportFile(t *testing.T) {
directory := "./testdata/import_file"
for _, file := range getTestFiles(directory, t) {
tc := buildTestImportFile(t, directory+"/"+file.Name())
t.Run(tc.description, func(t *testing.T) {
defer os.Remove("module.river")
require.NoError(t, os.WriteFile("module.river", []byte(tc.module), 0664))
if tc.nestedModule != "" {
defer os.Remove("nested_module.river")
require.NoError(t, os.WriteFile("nested_module.river", []byte(tc.nestedModule), 0664))
}
if tc.otherNestedModule != "" {
defer os.Remove("other_nested_module.river")
require.NoError(t, os.WriteFile("other_nested_module.river", []byte(tc.otherNestedModule), 0664))
}

if tc.update != nil {
testConfig(t, tc.main, tc.reloadConfig, func() {
require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
})
} else {
testConfig(t, tc.main, tc.reloadConfig, nil)
}
})
}
}

func TestImportString(t *testing.T) {
directory := "./testdata/import_string"
for _, file := range getTestFiles(directory, t) {
archive, err := txtar.ParseFile(directory + "/" + file.Name())
require.NoError(t, err)
t.Run(archive.Files[0].Name, func(t *testing.T) {
testConfig(t, string(archive.Files[0].Data), "", nil)
})
}
}

type testImportError struct {
description string
main string
expectedError string
}

func buildTestImportError(t *testing.T, filename string) testImportError {
archive, err := txtar.ParseFile(filename)
require.NoError(t, err)
var tc testImportError
tc.description = string(archive.Comment)
for _, riverConfig := range archive.Files {
switch riverConfig.Name {
case "main.river":
tc.main = string(riverConfig.Data)
case "error":
tc.expectedError = string(riverConfig.Data)
}
}
return tc
}

func TestImportError(t *testing.T) {
directory := "./testdata/import_error"
for _, file := range getTestFiles(directory, t) {
tc := buildTestImportError(t, directory+"/"+file.Name())
t.Run(tc.description, func(t *testing.T) {
testConfigError(t, tc.main, strings.TrimRight(tc.expectedError, "\n"))
})
}
}

func testConfig(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)

err := ctrl.LoadSource(f, nil)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded >= 10
}, 3*time.Second, 10*time.Millisecond)

if update != nil {
update()

// Export should be -10 after update
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded <= -10
}, 3*time.Second, 10*time.Millisecond)
}

if reloadConfig != "" {
f, err = flow.ParseSource(t.Name(), []byte(reloadConfig))
require.NoError(t, err)
require.NotNil(t, f)

// Reload the controller with the new config.
err = ctrl.LoadSource(f, nil)
require.NoError(t, err)

// Export should be -10 after update
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded <= -10
}, 3*time.Second, 10*time.Millisecond)
}
}

func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
err := ctrl.LoadSource(f, nil)
require.ErrorContains(t, err, expectedError)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()
}

func setup(t *testing.T, config string) (*flow.Flow, *flow.Source) {
s, err := logging.New(os.Stderr, logging.DefaultOptions)
require.NoError(t, err)
ctrl := flow.New(flow.Options{
Logger: s,
DataPath: t.TempDir(),
Reg: nil,
Services: []service.Service{},
})
f, err := flow.ParseSource(t.Name(), []byte(config))
require.NoError(t, err)
require.NotNil(t, f)
return ctrl, f
}

func getTestFiles(directory string, t *testing.T) []fs.FileInfo {
dir, err := os.Open(directory)
require.NoError(t, err)
defer dir.Close()

files, err := dir.Readdir(-1)
require.NoError(t, err)

return files
}
33 changes: 29 additions & 4 deletions pkg/flow/internal/controller/component_node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,25 @@ func (m *ComponentNodeManager) createComponentNode(componentName string, block *
}

// getCustomComponentConfig is used by the custom component to retrieve its template and the customComponentRegistry associated with it.
func (m *ComponentNodeManager) getCustomComponentConfig(componentName string) (ast.Body, *CustomComponentRegistry, error) {
func (m *ComponentNodeManager) getCustomComponentConfig(namespace string, componentName string) (ast.Body, *CustomComponentRegistry, error) {
m.mut.Lock()
defer m.mut.Unlock()

template, customComponentRegistry := findLocalDeclare(m.customComponentReg, componentName)
var (
template ast.Body
customComponentRegistry *CustomComponentRegistry
)

if namespace == "" {
template, customComponentRegistry = findLocalDeclare(m.customComponentReg, componentName)
} else {
template, customComponentRegistry = findImportedDeclare(m.customComponentReg, namespace, componentName)
}

if customComponentRegistry == nil || template == nil {
return nil, nil, fmt.Errorf("custom component config not found in the registry, componentName: %s", componentName)
return nil, nil, fmt.Errorf("custom component config not found in the registry, namespace: %q, componentName: %q", namespace, componentName)
}
// The registry is passed as a pointer to the custom component config.
return template, customComponentRegistry, nil
}

Expand All @@ -61,7 +71,8 @@ func isCustomComponent(reg *CustomComponentRegistry, name string) bool {
return false
}
_, declareExists := reg.declares[name]
return declareExists || isCustomComponent(reg.parent, name)
_, importExists := reg.imports[name]
return declareExists || importExists || isCustomComponent(reg.parent, name)
}

// findLocalDeclare recursively searches for a declare definition in the custom component registry.
Expand All @@ -75,6 +86,20 @@ func findLocalDeclare(reg *CustomComponentRegistry, componentName string) (ast.B
return nil, nil
}

// findImportedDeclare recursively searches for an import matching the provided namespace.
// When the import is found, it will search for a declare matching the componentName within the custom registry of the import.
func findImportedDeclare(reg *CustomComponentRegistry, namespace string, componentName string) (ast.Body, *CustomComponentRegistry) {
if imported, ok := reg.imports[namespace]; ok {
if declare, ok := imported.declares[componentName]; ok {
return declare, imported
}
}
if reg.parent != nil {
return findImportedDeclare(reg.parent, namespace, componentName)
}
return nil, nil
}

func (m *ComponentNodeManager) setCustomComponentRegistry(reg *CustomComponentRegistry) {
m.mut.Lock()
defer m.mut.Unlock()
Expand Down
44 changes: 43 additions & 1 deletion pkg/flow/internal/controller/custom_component_registry.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package controller

import (
"fmt"
"sync"

"github.com/grafana/river/ast"
)

// CustomComponentRegistry holds custom component definitions that are available in the context.
// The definitions are either imported, declared locally, or declared in a parent registry.
// Imported definitions are stored inside of the corresponding import registry.
type CustomComponentRegistry struct {
parent *CustomComponentRegistry // nil if root config

mut sync.RWMutex
declares map[string]ast.Body // customComponentName: template
imports map[string]*CustomComponentRegistry // importNamespace: importScope
declares map[string]ast.Body // customComponentName: template
}

// NewCustomComponentRegistry creates a new CustomComponentRegistry with a parent.
Expand All @@ -20,6 +24,7 @@ func NewCustomComponentRegistry(parent *CustomComponentRegistry) *CustomComponen
return &CustomComponentRegistry{
parent: parent,
declares: make(map[string]ast.Body),
imports: make(map[string]*CustomComponentRegistry),
}
}

Expand All @@ -29,3 +34,40 @@ func (s *CustomComponentRegistry) registerDeclare(declare *ast.BlockStmt) {
defer s.mut.Unlock()
s.declares[declare.Label] = declare.Body
}

// registerImport stores the import namespace.
// The content will be added later during evaluation.
// It's important to register it before populating the component nodes
// (else we don't know which one exists).
func (s *CustomComponentRegistry) registerImport(importNamespace string) {
s.mut.Lock()
defer s.mut.Unlock()
s.imports[importNamespace] = nil
}

// updateImportContent updates the content of a registered import.
// The content of an import node can contain other import blocks.
// These are considered as "children" of the root import node.
// Each child has its own CustomComponentRegistry which needs to be updated.
func (s *CustomComponentRegistry) updateImportContent(importNode *ImportConfigNode) {
s.mut.Lock()
defer s.mut.Unlock()
if _, exist := s.imports[importNode.label]; !exist {
panic(fmt.Errorf("import %q was not registered", importNode.label))
}
importScope := NewCustomComponentRegistry(nil)
importScope.declares = importNode.ImportedDeclares()
importScope.updateImportContentChildren(importNode)
s.imports[importNode.label] = importScope
}

// updateImportContentChildren recurse through the children of an import node
// and update their scope with the imported declare blocks.
func (s *CustomComponentRegistry) updateImportContentChildren(importNode *ImportConfigNode) {
for _, child := range importNode.ImportConfigNodesChildren() {
childScope := NewCustomComponentRegistry(nil)
childScope.declares = child.ImportedDeclares()
childScope.updateImportContentChildren(child)
s.imports[child.label] = childScope
wildum marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading