Skip to content

Commit

Permalink
flow: enable managing services (#4664)
Browse files Browse the repository at this point in the history
* flow/internal/controller: rename files holding DAG node implementations

Rename all files holding DAG node implementations to start with node_ so
they're easier to locate.

* flow/internal/controller: add ServiceNode

ServiceNode is the representation of a Flow service in the DAG.

Note that it is not currently wired up anywhere.

* flow/internal/controller: make NewLoader more flexible

Create a new type, LoaderOptions, which passes options to a Loader, to
make extending Loader with new options more flexible.

LoaderOptions currently encapsulates ComponentGlobals, but will be
expanded in the future to contain other relevant options (such as
services to load in the DAG).

* flow/internal/controller: add service nodes to graph

Add service nodes to the graph when the graph is being constructed. If a
service declares a dependency on another service, it forms a DAG edge.

* flow/internal/controller: expose ServiceNodes to caller

This change stores and exposes ServiceNodes after calls to Apply.

* flow: enable the management of services

This change adds a new field to flow.Options, Services, which causes the
lifecycle management of services to be handled by the Flow controller.

Services are added to the DAG as nodes, and are run with the Flow
controller when the Flow controller starts. Services that declare a
dependency on another service are only evaluated after the services they
depend on have been evaluated.

GetServiceConsumers will now currently return instances of
component.Component and service.Service which depend on a service. The
set of component.Component dependants is currently empty since it is not
yet possible for a component to define a dependency on a service.

The follow items are left as follow-up work:

* Allow components to define a dependency on a service.

* Allow service data to be exposed to components which depend on that
  service.

* Allow services to propagate to loaded modules.

* Migrate existing services to be managed by the Flow controller.

Related to #4253.
  • Loading branch information
rfratto authored Aug 1, 2023
1 parent 8d82b8f commit c4b63d4
Show file tree
Hide file tree
Showing 15 changed files with 645 additions and 76 deletions.
67 changes: 43 additions & 24 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/logging"
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/agent/service"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -115,6 +116,12 @@ type Options struct {
// DialFunc is a function to use for components to properly connect to
// HTTPListenAddr. If nil, DialFunc defaults to (&net.Dialer{}).DialContext.
DialFunc func(ctx context.Context, network, address string) (net.Conn, error)

// List of Services to run with the Flow controller.
//
// Services are configured when LoadFile is invoked. Services are started
// when the Flow controller runs after LoadFile is invoked at least once.
Services []service.Service
}

// Flow is the Flow system.
Expand Down Expand Up @@ -164,17 +171,29 @@ func newController(modReg *moduleRegistry, o Options) *Flow {
dialFunc = (&net.Dialer{}).DialContext
}

var (
queue = controller.NewQueue()
sched = controller.NewScheduler()
loader = controller.NewLoader(controller.ComponentGlobals{
f := &Flow{
log: log,
tracer: tracer,
opts: o,

clusterer: clusterer,
updateQueue: controller.NewQueue(),
sched: controller.NewScheduler(),

modules: modReg,

loadFinished: make(chan struct{}, 1),
}

f.loader = controller.NewLoader(controller.LoaderOptions{
ComponentGlobals: controller.ComponentGlobals{
Logger: log,
TraceProvider: tracer,
Clusterer: clusterer,
DataPath: o.DataPath,
OnComponentUpdate: func(cn *controller.ComponentNode) {
// Changed components should be queued for reevaluation.
queue.Enqueue(cn)
f.updateQueue.Enqueue(cn)
},
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
Expand All @@ -196,21 +215,13 @@ func newController(modReg *moduleRegistry, o Options) *Flow {
ID: id,
})
},
})
)
return &Flow{
log: log,
tracer: tracer,
opts: o,
},

clusterer: clusterer,
updateQueue: queue,
sched: sched,
loader: loader,
modules: modReg,
Services: o.Services,
Host: f,
})

loadFinished: make(chan struct{}, 1),
}
return f
}

// Run starts the Flow controller, blocking until the provided context is
Expand Down Expand Up @@ -240,16 +251,24 @@ func (f *Flow) Run(ctx context.Context) {
}

case <-f.loadFinished:
level.Info(f.log).Log("msg", "scheduling loaded components")
level.Info(f.log).Log("msg", "scheduling loaded components and services")

var (
components = f.loader.Components()
services = f.loader.Services()

components := f.loader.Components()
runnables := make([]controller.RunnableNode, 0, len(components))
for _, uc := range components {
runnables = append(runnables, uc)
runnables = make([]controller.RunnableNode, 0, len(components)+len(services))
)
for _, c := range components {
runnables = append(runnables, c)
}
for _, svc := range services {
runnables = append(runnables, svc)
}

err := f.sched.Synchronize(runnables)
if err != nil {
level.Error(f.log).Log("msg", "failed to load components", "err", err)
level.Error(f.log).Log("msg", "failed to load components and services", "err", err)
}
}
}
Expand Down
31 changes: 28 additions & 3 deletions pkg/flow/flow_services.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,35 @@
package flow

import "github.com/grafana/agent/pkg/flow/internal/controller"

// GetServiceConsumers implements [service.Host]. It returns a slice of
// [component.Component] and [service.Service]s which declared a dependency on
// the named service.
func (f *Flow) GetServiceConsumers(serviceName string) []any {
// TODO(rfratto): return non-nil once it is possible for a service or
// component to declare a dependency on a named service.
return nil
graph := f.loader.OriginalGraph()

serviceNode, _ := graph.GetByID(serviceName).(*controller.ServiceNode)
if serviceNode == nil {
return nil
}
dependants := graph.Dependants(serviceNode)

consumers := make([]any, 0, len(dependants))

for _, consumer := range dependants {
// Only return instances of component.Component and service.Service.
switch consumer := consumer.(type) {
case *controller.ComponentNode:
if c := consumer.Component(); c != nil {
consumers = append(consumers, c)
}

case *controller.ServiceNode:
if svc := consumer.Service(); svc != nil {
consumers = append(consumers, svc)
}
}
}

return consumers
}
176 changes: 176 additions & 0 deletions pkg/flow/flow_services_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package flow

import (
"context"
"testing"
"time"

"github.com/grafana/agent/pkg/flow/internal/testservices"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/agent/service"
"github.com/stretchr/testify/require"
)

func TestServices(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
startedSvc = util.NewWaitTrigger()

svc = &testservices.Fake{
RunFunc: func(ctx context.Context, _ service.Host) error {
startedSvc.Trigger()

<-ctx.Done()
return nil
},
}
)

opts := testOptions(t)
opts.Services = append(opts.Services, svc)

ctrl := New(opts)
require.NoError(t, ctrl.LoadFile(makeEmptyFile(t), nil))

// Start the controller. This should cause our service to run.
go ctrl.Run(ctx)

require.NoError(t, startedSvc.Wait(5*time.Second), "Service did not start")
}

func TestServices_Configurable(t *testing.T) {
type ServiceOptions struct {
Name string `river:"name,attr"`
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
updateCalled = util.NewWaitTrigger()

svc = &testservices.Fake{
DefinitionFunc: func() service.Definition {
return service.Definition{
Name: "fake",
ConfigType: ServiceOptions{},
}
},

UpdateFunc: func(newConfig any) error {
defer updateCalled.Trigger()

require.IsType(t, ServiceOptions{}, newConfig)
require.Equal(t, "John Doe", newConfig.(ServiceOptions).Name)
return nil
},
}
)

f, err := ReadFile(t.Name(), []byte(`
fake {
name = "John Doe"
}
`))
require.NoError(t, err)
require.NotNil(t, f)

opts := testOptions(t)
opts.Services = append(opts.Services, svc)

ctrl := New(opts)

require.NoError(t, ctrl.LoadFile(f, nil))

// Start the controller. This should cause our service to run.
go ctrl.Run(ctx)

require.NoError(t, updateCalled.Wait(5*time.Second), "Service was not configured")
}

// TestServices_Configurable_Optional ensures that a service with optional
// arguments is configured properly even when it is not defined in the config
// file.
func TestServices_Configurable_Optional(t *testing.T) {
type ServiceOptions struct {
Name string `river:"name,attr,optional"`
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
updateCalled = util.NewWaitTrigger()

svc = &testservices.Fake{
DefinitionFunc: func() service.Definition {
return service.Definition{
Name: "fake",
ConfigType: ServiceOptions{},
}
},

UpdateFunc: func(newConfig any) error {
defer updateCalled.Trigger()

require.IsType(t, ServiceOptions{}, newConfig)
require.Equal(t, ServiceOptions{}, newConfig.(ServiceOptions))
return nil
},
}
)

opts := testOptions(t)
opts.Services = append(opts.Services, svc)

ctrl := New(opts)

require.NoError(t, ctrl.LoadFile(makeEmptyFile(t), nil))

// Start the controller. This should cause our service to run.
go ctrl.Run(ctx)

require.NoError(t, updateCalled.Wait(5*time.Second), "Service was not configured")
}

func TestFlow_GetServiceConsumers(t *testing.T) {
var (
svcA = &testservices.Fake{
DefinitionFunc: func() service.Definition {
return service.Definition{
Name: "svc_a",
}
},
}

svcB = &testservices.Fake{
DefinitionFunc: func() service.Definition {
return service.Definition{
Name: "svc_b",
DependsOn: []string{"svc_a"},
}
},
}
)

opts := testOptions(t)
opts.Services = append(opts.Services, svcA, svcB)

ctrl := New(opts)
require.NoError(t, ctrl.LoadFile(makeEmptyFile(t), nil))

consumers := ctrl.GetServiceConsumers("svc_a")
require.Equal(t, []any{svcB}, consumers)
}

func makeEmptyFile(t *testing.T) *File {
t.Helper()

f, err := ReadFile(t.Name(), nil)
require.NoError(t, err)
require.NotNil(t, f)

return f
}
Loading

0 comments on commit c4b63d4

Please sign in to comment.