Skip to content

Commit

Permalink
Undo testing changes
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Apr 2, 2024
1 parent 09dd358 commit 1e0d149
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ replace (
// https://github.com/grafana/cadvisor/tree/grafana-v0.47-noglobals
github.com/google/cadvisor => github.com/grafana/cadvisor v0.0.0-20231110094609-5f7917925dea

github.com/prometheus-community/postgres_exporter => github.com/grafana/postgres_exporter v0.15.1-0.20240402092333-fad5f95ea113
github.com/prometheus-community/postgres_exporter => github.com/grafana/postgres_exporter v0.15.1-0.20240402092333-fad5f95ea113 // exporter-package-v0.15.0 branch

// TODO(marctc): remove once this PR is merged upstream: https://github.com/prometheus/mysqld_exporter/pull/774
github.com/prometheus/mysqld_exporter => github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595
Expand Down
28 changes: 14 additions & 14 deletions internal/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ type Options struct {
type Flow struct {
log *logging.Logger
tracer *tracing.Tracer
opts ControllerOptions
opts controllerOptions

updateQueue *controller.Queue
sched *controller.Scheduler
Loader *controller.Loader
loader *controller.Loader
modules *moduleRegistry

loadFinished chan struct{}
Expand All @@ -127,17 +127,17 @@ type Flow struct {

// New creates a new, unstarted Flow controller. Call Run to run the controller.
func New(o Options) *Flow {
return NewController(ControllerOptions{
return newController(controllerOptions{
Options: o,
ModuleRegistry: NewModuleRegistry(),
ModuleRegistry: newModuleRegistry(),
IsModule: false, // We are creating a new root controller.
WorkerPool: worker.NewDefaultWorkerPool(),
})
}

// ControllerOptions are internal options used to create both root Flow
// controllerOptions are internal options used to create both root Flow
// controller and controllers for modules.
type ControllerOptions struct {
type controllerOptions struct {
Options

ComponentRegistry controller.ComponentRegistry // Custom component registry used in tests.
Expand All @@ -150,7 +150,7 @@ type ControllerOptions struct {
// newController creates a new, unstarted Flow controller with a specific
// moduleRegistry. Modules created by the controller will be passed to the
// given modReg.
func NewController(o ControllerOptions) *Flow {
func newController(o controllerOptions) *Flow {
var (
log = o.Logger
tracer = o.Tracer
Expand Down Expand Up @@ -186,7 +186,7 @@ func NewController(o ControllerOptions) *Flow {

serviceMap := controller.NewServiceMap(o.Services)

f.Loader = controller.NewLoader(controller.LoaderOptions{
f.loader = controller.NewLoader(controller.LoaderOptions{
ComponentGlobals: controller.ComponentGlobals{
Logger: log,
TraceProvider: tracer,
Expand Down Expand Up @@ -235,7 +235,7 @@ func NewController(o ControllerOptions) *Flow {
// canceled. Run must only be called once.
func (f *Flow) Run(ctx context.Context) {
defer func() { _ = f.sched.Close() }()
defer f.Loader.Cleanup(!f.opts.IsModule)
defer f.loader.Cleanup(!f.opts.IsModule)
defer level.Debug(f.log).Log("msg", "flow controller exiting")

for {
Expand All @@ -248,14 +248,14 @@ func (f *Flow) Run(ctx context.Context) {
// throughput - it prevents the situation where two nodes have the same dependency, and the first time
// it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations.
all := f.updateQueue.DequeueAll()
f.Loader.EvaluateDependants(ctx, all)
f.loader.EvaluateDependants(ctx, all)
case <-f.loadFinished:
level.Info(f.log).Log("msg", "scheduling loaded components and services")

var (
components = f.Loader.Components()
services = f.Loader.Services()
imports = f.Loader.Imports()
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
Expand Down Expand Up @@ -307,7 +307,7 @@ func (f *Flow) loadSource(source *Source, args map[string]any, customComponentRe
CustomComponentRegistry: customComponentRegistry,
}

diags := f.Loader.Apply(applyOptions)
diags := f.loader.Apply(applyOptions)
if !f.loadedOnce.Load() && diags.HasErrors() {
// The first call to Load should not run any components if there were
// errors in the configuration file.
Expand Down
6 changes: 3 additions & 3 deletions internal/flow/flow_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (f *Flow) GetComponent(id component.ID, opts component.InfoOptions) (*compo
return mod.f.GetComponent(component.ID{LocalID: id.LocalID}, opts)
}

graph := f.Loader.OriginalGraph()
graph := f.loader.OriginalGraph()

node := graph.GetByID(id.LocalID)
if node == nil {
Expand Down Expand Up @@ -52,8 +52,8 @@ func (f *Flow) ListComponents(moduleID string, opts component.InfoOptions) ([]*c
}

var (
components = f.Loader.Components()
graph = f.Loader.OriginalGraph()
components = f.loader.Components()
graph = f.loader.OriginalGraph()
)

detail := make([]*component.Info, len(components))
Expand Down
8 changes: 4 additions & 4 deletions internal/flow/flow_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
// [component.Component] and [service.Service]s which declared a dependency on
// the named service.
func (f *Flow) GetServiceConsumers(serviceName string) []service.Consumer {
consumers := serviceConsumersForGraph(f.Loader.OriginalGraph(), serviceName, true)
consumers := serviceConsumersForGraph(f.loader.OriginalGraph(), serviceName, true)

// Iterate through all modules to find other components that depend on the
// service. Peer services aren't checked here, since the services are always
// a subset of the services from the root controller.
for _, mod := range f.modules.List() {
moduleGraph := mod.f.Loader.OriginalGraph()
moduleGraph := mod.f.loader.OriginalGraph()
consumers = append(consumers, serviceConsumersForGraph(moduleGraph, serviceName, false)...)
}

Expand Down Expand Up @@ -71,7 +71,7 @@ func serviceConsumersForGraph(graph *dag.Graph, serviceName string, includePeerS
// services can instantiate their own components.
func (f *Flow) NewController(id string) service.Controller {
return serviceController{
f: NewController(ControllerOptions{
f: newController(controllerOptions{
Options: Options{
ControllerID: id,
Logger: f.opts.Logger,
Expand All @@ -83,7 +83,7 @@ func (f *Flow) NewController(id string) service.Controller {
OnExportsChange: nil, // NOTE(@tpaschalis, @wildum) The isolated controller shouldn't be able to export any values.
},
IsModule: true,
ModuleRegistry: NewModuleRegistry(),
ModuleRegistry: newModuleRegistry(),
WorkerPool: worker.NewDefaultWorkerPool(),
}),
}
Expand Down
8 changes: 4 additions & 4 deletions internal/flow/flow_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ func TestComponents_Using_Services(t *testing.T) {
opts := testOptions(t)
opts.Services = append(opts.Services, existsSvc)

ctrl := NewController(ControllerOptions{
ctrl := newController(controllerOptions{
Options: opts,
ComponentRegistry: registry,
ModuleRegistry: NewModuleRegistry(),
ModuleRegistry: newModuleRegistry(),
})
require.NoError(t, ctrl.LoadSource(f, nil))
go ctrl.Run(ctx)
Expand Down Expand Up @@ -325,10 +325,10 @@ func TestComponents_Using_Services_In_Modules(t *testing.T) {
opts := testOptions(t)
opts.Services = append(opts.Services, existsSvc)

ctrl := NewController(ControllerOptions{
ctrl := newController(controllerOptions{
Options: opts,
ComponentRegistry: registry,
ModuleRegistry: NewModuleRegistry(),
ModuleRegistry: newModuleRegistry(),
})
require.NoError(t, ctrl.LoadSource(f, nil))
go ctrl.Run(ctx)
Expand Down
4 changes: 2 additions & 2 deletions internal/flow/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestController_LoadSource_Evaluation(t *testing.T) {

err = ctrl.LoadSource(f, nil)
require.NoError(t, err)
require.Len(t, ctrl.Loader.Components(), 4)
require.Len(t, ctrl.loader.Components(), 4)

// Check the inputs and outputs of things that should be immediately resolved
// without having to run the components.
in, out := getFields(t, ctrl.Loader.Graph(), "testcomponents.passthrough.static")
in, out := getFields(t, ctrl.loader.Graph(), "testcomponents.passthrough.static")
require.Equal(t, "hello, world!", in.(testcomponents.PassthroughConfig).Input)
require.Equal(t, "hello, world!", out.(testcomponents.PassthroughExports).Output)
}
Expand Down
Loading

0 comments on commit 1e0d149

Please sign in to comment.