Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import - new modules #6350

Merged
merged 23 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
78b370a
(pkg/flow/internal): add importsource package
wildum Feb 9, 2024
7c68059
Add ImportConfigNode
wildum Feb 9, 2024
ad2257d
Update flow controller to use the ImportConfigNode.
wildum Feb 9, 2024
ade13d8
add import test
wildum Feb 9, 2024
b943ce6
add another test to check config reload
wildum Feb 9, 2024
b7bd0f5
use runner package (goroutines leaking)
wildum Feb 12, 2024
a213e8b
stop runner to avoid goroutines leak
wildum Feb 13, 2024
a33a241
improve error handling / code quality following review
wildum Feb 13, 2024
b17dd73
prometheus metrics and health status can be collected from the import…
wildum Feb 13, 2024
153010d
extend test
wildum Feb 13, 2024
29d83af
refactor import tests
wildum Feb 14, 2024
5e6ef3c
add two more tests and increase robustness
wildum Feb 15, 2024
35abc3c
merge main
wildum Feb 16, 2024
e543e32
protect registerImport with mutex
wildum Feb 16, 2024
1221f5d
use custom args instead of local.file args for import.file
wildum Feb 16, 2024
77effec
lint
wildum Feb 16, 2024
9184077
update import nodes to be unhealthy if a nested import stops running …
wildum Feb 19, 2024
a2a655f
use %q instead of %s when logging strings
wildum Feb 19, 2024
a2f1a17
prevent runner from reusing previous tasks
wildum Feb 19, 2024
cd65c5d
refactor ExtractImportAndDeclare
wildum Feb 19, 2024
d103d18
refactor tests in multiple txtar files
wildum Feb 19, 2024
96d9d0b
equals in import config node runner should compare pointers
wildum Feb 20, 2024
f63d560
Merge branch 'main' into import-new-modules-2
wildum Feb 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,18 @@ func (f *Flow) Run(ctx context.Context) {
var (
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services))
runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
for _, c := range components {
runnables = append(runnables, c)
}

for _, i := range imports {
runnables = append(runnables, i)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
Expand Down
33 changes: 29 additions & 4 deletions pkg/flow/internal/controller/component_node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,25 @@ func (m *ComponentNodeManager) createComponentNode(componentName string, block *
}

// getCustomComponentConfig is used by the custom component to retrieve its template and the customComponentRegistry associated with it.
func (m *ComponentNodeManager) getCustomComponentConfig(componentName string) (ast.Body, *CustomComponentRegistry, error) {
func (m *ComponentNodeManager) getCustomComponentConfig(namespace string, componentName string) (ast.Body, *CustomComponentRegistry, error) {
m.mut.Lock()
defer m.mut.Unlock()

template, customComponentRegistry := findLocalDeclare(m.customComponentReg, componentName)
var (
template ast.Body
customComponentRegistry *CustomComponentRegistry
)

if namespace == "" {
template, customComponentRegistry = findLocalDeclare(m.customComponentReg, componentName)
} else {
template, customComponentRegistry = findImportedDeclare(m.customComponentReg, namespace, componentName)
}

if customComponentRegistry == nil || template == nil {
return nil, nil, fmt.Errorf("custom component config not found in the registry, componentName: %s", componentName)
return nil, nil, fmt.Errorf("custom component config not found in the registry, namespace: %s, componentName: %s", namespace, componentName)
wildum marked this conversation as resolved.
Show resolved Hide resolved
}
// The registry is passed as a pointer to the custom component config.
return template, customComponentRegistry, nil
}

Expand All @@ -61,7 +71,8 @@ func isCustomComponent(reg *CustomComponentRegistry, name string) bool {
return false
}
_, declareExists := reg.declares[name]
return declareExists || isCustomComponent(reg.parent, name)
_, importExists := reg.imports[name]
return declareExists || importExists || isCustomComponent(reg.parent, name)
}

// findLocalDeclare recursively searches for a declare definition in the custom component registry.
Expand All @@ -74,3 +85,17 @@ func findLocalDeclare(reg *CustomComponentRegistry, componentName string) (ast.B
}
return nil, nil
}

// findImportedDeclare recursively searches for an import matching the provided namespace.
// When the import is found, it will search for a declare matching the componentName within the custom registry of the import.
func findImportedDeclare(reg *CustomComponentRegistry, namespace string, componentName string) (ast.Body, *CustomComponentRegistry) {
if imported, ok := reg.imports[namespace]; ok {
if declare, ok := imported.declares[componentName]; ok {
return declare, imported
}
}
if reg.parent != nil {
return findImportedDeclare(reg.parent, namespace, componentName)
}
return nil, nil
}
44 changes: 44 additions & 0 deletions pkg/flow/internal/controller/custom_component_registry.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package controller

import (
"fmt"
"sync"

"github.com/grafana/river/ast"
)

// CustomComponentRegistry holds custom component definitions that are available in the context.
// The definitions are either imported, declared locally, or declared in a parent registry.
// Imported definitions are stored inside of the corresponding import registry.
type CustomComponentRegistry struct {
parent *CustomComponentRegistry // nil if root config
declares map[string]ast.Body // customComponentName: template

mut sync.RWMutex
imports map[string]*CustomComponentRegistry // importNamespace: importScope
}

// NewCustomComponentRegistry creates a new CustomComponentRegistry with a parent.
Expand All @@ -16,10 +24,46 @@ func NewCustomComponentRegistry(parent *CustomComponentRegistry) *CustomComponen
return &CustomComponentRegistry{
parent: parent,
declares: make(map[string]ast.Body),
imports: make(map[string]*CustomComponentRegistry),
}
}

// registerDeclare stores a local declare block.
func (s *CustomComponentRegistry) registerDeclare(declare *ast.BlockStmt) {
s.declares[declare.Label] = declare.Body
}

// registerImport stores the import namespace.
// The content will be added later during evaluation.
// It's important to register it before populating the component nodes
// (else we don't know which one exists).
func (s *CustomComponentRegistry) registerImport(importNamespace string) {
s.imports[importNamespace] = nil
}

// updateImportContent updates the content of a registered import.
// The content of an import node can contain other import blocks.
// These are considered as "children" of the root import node.
// Each child has its own CustomComponentRegistry which needs to be updated.
func (s *CustomComponentRegistry) updateImportContent(importNode *ImportConfigNode) {
s.mut.Lock()
defer s.mut.Unlock()
if _, exist := s.imports[importNode.label]; !exist {
panic(fmt.Errorf("import %q was not registered", importNode.label))
}
importScope := NewCustomComponentRegistry(nil)
importScope.declares = importNode.ImportedDeclares()
importScope.updateImportContentChildren(importNode)
s.imports[importNode.label] = importScope
}

// updateImportContentChildren recurse through the children of an import node
// and update their scope with the imported declare blocks.
func (s *CustomComponentRegistry) updateImportContentChildren(importNode *ImportConfigNode) {
for _, child := range importNode.ImportConfigNodesChildren() {
childScope := NewCustomComponentRegistry(nil)
childScope.declares = child.ImportedDeclares()
childScope.updateImportContentChildren(child)
s.imports[child.label] = childScope
wildum marked this conversation as resolved.
Show resolved Hide resolved
}
}
29 changes: 26 additions & 3 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Loader struct {
originalGraph *dag.Graph
componentNodes []ComponentNode
declareNodes map[string]*DeclareNode
importConfigNodes map[string]*ImportConfigNode
serviceNodes []*ServiceNode
cache *valueCache
blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing
Expand Down Expand Up @@ -462,6 +463,10 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
continue
}

if importNode, ok := node.(*ImportConfigNode); ok {
l.componentNodeManager.customComponentReg.registerImport(importNode.label)
}

g.Add(node)
}

Expand All @@ -480,7 +485,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
g.Add(c)
}

// TODO: set import config nodes form the nodeMap to the importConfigNodes field of the loader.
l.importConfigNodes = nodeMap.importMap

return diags
}
Expand Down Expand Up @@ -580,12 +585,15 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {

// wireCustomComponentNode wires a custom component to the import/declare nodes that it depends on.
func (l *Loader) wireCustomComponentNode(g *dag.Graph, cc *CustomComponentNode) {
if declare, ok := l.declareNodes[cc.componentName]; ok {
if declare, ok := l.declareNodes[cc.customComponentName]; ok {
refs := l.findCustomComponentReferences(declare.Block())
for ref := range refs {
// add edges between the custom component and declare/import nodes.
g.AddEdge(dag.Edge{From: cc, To: ref})
}
} else if importNode, ok := l.importConfigNodes[cc.importNamespace]; ok {
// add an edge between the custom component and the corresponding import node.
g.AddEdge(dag.Edge{From: cc, To: importNode})
}
}

Expand All @@ -609,6 +617,13 @@ func (l *Loader) Services() []*ServiceNode {
return l.serviceNodes
}

// Imports returns the current set of import nodes.
func (l *Loader) Imports() map[string]*ImportConfigNode {
l.mut.RLock()
defer l.mut.RUnlock()
return l.importConfigNodes
}

// Graph returns a copy of the DAG managed by the Loader.
func (l *Loader) Graph() *dag.Graph {
l.mut.RLock()
Expand Down Expand Up @@ -652,6 +667,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN
case ComponentNode:
// Make sure we're in-sync with the current exports of parent.
l.cache.CacheExports(parentNode.ID(), parentNode.Exports())
case *ImportConfigNode:
// Update the scope with the imported content.
l.componentNodeManager.customComponentReg.updateImportContent(parentNode)
}
// We collect all nodes directly incoming to parent.
_ = dag.WalkIncomingNodes(l.graph, parent.Node, func(n dag.Node) error {
Expand Down Expand Up @@ -787,6 +805,8 @@ func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error
err = fmt.Errorf("missing required argument %q to module", c.Label())
}
}
case *ImportConfigNode:
l.componentNodeManager.customComponentReg.updateImportContent(c)
}

if err != nil {
Expand Down Expand Up @@ -821,7 +841,7 @@ func (l *Loader) findCustomComponentReferences(declare *ast.BlockStmt) map[Block
return uniqueReferences
}

// collectCustomComponentDependencies recursively collects references to declare nodes through an AST body.
// collectCustomComponentDependencies recursively collects references to import/declare nodes through an AST body.
func (l *Loader) collectCustomComponentReferences(stmts ast.Body, uniqueReferences map[BlockNode]struct{}) {
for _, stmt := range stmts {
blockStmt, ok := stmt.(*ast.BlockStmt)
Expand All @@ -833,13 +853,16 @@ func (l *Loader) collectCustomComponentReferences(stmts ast.Body, uniqueReferenc
componentName = strings.Join(blockStmt.Name, ".")

declareNode, foundDeclare = l.declareNodes[blockStmt.Name[0]]
importNode, foundImport = l.importConfigNodes[blockStmt.Name[0]]
)

switch {
case componentName == "declare":
l.collectCustomComponentReferences(blockStmt.Body, uniqueReferences)
case foundDeclare:
uniqueReferences[declareNode] = struct{}{}
case foundImport:
uniqueReferences[importNode] = struct{}{}
}
}
}
1 change: 1 addition & 0 deletions pkg/flow/internal/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func newControllerCollector(l *Loader, id string) *controllerCollector {
func (cc *controllerCollector) Collect(ch chan<- prometheus.Metric) {
componentsByHealth := make(map[string]int)

// Should we also collect metrics from components running in import config nodes?
wildum marked this conversation as resolved.
Show resolved Hide resolved
for _, component := range cc.l.Components() {
health := component.CurrentHealth().Health.String()
componentsByHealth[health]++
Expand Down
7 changes: 7 additions & 0 deletions pkg/flow/internal/controller/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"fmt"

"github.com/grafana/agent/pkg/flow/internal/importsource"
"github.com/grafana/river/ast"
"github.com/grafana/river/diag"
)
Expand All @@ -26,6 +27,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
return NewLoggingConfigNode(block, globals), nil
case tracingBlockID:
return NewTracingConfigNode(block, globals), nil
case importsource.BlockImportFile, importsource.BlockImportString:
return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil
default:
var diags diag.Diagnostics
diags.Add(diag.Diagnostic{
Expand All @@ -46,6 +49,7 @@ type ConfigNodeMap struct {
tracing *TracingConfigNode
argumentMap map[string]*ArgumentConfigNode
exportMap map[string]*ExportConfigNode
importMap map[string]*ImportConfigNode
}

// NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called
Expand All @@ -56,6 +60,7 @@ func NewConfigNodeMap() *ConfigNodeMap {
tracing: nil,
argumentMap: map[string]*ArgumentConfigNode{},
exportMap: map[string]*ExportConfigNode{},
importMap: map[string]*ImportConfigNode{},
}
}

Expand All @@ -73,6 +78,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics {
nodeMap.logging = n
case *TracingConfigNode:
nodeMap.tracing = n
case *ImportConfigNode:
nodeMap.importMap[n.Label()] = n
default:
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Expand Down
Loading