diff --git a/go.mod b/go.mod index 3f5ee98cab1c..0ab4536248dc 100644 --- a/go.mod +++ b/go.mod @@ -751,7 +751,7 @@ replace ( // https://github.com/grafana/cadvisor/tree/grafana-v0.47-noglobals github.com/google/cadvisor => github.com/grafana/cadvisor v0.0.0-20231110094609-5f7917925dea - github.com/prometheus-community/postgres_exporter => github.com/grafana/postgres_exporter v0.15.1-0.20240402092333-fad5f95ea113 + github.com/prometheus-community/postgres_exporter => github.com/grafana/postgres_exporter v0.15.1-0.20240402092333-fad5f95ea113 // exporter-package-v0.15.0 branch // TODO(marctc): remove once this PR is merged upstream: https://github.com/prometheus/mysqld_exporter/pull/774 github.com/prometheus/mysqld_exporter => github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595 diff --git a/internal/flow/flow.go b/internal/flow/flow.go index 83fa44f78ca2..7fbc10ac30f7 100644 --- a/internal/flow/flow.go +++ b/internal/flow/flow.go @@ -112,11 +112,11 @@ type Options struct { type Flow struct { log *logging.Logger tracer *tracing.Tracer - opts ControllerOptions + opts controllerOptions updateQueue *controller.Queue sched *controller.Scheduler - Loader *controller.Loader + loader *controller.Loader modules *moduleRegistry loadFinished chan struct{} @@ -127,17 +127,17 @@ type Flow struct { // New creates a new, unstarted Flow controller. Call Run to run the controller. func New(o Options) *Flow { - return NewController(ControllerOptions{ + return newController(controllerOptions{ Options: o, - ModuleRegistry: NewModuleRegistry(), + ModuleRegistry: newModuleRegistry(), IsModule: false, // We are creating a new root controller. WorkerPool: worker.NewDefaultWorkerPool(), }) } -// ControllerOptions are internal options used to create both root Flow +// controllerOptions are internal options used to create both root Flow // controller and controllers for modules. -type ControllerOptions struct { +type controllerOptions struct { Options ComponentRegistry controller.ComponentRegistry // Custom component registry used in tests. @@ -150,7 +150,7 @@ type ControllerOptions struct { // newController creates a new, unstarted Flow controller with a specific // moduleRegistry. Modules created by the controller will be passed to the // given modReg. -func NewController(o ControllerOptions) *Flow { +func newController(o controllerOptions) *Flow { var ( log = o.Logger tracer = o.Tracer @@ -186,7 +186,7 @@ func NewController(o ControllerOptions) *Flow { serviceMap := controller.NewServiceMap(o.Services) - f.Loader = controller.NewLoader(controller.LoaderOptions{ + f.loader = controller.NewLoader(controller.LoaderOptions{ ComponentGlobals: controller.ComponentGlobals{ Logger: log, TraceProvider: tracer, @@ -235,7 +235,7 @@ func NewController(o ControllerOptions) *Flow { // canceled. Run must only be called once. func (f *Flow) Run(ctx context.Context) { defer func() { _ = f.sched.Close() }() - defer f.Loader.Cleanup(!f.opts.IsModule) + defer f.loader.Cleanup(!f.opts.IsModule) defer level.Debug(f.log).Log("msg", "flow controller exiting") for { @@ -248,14 +248,14 @@ func (f *Flow) Run(ctx context.Context) { // throughput - it prevents the situation where two nodes have the same dependency, and the first time // it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations. all := f.updateQueue.DequeueAll() - f.Loader.EvaluateDependants(ctx, all) + f.loader.EvaluateDependants(ctx, all) case <-f.loadFinished: level.Info(f.log).Log("msg", "scheduling loaded components and services") var ( - components = f.Loader.Components() - services = f.Loader.Services() - imports = f.Loader.Imports() + components = f.loader.Components() + services = f.loader.Services() + imports = f.loader.Imports() runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports)) ) @@ -307,7 +307,7 @@ func (f *Flow) loadSource(source *Source, args map[string]any, customComponentRe CustomComponentRegistry: customComponentRegistry, } - diags := f.Loader.Apply(applyOptions) + diags := f.loader.Apply(applyOptions) if !f.loadedOnce.Load() && diags.HasErrors() { // The first call to Load should not run any components if there were // errors in the configuration file. diff --git a/internal/flow/flow_components.go b/internal/flow/flow_components.go index 8fec3fc3572d..bdbe87a2ab88 100644 --- a/internal/flow/flow_components.go +++ b/internal/flow/flow_components.go @@ -22,7 +22,7 @@ func (f *Flow) GetComponent(id component.ID, opts component.InfoOptions) (*compo return mod.f.GetComponent(component.ID{LocalID: id.LocalID}, opts) } - graph := f.Loader.OriginalGraph() + graph := f.loader.OriginalGraph() node := graph.GetByID(id.LocalID) if node == nil { @@ -52,8 +52,8 @@ func (f *Flow) ListComponents(moduleID string, opts component.InfoOptions) ([]*c } var ( - components = f.Loader.Components() - graph = f.Loader.OriginalGraph() + components = f.loader.Components() + graph = f.loader.OriginalGraph() ) detail := make([]*component.Info, len(components)) diff --git a/internal/flow/flow_services.go b/internal/flow/flow_services.go index bd04c5dfa51a..f55380118cf9 100644 --- a/internal/flow/flow_services.go +++ b/internal/flow/flow_services.go @@ -13,13 +13,13 @@ import ( // [component.Component] and [service.Service]s which declared a dependency on // the named service. func (f *Flow) GetServiceConsumers(serviceName string) []service.Consumer { - consumers := serviceConsumersForGraph(f.Loader.OriginalGraph(), serviceName, true) + consumers := serviceConsumersForGraph(f.loader.OriginalGraph(), serviceName, true) // Iterate through all modules to find other components that depend on the // service. Peer services aren't checked here, since the services are always // a subset of the services from the root controller. for _, mod := range f.modules.List() { - moduleGraph := mod.f.Loader.OriginalGraph() + moduleGraph := mod.f.loader.OriginalGraph() consumers = append(consumers, serviceConsumersForGraph(moduleGraph, serviceName, false)...) } @@ -71,7 +71,7 @@ func serviceConsumersForGraph(graph *dag.Graph, serviceName string, includePeerS // services can instantiate their own components. func (f *Flow) NewController(id string) service.Controller { return serviceController{ - f: NewController(ControllerOptions{ + f: newController(controllerOptions{ Options: Options{ ControllerID: id, Logger: f.opts.Logger, @@ -83,7 +83,7 @@ func (f *Flow) NewController(id string) service.Controller { OnExportsChange: nil, // NOTE(@tpaschalis, @wildum) The isolated controller shouldn't be able to export any values. }, IsModule: true, - ModuleRegistry: NewModuleRegistry(), + ModuleRegistry: newModuleRegistry(), WorkerPool: worker.NewDefaultWorkerPool(), }), } diff --git a/internal/flow/flow_services_test.go b/internal/flow/flow_services_test.go index c747350799c1..80404b80f427 100644 --- a/internal/flow/flow_services_test.go +++ b/internal/flow/flow_services_test.go @@ -247,10 +247,10 @@ func TestComponents_Using_Services(t *testing.T) { opts := testOptions(t) opts.Services = append(opts.Services, existsSvc) - ctrl := NewController(ControllerOptions{ + ctrl := newController(controllerOptions{ Options: opts, ComponentRegistry: registry, - ModuleRegistry: NewModuleRegistry(), + ModuleRegistry: newModuleRegistry(), }) require.NoError(t, ctrl.LoadSource(f, nil)) go ctrl.Run(ctx) @@ -325,10 +325,10 @@ func TestComponents_Using_Services_In_Modules(t *testing.T) { opts := testOptions(t) opts.Services = append(opts.Services, existsSvc) - ctrl := NewController(ControllerOptions{ + ctrl := newController(controllerOptions{ Options: opts, ComponentRegistry: registry, - ModuleRegistry: NewModuleRegistry(), + ModuleRegistry: newModuleRegistry(), }) require.NoError(t, ctrl.LoadSource(f, nil)) go ctrl.Run(ctx) diff --git a/internal/flow/flow_test.go b/internal/flow/flow_test.go index 30441c3e6c2d..d750568303e7 100644 --- a/internal/flow/flow_test.go +++ b/internal/flow/flow_test.go @@ -45,11 +45,11 @@ func TestController_LoadSource_Evaluation(t *testing.T) { err = ctrl.LoadSource(f, nil) require.NoError(t, err) - require.Len(t, ctrl.Loader.Components(), 4) + require.Len(t, ctrl.loader.Components(), 4) // Check the inputs and outputs of things that should be immediately resolved // without having to run the components. - in, out := getFields(t, ctrl.Loader.Graph(), "testcomponents.passthrough.static") + in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.static") require.Equal(t, "hello, world!", in.(testcomponents.PassthroughConfig).Input) require.Equal(t, "hello, world!", out.(testcomponents.PassthroughExports).Output) } diff --git a/internal/flow/flow_updates_test.go b/internal/flow/flow_updates_test.go index d2724d703d27..4eb89354784d 100644 --- a/internal/flow/flow_updates_test.go +++ b/internal/flow/flow_updates_test.go @@ -1,37 +1,44 @@ -package flow_test +package flow import ( "context" - "github.com/grafana/agent/internal/component" - "github.com/grafana/agent/internal/flow" - "github.com/grafana/agent/internal/flow/internal/controller" - "github.com/grafana/agent/internal/flow/internal/dag" "testing" "time" - _ "github.com/grafana/agent/internal/component/all" + "github.com/grafana/agent/internal/flow/internal/testcomponents" "github.com/grafana/agent/internal/flow/internal/worker" "github.com/stretchr/testify/require" ) -func TestPG(t *testing.T) { +func TestController_Updates(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Simple pipeline with a minimal lag config := ` -logging { - level = "debug" -} + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } -prometheus.exporter.postgres "pg" { - data_source_names = [ - "postgresql://user1:password1@localhost:5432/demo1?sslmode=disable", - "postgresql://user2:password2@localhost:5433/demo2?sslmode=disable", - ] -} + testcomponents.passthrough "inc_dep_1" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testcomponents.passthrough "inc_dep_2" { + input = testcomponents.passthrough.inc_dep_1.output + lag = "1ms" + } + + testcomponents.summation "sum" { + input = testcomponents.passthrough.inc_dep_2.output + } ` ctrl := newTestController(t) // Use testUpdatesFile from graph_builder_test.go. - f, err := flow.ParseSource(t.Name(), []byte(config)) + f, err := ParseSource(t.Name(), []byte(config)) require.NoError(t, err) require.NotNil(t, f) @@ -49,25 +56,326 @@ prometheus.exporter.postgres "pg" { <-done }() - time.Sleep(90 * time.Minute) + // Wait for the updates to propagate + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + return out.(testcomponents.SummationExports).LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_1") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, out = getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_2") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, _ = getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + require.Equal(t, 10, in.(testcomponents.SummationConfig).Input) } -func newTestController(t *testing.T) *flow.Flow { - return flow.NewController(flow.ControllerOptions{ +func TestController_Updates_WithQueueFull(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Simple pipeline with a minimal lag with one node having 3 direct dependencies and one misbehaving node. + config := ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testcomponents.passthrough "misbehaving_slow" { + input = testcomponents.count.inc.count + lag = "100ms" + } + + testcomponents.passthrough "inc_dep_1" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testcomponents.passthrough "inc_dep_2" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testcomponents.passthrough "inc_dep_3" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testcomponents.summation "sum" { + input = testcomponents.passthrough.inc_dep_3.output + } +` + + ctrl := newController(controllerOptions{ Options: testOptions(t), - ModuleRegistry: flow.NewModuleRegistry(), + ModuleRegistry: newModuleRegistry(), IsModule: false, - // Make sure that we have consistent number of workers for tests to make them deterministic. - WorkerPool: worker.NewFixedWorkerPool(4, 100), + // The small number of workers and small queue means that a lot of updates will need to be retried. + WorkerPool: worker.NewFixedWorkerPool(1, 1), }) + + // Use testUpdatesFile from graph_builder_test.go. + f, err := ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + // Wait for the updates to propagate + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + return out.(testcomponents.SummationExports).LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + in, _ := getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + require.Equal(t, 10, in.(testcomponents.SummationConfig).Input) + + in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_3") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + // The dep_2 is independent of sum and dep_3, so we check for it with eventually. + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_2") + return out.(testcomponents.PassthroughExports).Output == "10" + }, 3*time.Second, 10*time.Millisecond) + + // Similar for dep_1 + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_1") + return out.(testcomponents.PassthroughExports).Output == "10" + }, 3*time.Second, 10*time.Millisecond) } -func getFields(t *testing.T, g *dag.Graph, nodeID string) (component.Arguments, component.Exports) { - t.Helper() +func TestController_Updates_WithLag(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Simple pipeline with some lag + config := ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testcomponents.passthrough "inc_dep_1" { + input = testcomponents.count.inc.count + lag = "10ms" + } + + testcomponents.passthrough "inc_dep_2" { + input = testcomponents.passthrough.inc_dep_1.output + lag = "10ms" + } + + testcomponents.summation "sum" { + input = testcomponents.passthrough.inc_dep_2.output + } +` + + ctrl := newTestController(t) + + // Use testUpdatesFile from graph_builder_test.go. + f, err := ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + // Wait for the updates to propagate + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + return out.(testcomponents.SummationExports).LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_1") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) - n := g.GetByID(nodeID) - require.NotNil(t, n, "couldn't find node %q in graph", nodeID) + in, out = getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_2") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, _ = getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + require.Equal(t, 10, in.(testcomponents.SummationConfig).Input) +} - uc := n.(*controller.BuiltinComponentNode) - return uc.Arguments(), uc.Exports() +func TestController_Updates_WithOtherLaggingPipeline(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Another pipeline exists with a significant lag. + config := ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testcomponents.passthrough "inc_dep_1" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testcomponents.passthrough "inc_dep_2" { + input = testcomponents.passthrough.inc_dep_1.output + lag = "1ms" + } + + testcomponents.summation "sum" { + input = testcomponents.passthrough.inc_dep_2.output + } + + testcomponents.count "inc_2" { + frequency = "10ms" + max = 10 + } + + testcomponents.passthrough "inc_dep_slow" { + input = testcomponents.count.inc_2.count + lag = "500ms" + } +` + + ctrl := newTestController(t) + + // Use testUpdatesFile from graph_builder_test.go. + f, err := ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + // Wait for the updates to propagate + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + return out.(testcomponents.SummationExports).LastAdded == 10 + }, 2*time.Second, 10*time.Millisecond) + + in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_1") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, out = getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_2") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, _ = getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + require.Equal(t, 10, in.(testcomponents.SummationConfig).Input) +} + +func TestController_Updates_WithLaggingComponent(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Part of the pipeline has a significant lag. + config := ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testcomponents.passthrough "inc_dep_1" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testcomponents.passthrough "inc_dep_2" { + input = testcomponents.passthrough.inc_dep_1.output + lag = "1ms" + } + + testcomponents.summation "sum" { + input = testcomponents.passthrough.inc_dep_2.output + } + + testcomponents.passthrough "inc_dep_slow" { + input = testcomponents.count.inc.count + lag = "500ms" + } +` + + ctrl := newTestController(t) + + // Use testUpdatesFile from graph_builder_test.go. + f, err := ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + // Wait for the updates to propagate + require.Eventually(t, func() bool { + _, out := getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + return out.(testcomponents.SummationExports).LastAdded == 10 + }, 2*time.Second, 10*time.Millisecond) + + in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_1") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, out = getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.inc_dep_2") + require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input) + require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output) + + in, _ = getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum") + require.Equal(t, 10, in.(testcomponents.SummationConfig).Input) +} + +func newTestController(t *testing.T) *Flow { + return newController(controllerOptions{ + Options: testOptions(t), + ModuleRegistry: newModuleRegistry(), + IsModule: false, + // Make sure that we have consistent number of workers for tests to make them deterministic. + WorkerPool: worker.NewFixedWorkerPool(4, 100), + }) } diff --git a/internal/flow/module.go b/internal/flow/module.go index f027836c2835..edc3c2385246 100644 --- a/internal/flow/module.go +++ b/internal/flow/module.go @@ -130,7 +130,7 @@ var ( func newModule(o *moduleOptions) *module { return &module{ o: o, - f: NewController(ControllerOptions{ + f: newController(controllerOptions{ IsModule: true, ModuleRegistry: o.ModuleRegistry, ComponentRegistry: o.ComponentRegistry, diff --git a/internal/flow/module_fail_test.go b/internal/flow/module_fail_test.go index c2eb2bf24d0b..a6ec2eeec6f1 100644 --- a/internal/flow/module_fail_test.go +++ b/internal/flow/module_fail_test.go @@ -24,7 +24,7 @@ func TestIDRemovalIfFailedToLoad(t *testing.T) { go f.Run(ctx) var t1 *componenttest.TestFailModule require.Eventually(t, func() bool { - t1 = f.Loader.Components()[0].(*controller.BuiltinComponentNode).Component().(*componenttest.TestFailModule) + t1 = f.loader.Components()[0].(*controller.BuiltinComponentNode).Component().(*componenttest.TestFailModule) return t1 != nil }, 10*time.Second, 100*time.Millisecond) require.Eventually(t, func() bool { diff --git a/internal/flow/module_registry.go b/internal/flow/module_registry.go index 37e9f0f95032..34e89fb6f4de 100644 --- a/internal/flow/module_registry.go +++ b/internal/flow/module_registry.go @@ -10,7 +10,7 @@ type moduleRegistry struct { modules map[string]*module } -func NewModuleRegistry() *moduleRegistry { +func newModuleRegistry() *moduleRegistry { return &moduleRegistry{ modules: make(map[string]*module), } diff --git a/internal/flow/module_test.go b/internal/flow/module_test.go index 4d9003a4bca3..663d058757e5 100644 --- a/internal/flow/module_test.go +++ b/internal/flow/module_test.go @@ -183,7 +183,7 @@ func TestExportsWhenNotUsed(t *testing.T) { ctx, cnc := context.WithTimeout(ctx, 1*time.Second) defer cnc() f.Run(ctx) - exps := f.Loader.Components()[0].Exports().(TestExports) + exps := f.loader.Components()[0].Exports().(TestExports) for _, x := range []string{"username", "dummy"} { _, found := exps.Exports[x] require.True(t, found) @@ -268,7 +268,7 @@ func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { DataPath: t.TempDir(), MinStability: featuregate.StabilityBeta, Reg: prometheus.NewRegistry(), - ModuleRegistry: NewModuleRegistry(), + ModuleRegistry: newModuleRegistry(), WorkerPool: worker.NewFixedWorkerPool(1, 100), ServiceMap: serviceMap, }