From c4b63d408012b52084536ba49a46150b43e89047 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 1 Aug 2023 13:58:03 -0400 Subject: [PATCH] flow: enable managing services (#4664) * flow/internal/controller: rename files holding DAG node implementations Rename all files holding DAG node implementations to start with node_ so they're easier to locate. * flow/internal/controller: add ServiceNode ServiceNode is the representation of a Flow service in the DAG. Note that it is not currently wired up anywhere. * flow/internal/controller: make NewLoader more flexible Create a new type, LoaderOptions, which passes options to a Loader, to make extending Loader with new options more flexible. LoaderOptions currently encapsulates ComponentGlobals, but will be expanded in the future to contain other relevant options (such as services to load in the DAG). * flow/internal/controller: add service nodes to graph Add service nodes to the graph when the graph is being constructed. If a service declares a dependency on another service, it forms a DAG edge. * flow/internal/controller: expose ServiceNodes to caller This change stores and exposes ServiceNodes after calls to Apply. * flow: enable the management of services This change adds a new field to flow.Options, Services, which causes the lifecycle management of services to be handled by the Flow controller. Services are added to the DAG as nodes, and are run with the Flow controller when the Flow controller starts. Services that declare a dependency on another service are only evaluated after the services they depend on have been evaluated. GetServiceConsumers will now return instances of component.Component and service.Service which depend on a service. The set of component.Component dependants is currently empty since it is not yet possible for a component to define a dependency on a service. The following items are left as follow-up work: * Allow components to define a dependency on a service.
* Allow service data to be exposed to components which depend on that service. * Allow services to propagate to loaded modules. * Migrate existing services to be managed by the Flow controller. Related to #4253. --- pkg/flow/flow.go | 67 +++--- pkg/flow/flow_services.go | 31 ++- pkg/flow/flow_services_test.go | 176 ++++++++++++++++ pkg/flow/internal/controller/loader.go | 197 ++++++++++++++++-- pkg/flow/internal/controller/loader_test.go | 60 +++--- .../{component.go => node_component.go} | 0 ...mponent_test.go => node_component_test.go} | 0 .../controller/{config.go => node_config.go} | 0 ...ig_argument.go => node_config_argument.go} | 0 ...config_export.go => node_config_export.go} | 0 ...nfig_logging.go => node_config_logging.go} | 0 ...nfig_tracing.go => node_config_tracing.go} | 0 pkg/flow/internal/controller/node_service.go | 128 ++++++++++++ pkg/flow/internal/testservices/doc.go | 4 + pkg/flow/internal/testservices/fake.go | 58 ++++++ 15 files changed, 645 insertions(+), 76 deletions(-) create mode 100644 pkg/flow/flow_services_test.go rename pkg/flow/internal/controller/{component.go => node_component.go} (100%) rename pkg/flow/internal/controller/{component_test.go => node_component_test.go} (100%) rename pkg/flow/internal/controller/{config.go => node_config.go} (100%) rename pkg/flow/internal/controller/{config_argument.go => node_config_argument.go} (100%) rename pkg/flow/internal/controller/{config_export.go => node_config_export.go} (100%) rename pkg/flow/internal/controller/{config_logging.go => node_config_logging.go} (100%) rename pkg/flow/internal/controller/{config_tracing.go => node_config_tracing.go} (100%) create mode 100644 pkg/flow/internal/controller/node_service.go create mode 100644 pkg/flow/internal/testservices/doc.go create mode 100644 pkg/flow/internal/testservices/fake.go diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 8d5b1c2b6ff3..f5f5698047ad 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -55,6 +55,7 @@ import ( 
"github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/logging" "github.com/grafana/agent/pkg/flow/tracing" + "github.com/grafana/agent/service" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" ) @@ -115,6 +116,12 @@ type Options struct { // DialFunc is a function to use for components to properly connect to // HTTPListenAddr. If nil, DialFunc defaults to (&net.Dialer{}).DialContext. DialFunc func(ctx context.Context, network, address string) (net.Conn, error) + + // List of Services to run with the Flow controller. + // + // Services are configured when LoadFile is invoked. Services are started + // when the Flow controller runs after LoadFile is invoked at least once. + Services []service.Service } // Flow is the Flow system. @@ -164,17 +171,29 @@ func newController(modReg *moduleRegistry, o Options) *Flow { dialFunc = (&net.Dialer{}).DialContext } - var ( - queue = controller.NewQueue() - sched = controller.NewScheduler() - loader = controller.NewLoader(controller.ComponentGlobals{ + f := &Flow{ + log: log, + tracer: tracer, + opts: o, + + clusterer: clusterer, + updateQueue: controller.NewQueue(), + sched: controller.NewScheduler(), + + modules: modReg, + + loadFinished: make(chan struct{}, 1), + } + + f.loader = controller.NewLoader(controller.LoaderOptions{ + ComponentGlobals: controller.ComponentGlobals{ Logger: log, TraceProvider: tracer, Clusterer: clusterer, DataPath: o.DataPath, OnComponentUpdate: func(cn *controller.ComponentNode) { // Changed components should be queued for reevaluation. 
- queue.Enqueue(cn) + f.updateQueue.Enqueue(cn) }, OnExportsChange: o.OnExportsChange, Registerer: o.Reg, @@ -196,21 +215,13 @@ func newController(modReg *moduleRegistry, o Options) *Flow { ID: id, }) }, - }) - ) - return &Flow{ - log: log, - tracer: tracer, - opts: o, + }, - clusterer: clusterer, - updateQueue: queue, - sched: sched, - loader: loader, - modules: modReg, + Services: o.Services, + Host: f, + }) - loadFinished: make(chan struct{}, 1), - } + return f } // Run starts the Flow controller, blocking until the provided context is @@ -240,16 +251,24 @@ func (f *Flow) Run(ctx context.Context) { } case <-f.loadFinished: - level.Info(f.log).Log("msg", "scheduling loaded components") + level.Info(f.log).Log("msg", "scheduling loaded components and services") + + var ( + components = f.loader.Components() + services = f.loader.Services() - components := f.loader.Components() - runnables := make([]controller.RunnableNode, 0, len(components)) - for _, uc := range components { - runnables = append(runnables, uc) + runnables = make([]controller.RunnableNode, 0, len(components)+len(services)) + ) + for _, c := range components { + runnables = append(runnables, c) } + for _, svc := range services { + runnables = append(runnables, svc) + } + err := f.sched.Synchronize(runnables) if err != nil { - level.Error(f.log).Log("msg", "failed to load components", "err", err) + level.Error(f.log).Log("msg", "failed to load components and services", "err", err) } } } diff --git a/pkg/flow/flow_services.go b/pkg/flow/flow_services.go index 1f0cf4b4a728..633ca0b8a78c 100644 --- a/pkg/flow/flow_services.go +++ b/pkg/flow/flow_services.go @@ -1,10 +1,35 @@ package flow +import "github.com/grafana/agent/pkg/flow/internal/controller" + // GetServiceConsumers implements [service.Host]. It returns a slice of // [component.Component] and [service.Service]s which declared a dependency on // the named service. 
func (f *Flow) GetServiceConsumers(serviceName string) []any { - // TODO(rfratto): return non-nil once it is possible for a service or - // component to declare a dependency on a named service. - return nil + graph := f.loader.OriginalGraph() + + serviceNode, _ := graph.GetByID(serviceName).(*controller.ServiceNode) + if serviceNode == nil { + return nil + } + dependants := graph.Dependants(serviceNode) + + consumers := make([]any, 0, len(dependants)) + + for _, consumer := range dependants { + // Only return instances of component.Component and service.Service. + switch consumer := consumer.(type) { + case *controller.ComponentNode: + if c := consumer.Component(); c != nil { + consumers = append(consumers, c) + } + + case *controller.ServiceNode: + if svc := consumer.Service(); svc != nil { + consumers = append(consumers, svc) + } + } + } + + return consumers } diff --git a/pkg/flow/flow_services_test.go b/pkg/flow/flow_services_test.go new file mode 100644 index 000000000000..9507cd6c6211 --- /dev/null +++ b/pkg/flow/flow_services_test.go @@ -0,0 +1,176 @@ +package flow + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow/internal/testservices" + "github.com/grafana/agent/pkg/util" + "github.com/grafana/agent/service" + "github.com/stretchr/testify/require" +) + +func TestServices(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + startedSvc = util.NewWaitTrigger() + + svc = &testservices.Fake{ + RunFunc: func(ctx context.Context, _ service.Host) error { + startedSvc.Trigger() + + <-ctx.Done() + return nil + }, + } + ) + + opts := testOptions(t) + opts.Services = append(opts.Services, svc) + + ctrl := New(opts) + require.NoError(t, ctrl.LoadFile(makeEmptyFile(t), nil)) + + // Start the controller. This should cause our service to run. 
+ go ctrl.Run(ctx) + + require.NoError(t, startedSvc.Wait(5*time.Second), "Service did not start") +} + +func TestServices_Configurable(t *testing.T) { + type ServiceOptions struct { + Name string `river:"name,attr"` + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + updateCalled = util.NewWaitTrigger() + + svc = &testservices.Fake{ + DefinitionFunc: func() service.Definition { + return service.Definition{ + Name: "fake", + ConfigType: ServiceOptions{}, + } + }, + + UpdateFunc: func(newConfig any) error { + defer updateCalled.Trigger() + + require.IsType(t, ServiceOptions{}, newConfig) + require.Equal(t, "John Doe", newConfig.(ServiceOptions).Name) + return nil + }, + } + ) + + f, err := ReadFile(t.Name(), []byte(` + fake { + name = "John Doe" + } + `)) + require.NoError(t, err) + require.NotNil(t, f) + + opts := testOptions(t) + opts.Services = append(opts.Services, svc) + + ctrl := New(opts) + + require.NoError(t, ctrl.LoadFile(f, nil)) + + // Start the controller. This should cause our service to run. + go ctrl.Run(ctx) + + require.NoError(t, updateCalled.Wait(5*time.Second), "Service was not configured") +} + +// TestServices_Configurable_Optional ensures that a service with optional +// arguments is configured properly even when it is not defined in the config +// file. 
+func TestServices_Configurable_Optional(t *testing.T) { + type ServiceOptions struct { + Name string `river:"name,attr,optional"` + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + updateCalled = util.NewWaitTrigger() + + svc = &testservices.Fake{ + DefinitionFunc: func() service.Definition { + return service.Definition{ + Name: "fake", + ConfigType: ServiceOptions{}, + } + }, + + UpdateFunc: func(newConfig any) error { + defer updateCalled.Trigger() + + require.IsType(t, ServiceOptions{}, newConfig) + require.Equal(t, ServiceOptions{}, newConfig.(ServiceOptions)) + return nil + }, + } + ) + + opts := testOptions(t) + opts.Services = append(opts.Services, svc) + + ctrl := New(opts) + + require.NoError(t, ctrl.LoadFile(makeEmptyFile(t), nil)) + + // Start the controller. This should cause our service to run. + go ctrl.Run(ctx) + + require.NoError(t, updateCalled.Wait(5*time.Second), "Service was not configured") +} + +func TestFlow_GetServiceConsumers(t *testing.T) { + var ( + svcA = &testservices.Fake{ + DefinitionFunc: func() service.Definition { + return service.Definition{ + Name: "svc_a", + } + }, + } + + svcB = &testservices.Fake{ + DefinitionFunc: func() service.Definition { + return service.Definition{ + Name: "svc_b", + DependsOn: []string{"svc_a"}, + } + }, + } + ) + + opts := testOptions(t) + opts.Services = append(opts.Services, svcA, svcB) + + ctrl := New(opts) + require.NoError(t, ctrl.LoadFile(makeEmptyFile(t), nil)) + + consumers := ctrl.GetServiceConsumers("svc_a") + require.Equal(t, []any{svcB}, consumers) +} + +func makeEmptyFile(t *testing.T) *File { + t.Helper() + + f, err := ReadFile(t.Name(), nil) + require.NoError(t, err) + require.NotNil(t, f) + + return f +} diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 1d43a349e734..0fd81134ffe3 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -14,6 +14,7 @@ 
import ( "github.com/grafana/agent/pkg/flow/tracing" "github.com/grafana/agent/pkg/river/ast" "github.com/grafana/agent/pkg/river/diag" + "github.com/grafana/agent/service" "github.com/grafana/ckit" "github.com/grafana/ckit/peer" "github.com/hashicorp/go-multierror" @@ -26,14 +27,17 @@ import ( // The Loader builds and evaluates ComponentNodes from River blocks. type Loader struct { - log log.Logger - tracer trace.TracerProvider - globals ComponentGlobals + log log.Logger + tracer trace.TracerProvider + globals ComponentGlobals + services []service.Service + host service.Host mut sync.RWMutex graph *dag.Graph originalGraph *dag.Graph - components []*ComponentNode + componentNodes []*ComponentNode + serviceNodes []*ServiceNode cache *valueCache blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing cm *controllerMetrics @@ -41,13 +45,30 @@ type Loader struct { moduleExportIndex int } +// LoaderOptions holds options for creating a Loader. +type LoaderOptions struct { + // ComponentGlobals contains data to use when creating components. + ComponentGlobals ComponentGlobals + + Services []service.Service // Services to load into the DAG. + Host service.Host // Service host (when running services). +} + // NewLoader creates a new Loader. Components built by the Loader will be built // with co for their options. 
-func NewLoader(globals ComponentGlobals) *Loader { +func NewLoader(opts LoaderOptions) *Loader { + var ( + globals = opts.ComponentGlobals + services = opts.Services + host = opts.Host + ) + l := &Loader{ - log: log.With(globals.Logger, "controller_id", globals.ControllerID), - tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), - globals: globals, + log: log.With(globals.Logger, "controller_id", globals.ControllerID), + tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), + globals: globals, + services: services, + host: host, graph: &dag.Graph{}, originalGraph: &dag.Graph{}, @@ -117,6 +138,7 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co var ( components = make([]*ComponentNode, 0, len(componentBlocks)) componentIDs = make([]ComponentID, 0, len(componentBlocks)) + services = make([]*ServiceNode, 0, len(l.services)) ) tracer := l.tracer.Tracer("") @@ -148,12 +170,12 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co var err error - switch c := n.(type) { + switch n := n.(type) { case *ComponentNode: - components = append(components, c) - componentIDs = append(componentIDs, c.ID()) + components = append(components, n) + componentIDs = append(componentIDs, n.ID()) - if err = l.evaluate(logger, c); err != nil { + if err = l.evaluate(logger, n); err != nil { var evalDiags diag.Diagnostics if errors.As(err, &evalDiags) { diags = append(diags, evalDiags...) 
@@ -161,18 +183,36 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, Message: fmt.Sprintf("Failed to build component: %s", err), - StartPos: ast.StartPos(c.Block()).Position(), - EndPos: ast.EndPos(c.Block()).Position(), + StartPos: ast.StartPos(n.Block()).Position(), + EndPos: ast.EndPos(n.Block()).Position(), + }) + } + } + + case *ServiceNode: + services = append(services, n) + + if err = l.evaluate(logger, n); err != nil { + var evalDiags diag.Diagnostics + if errors.As(err, &evalDiags) { + diags = append(diags, evalDiags...) + } else { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("Failed to evaluate service: %s", err), + StartPos: ast.StartPos(n.Block()).Position(), + EndPos: ast.EndPos(n.Block()).Position(), }) } } + case BlockNode: - if err = l.evaluate(logger, c); err != nil { + if err = l.evaluate(logger, n); err != nil { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, Message: fmt.Sprintf("Failed to evaluate node for config block: %s", err), - StartPos: ast.StartPos(c.Block()).Position(), - EndPos: ast.EndPos(c.Block()).Position(), + StartPos: ast.StartPos(n.Block()).Position(), + EndPos: ast.EndPos(n.Block()).Position(), }) } if exp, ok := n.(*ExportConfigNode); ok { @@ -190,7 +230,8 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co return nil }) - l.components = components + l.componentNodes = components + l.serviceNodes = services l.graph = &newGraph l.cache.SyncIDs(componentIDs) l.blocks = componentBlocks @@ -214,8 +255,17 @@ func (l *Loader) Cleanup() { // loadNewGraph creates a new graph from the provided blocks and validates it. func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) (dag.Graph, diag.Diagnostics) { var g dag.Graph + + // Split component blocks into blocks for components and services. 
+ componentBlocks, serviceBlocks := l.splitComponentBlocks(componentBlocks) + + // Fill our graph with service blocks, which must be added before any other + // block. + diags := l.populateServiceNodes(&g, serviceBlocks) + // Fill our graph with config blocks. - diags := l.populateConfigBlockNodes(args, &g, configBlocks) + configBlockDiags := l.populateConfigBlockNodes(args, &g, configBlocks) + diags = append(diags, configBlockDiags...) // Fill our graph with components. componentNodeDiags := l.populateComponentNodes(&g, componentBlocks) @@ -241,6 +291,82 @@ func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockS return g, diags } +func (l *Loader) splitComponentBlocks(blocks []*ast.BlockStmt) (componentBlocks, serviceBlocks []*ast.BlockStmt) { + componentBlocks = make([]*ast.BlockStmt, 0, len(blocks)) + serviceBlocks = make([]*ast.BlockStmt, 0, len(l.services)) + + serviceNames := make(map[string]struct{}, len(l.services)) + for _, svc := range l.services { + serviceNames[svc.Definition().Name] = struct{}{} + } + + for _, block := range blocks { + if _, isService := serviceNames[BlockComponentID(block).String()]; isService { + serviceBlocks = append(serviceBlocks, block) + } else { + componentBlocks = append(componentBlocks, block) + } + } + + return componentBlocks, serviceBlocks +} + +// populateServiceNodes adds service nodes to the graph. +func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockStmt) diag.Diagnostics { + var diags diag.Diagnostics + + // First, build the services. + for _, svc := range l.services { + id := svc.Definition().Name + + if g.GetByID(id) != nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("cannot add service %q; node with same ID already exists", id), + }) + + continue + } + + var node *ServiceNode + + // Check the graph from the previous call to Load to see we can copy an + // existing instance of ServiceNode. 
+ if exist := l.graph.GetByID(id); exist != nil { + node = exist.(*ServiceNode) + } else { + node = NewServiceNode(l.host, svc) + } + + node.UpdateBlock(nil) // Reset configuration to nil. + g.Add(node) + } + + // Now, assign blocks to services. + for _, block := range serviceBlocks { + blockID := BlockComponentID(block).String() + node := g.GetByID(blockID).(*ServiceNode) + + // Blocks assigned to services are reset to nil in the previous loop. + // + // If the block is non-nil, it means that there was a duplicate block + // configuring the same service found in a previous iteration of this loop. + if node.Block() != nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("duplicate definition of %q", blockID), + StartPos: ast.StartPos(block).Position(), + EndPos: ast.EndPos(block).Position(), + }) + continue + } + + node.UpdateBlock(block) + } + + return diags +} + // populateConfigBlockNodes adds any config blocks to the graph. func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, configBlocks []*ast.BlockStmt) diag.Diagnostics { var ( @@ -311,8 +437,9 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo } blockMap[id] = block + // Check the graph from the previous call to Load to see we can copy an + // existing instance of ComponentNode. if exist := l.graph.GetByID(id); exist != nil { - // Re-use the existing component and update its block c = exist.(*ComponentNode) c.UpdateBlock(block) } else { @@ -373,6 +500,27 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { var diags diag.Diagnostics for _, n := range g.Nodes() { + // First, wire up dependencies on services. + switch n := n.(type) { + case *ServiceNode: // Service depending on other services. 
+ for _, depName := range n.Definition().DependsOn { + dep := g.GetByID(depName) + if dep == nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("service %q has invalid reference to service %q", n.NodeID(), depName), + }) + continue + } + + g.AddEdge(dag.Edge{From: n, To: dep}) + } + + case *ComponentNode: // Component depending on service. + // TODO(rfratto): Wire service dependencies of component. + } + + // Finally, wire component references. refs, nodeDiags := ComponentReferences(n, g) for _, ref := range refs { g.AddEdge(dag.Edge{From: n, To: ref.Target}) @@ -393,7 +541,14 @@ func (l *Loader) Variables() map[string]interface{} { func (l *Loader) Components() []*ComponentNode { l.mut.RLock() defer l.mut.RUnlock() - return l.components + return l.componentNodes +} + +// Services returns the current set of service nodes. +func (l *Loader) Services() []*ServiceNode { + l.mut.RLock() + defer l.mut.RUnlock() + return l.serviceNodes } // Graph returns a copy of the DAG managed by the Loader. 
diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 2aa8c3592b59..8c74bff95f0d 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -64,30 +64,32 @@ func TestLoader(t *testing.T) { }, } - newGlobals := func() controller.ComponentGlobals { + newLoaderOptions := func() controller.LoaderOptions { l, _ := logging.New(os.Stderr, logging.DefaultOptions) - return controller.ComponentGlobals{ - Logger: l, - TraceProvider: trace.NewNoopTracerProvider(), - Clusterer: noOpClusterer(), - DataPath: t.TempDir(), - OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, - Registerer: prometheus.NewRegistry(), - NewModuleController: func(id string) controller.ModuleController { - return nil + return controller.LoaderOptions{ + ComponentGlobals: controller.ComponentGlobals{ + Logger: l, + TraceProvider: trace.NewNoopTracerProvider(), + Clusterer: noOpClusterer(), + DataPath: t.TempDir(), + OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, + Registerer: prometheus.NewRegistry(), + NewModuleController: func(id string) controller.ModuleController { + return nil + }, }, } } t.Run("New Graph", func(t *testing.T) { - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(testFile), []byte(testConfig)) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) }) t.Run("New Graph No Config", func(t *testing.T) { - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(testFile), nil) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) @@ -105,7 +107,7 @@ func TestLoader(t *testing.T) { frequency = "1m" } ` - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, 
[]byte(startFile), []byte(testConfig)) origGraph := l.Graph() require.NoError(t, diags.ErrorOrNil()) @@ -124,7 +126,7 @@ func TestLoader(t *testing.T) { doesnotexist "bad_component" { } ` - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(invalidFile), nil) require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`) }) @@ -143,7 +145,7 @@ func TestLoader(t *testing.T) { input = testcomponents.tick.doesnotexist.tick_time } ` - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(invalidFile), nil) require.Error(t, diags.ErrorOrNil()) @@ -171,7 +173,7 @@ func TestLoader(t *testing.T) { input = testcomponents.passthrough.ticker.output } ` - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(invalidFile), nil) require.Error(t, diags.ErrorOrNil()) }) @@ -183,7 +185,7 @@ func TestLoader(t *testing.T) { testcomponents.singleton "first" { } ` - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(invalidFile), nil) require.ErrorContains(t, diags[0], `Component "testcomponents.tick" must have a label`) require.ErrorContains(t, diags[1], `Component "testcomponents.singleton" does not support labels`) @@ -210,22 +212,24 @@ func TestScopeWithFailingComponent(t *testing.T) { input = testcomponents.passthrough.ticker.output } ` - newGlobals := func() controller.ComponentGlobals { + newLoaderOptions := func() controller.LoaderOptions { l, _ := logging.New(os.Stderr, logging.DefaultOptions) - return controller.ComponentGlobals{ - Logger: l, - TraceProvider: trace.NewNoopTracerProvider(), - DataPath: t.TempDir(), - OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, - Registerer: prometheus.NewRegistry(), - Clusterer: noOpClusterer(), - 
NewModuleController: func(id string) controller.ModuleController { - return nil + return controller.LoaderOptions{ + ComponentGlobals: controller.ComponentGlobals{ + Logger: l, + TraceProvider: trace.NewNoopTracerProvider(), + DataPath: t.TempDir(), + OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, + Registerer: prometheus.NewRegistry(), + Clusterer: noOpClusterer(), + NewModuleController: func(id string) controller.ModuleController { + return nil + }, }, } } - l := controller.NewLoader(newGlobals()) + l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(testFile), nil) require.Error(t, diags.ErrorOrNil()) require.Len(t, diags, 1) diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/node_component.go similarity index 100% rename from pkg/flow/internal/controller/component.go rename to pkg/flow/internal/controller/node_component.go diff --git a/pkg/flow/internal/controller/component_test.go b/pkg/flow/internal/controller/node_component_test.go similarity index 100% rename from pkg/flow/internal/controller/component_test.go rename to pkg/flow/internal/controller/node_component_test.go diff --git a/pkg/flow/internal/controller/config.go b/pkg/flow/internal/controller/node_config.go similarity index 100% rename from pkg/flow/internal/controller/config.go rename to pkg/flow/internal/controller/node_config.go diff --git a/pkg/flow/internal/controller/config_argument.go b/pkg/flow/internal/controller/node_config_argument.go similarity index 100% rename from pkg/flow/internal/controller/config_argument.go rename to pkg/flow/internal/controller/node_config_argument.go diff --git a/pkg/flow/internal/controller/config_export.go b/pkg/flow/internal/controller/node_config_export.go similarity index 100% rename from pkg/flow/internal/controller/config_export.go rename to pkg/flow/internal/controller/node_config_export.go diff --git a/pkg/flow/internal/controller/config_logging.go 
b/pkg/flow/internal/controller/node_config_logging.go similarity index 100% rename from pkg/flow/internal/controller/config_logging.go rename to pkg/flow/internal/controller/node_config_logging.go diff --git a/pkg/flow/internal/controller/config_tracing.go b/pkg/flow/internal/controller/node_config_tracing.go similarity index 100% rename from pkg/flow/internal/controller/config_tracing.go rename to pkg/flow/internal/controller/node_config_tracing.go diff --git a/pkg/flow/internal/controller/node_service.go b/pkg/flow/internal/controller/node_service.go new file mode 100644 index 000000000000..37d1dfc001c9 --- /dev/null +++ b/pkg/flow/internal/controller/node_service.go @@ -0,0 +1,128 @@ +package controller + +import ( + "context" + "fmt" + "reflect" + "sync" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/river/ast" + "github.com/grafana/agent/pkg/river/vm" + "github.com/grafana/agent/service" +) + +// ServiceNode is a Flow DAG node which represents a running service. +type ServiceNode struct { + host service.Host + svc service.Service + def service.Definition + + mut sync.RWMutex + block *ast.BlockStmt // Current River block to derive args from + eval *vm.Evaluator + args component.Arguments // Evaluated arguments for the managed component +} + +var ( + _ BlockNode = (*ServiceNode)(nil) + _ RunnableNode = (*ServiceNode)(nil) +) + +// NewServiceNode creates a new instance of a ServiceNode from an instance of a +// Service. The provided host is used when running the service. +func NewServiceNode(host service.Host, svc service.Service) *ServiceNode { + return &ServiceNode{ + host: host, + svc: svc, + def: svc.Definition(), + } +} + +// Service returns the service instance associated with the node. +func (sn *ServiceNode) Service() service.Service { return sn.svc } + +// Definition returns the service definition associated with the node. 
+func (sn *ServiceNode) Definition() service.Definition { return sn.def } + +// NodeID returns the ID of the ServiceNode, which is equal to the service's +// name. +func (sn *ServiceNode) NodeID() string { return sn.def.Name } + +// Block implements BlockNode. It returns nil, since ServiceNodes don't have +// associated configs. +func (sn *ServiceNode) Block() *ast.BlockStmt { + sn.mut.RLock() + defer sn.mut.RUnlock() + return sn.block +} + +// UpdateBlock updates the River block used to construct arguments for the +// service. The new block isn't used until the next time Evaluate is called. +// +// UpdateBlock will panic if the block does not match the ID of the +// ServiceNode. +// +// Call UpdateBlock with a nil block to remove the block associated with the +// ServiceNode. +func (sn *ServiceNode) UpdateBlock(b *ast.BlockStmt) { + if b != nil && !BlockComponentID(b).Equals([]string{sn.NodeID()}) { + panic("UpdateBlock called with a River block with a different block ID") + } + + sn.mut.Lock() + defer sn.mut.Unlock() + + sn.block = b + + if b != nil { + sn.eval = vm.New(b.Body) + } else { + sn.eval = vm.New(ast.Body{}) + } +} + +// Evaluate implements BlockNode, evaluating the configuration for a service. +// Evalute returns an error if the service doesn't support being configured and +// the ServiceNode has an associated block from a call to UpdateBlock. +func (sn *ServiceNode) Evaluate(scope *vm.Scope) error { + sn.mut.Lock() + defer sn.mut.Unlock() + + switch { + case sn.block != nil && sn.def.ConfigType == nil: + return fmt.Errorf("service %q does not support being configured", sn.NodeID()) + + case sn.def.ConfigType == nil: + return nil // Do nothing; no configuration. 
+ } + + argsPointer := reflect.New(reflect.TypeOf(sn.def.ConfigType)).Interface() + + if err := sn.eval.Evaluate(scope, argsPointer); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + + // args is always a pointer to the args type, so we want to deference it + // since services expect a non-pointer. + argsCopyValue := reflect.ValueOf(argsPointer).Elem().Interface() + + if reflect.DeepEqual(sn.args, argsCopyValue) { + // Ignore arguments which haven't changed. This reduces the cost of calling + // evaluate for services where evaluation is expensive (e.g., if + // re-evaluating requires re-starting some internal logic). + return nil + } + + // Update the service. + if err := sn.svc.Update(argsCopyValue); err != nil { + return fmt.Errorf("updating service: %w", err) + } + + sn.args = argsCopyValue + return nil +} + +func (sn *ServiceNode) Run(ctx context.Context) error { + return sn.svc.Run(ctx, sn.host) +} diff --git a/pkg/flow/internal/testservices/doc.go b/pkg/flow/internal/testservices/doc.go new file mode 100644 index 000000000000..4c1984a597d7 --- /dev/null +++ b/pkg/flow/internal/testservices/doc.go @@ -0,0 +1,4 @@ +// Package testservices contains services useful for testing. They are not +// intended to be exposed by end users and so this package should only be +// imported in tests. +package testservices diff --git a/pkg/flow/internal/testservices/fake.go b/pkg/flow/internal/testservices/fake.go new file mode 100644 index 000000000000..5d08febfb14a --- /dev/null +++ b/pkg/flow/internal/testservices/fake.go @@ -0,0 +1,58 @@ +package testservices + +import ( + "context" + + "github.com/grafana/agent/service" +) + +// The Fake service allows injecting custom behavior for interface methods. 
+type Fake struct { + DefinitionFunc func() service.Definition + RunFunc func(ctx context.Context, host service.Host) error + UpdateFunc func(newConfig any) error + DataFunc func() any +} + +var _ service.Service = (*Fake)(nil) + +// Definition implements [service.Service]. If f.DefinitionFunc is non-nil, it +// will be used. Otherwise, a default implementation is used. +func (f *Fake) Definition() service.Definition { + if f.DefinitionFunc != nil { + return f.DefinitionFunc() + } + + return service.Definition{Name: "fake"} +} + +// Run implements [service.Service]. If f.RunFunc is non-nil, it will be used. +// Otherwise, a default implementation is used. +func (f *Fake) Run(ctx context.Context, host service.Host) error { + if f.RunFunc != nil { + return f.RunFunc(ctx, host) + } + + <-ctx.Done() + return nil +} + +// Update implements [service.Service]. If f.UpdateFunc is non-nil, it will be +// used. Otherwise, a default implementation is used. +func (f *Fake) Update(newConfig any) error { + if f.UpdateFunc != nil { + return f.UpdateFunc(newConfig) + } + + return nil +} + +// Data implements [service.Service]. If f.DataFunc is non-nil, it will be +// used. Otherwise, a default implementation is used. +func (f *Fake) Data() any { + if f.DataFunc != nil { + return f.DataFunc() + } + + return nil +}