Wait for the old CRD Manager to stop before starting a new one #1778

Open · wants to merge 2 commits into main
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -45,6 +45,9 @@ Main (unreleased)

- Fixed a bug with `loki.source.podlogs` not starting in large clusters due to short informer sync timeout. (@elburnetto-intapp)

- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a
"failed to create service discovery refresh metrics" error after a config reload. (@ptodev)

### Other changes

- Small fix in UI stylesheet to fit more content into visible table area. (@defanator)
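The changelog entry above refers to a duplicate metric registration: when a new CRD manager registers its service discovery refresh metrics while the previous manager still holds them, the Prometheus registry rejects the second registration. A minimal standalone sketch of that failure mode follows; the metric name and setup are illustrative only, not Alloy's actual code.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Hypothetical metric, used only to illustrate the collision.
	newRefreshCounter := func() prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Name: "sd_refresh_failures_total",
			Help: "Example refresh metric.",
		})
	}

	// The old manager registers its metrics.
	if err := reg.Register(newRefreshCounter()); err != nil {
		fmt.Println("old manager:", err)
	}

	// A new manager that starts before the old one stops hits a duplicate registration.
	var are prometheus.AlreadyRegisteredError
	if err := reg.Register(newRefreshCounter()); errors.As(err, &are) {
		fmt.Println("new manager: duplicate registration:", err)
	}
}
```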
29 changes: 21 additions & 8 deletions internal/component/prometheus/operator/common/component.go
@@ -19,14 +19,16 @@ import (
type Component struct {
mut sync.RWMutex
config *operator.Arguments
- manager *crdManager
+ manager crdManagerInterface
ls labelstore.LabelStore

onUpdate chan struct{}
opts component.Options
healthMut sync.RWMutex
health component.Health

crdManagerFactory crdManagerFactory

kind string
cluster cluster.Cluster
}
@@ -44,11 +46,12 @@ func New(o component.Options, args component.Arguments, kind string) (*Component
}
ls := service.(labelstore.LabelStore)
c := &Component{
- opts: o,
- onUpdate: make(chan struct{}, 1),
- kind: kind,
- cluster: clusterData,
- ls: ls,
+ opts: o,
+ onUpdate: make(chan struct{}, 1),
+ kind: kind,
+ cluster: clusterData,
+ ls: ls,
+ crdManagerFactory: realCrdManagerFactory{},
}
return c, c.Update(args)
}
@@ -74,6 +77,8 @@ func (c *Component) Run(ctx context.Context) error {

c.reportHealth(nil)
errChan := make(chan error, 1)
runWg := sync.WaitGroup{}
defer runWg.Wait()
for {
select {
case <-ctx.Done():
@@ -85,17 +90,25 @@
c.reportHealth(err)
case <-c.onUpdate:
c.mut.Lock()
- manager := newCrdManager(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls)
+ manager := c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls)
c.manager = manager

// Wait for the old manager to stop.
// If we start the new manager before stopping the old one,
// the new manager might not be able to register its debug metrics due to a duplicate registration error.
if cancel != nil {
cancel()
}
runWg.Wait()

innerCtx, cancel = context.WithCancel(ctx)
runWg.Add(1)
[Review comment, Collaborator]: Is it possible to get a test on this? Other than that looks good.

[Author reply, Contributor]: I pushed a commit which outlines how a test could potentially work. Would this be ok? I am not sure how else to test it.

go func() {
if err := manager.Run(innerCtx); err != nil {
level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err)
errChan <- err
}
runWg.Done()
}()
c.mut.Unlock()
}
@@ -170,7 +183,7 @@ func (c *Component) Handler() http.Handler {
}
ns := parts[1]
name := parts[2]
- scs := man.getScrapeConfig(ns, name)
+ scs := man.GetScrapeConfig(ns, name)
if len(scs) == 0 {
w.WriteHeader(404)
return
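Taken out of the diff context, the restart sequence in Run is a cancel-then-wait loop: cancel the previous manager's context, block on the WaitGroup until its goroutine has returned, and only then construct and start the replacement. A self-contained sketch of that pattern, with illustrative names rather than the component's actual fields:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func worker(ctx context.Context, id int) {
	<-ctx.Done() // stand-in for crdManager.Run blocking until its context is cancelled
	fmt.Printf("worker %d stopped\n", id)
}

func main() {
	var (
		wg     sync.WaitGroup
		cancel context.CancelFunc
	)

	for id := 1; id <= 3; id++ {
		// Stop the previous worker and wait for it to exit before starting
		// its replacement, so two workers never overlap.
		if cancel != nil {
			cancel()
		}
		wg.Wait()

		var ctx context.Context
		ctx, cancel = context.WithCancel(context.Background())
		wg.Add(1)
		go func(id int, ctx context.Context) {
			defer wg.Done()
			worker(ctx, id)
		}(id, ctx)
	}

	cancel()
	wg.Wait()
}
```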
124 changes: 124 additions & 0 deletions internal/component/prometheus/operator/common/component_test.go
@@ -0,0 +1,124 @@
package common

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/prometheus/operator"
"github.com/grafana/alloy/internal/service/cluster"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type crdManagerFactoryHungRun struct {
stopRun chan struct{}
}

func (m crdManagerFactoryHungRun) New(_ component.Options, _ cluster.Cluster, _ log.Logger,
_ *operator.Arguments, _ string, _ labelstore.LabelStore) crdManagerInterface {
return &crdManagerHungRun{
stopRun: m.stopRun,
}
}

type crdManagerHungRun struct {
stopRun chan struct{}
}

func (c *crdManagerHungRun) Run(ctx context.Context) error {
<-c.stopRun
return nil
}

func (c *crdManagerHungRun) ClusteringUpdated() {}

func (c *crdManagerHungRun) DebugInfo() interface{} {
return nil
}

func (c *crdManagerHungRun) GetScrapeConfig(ns, name string) []*config.ScrapeConfig {
return nil
}

func TestRunExit(t *testing.T) {
opts := component.Options{
Logger: util.TestAlloyLogger(t),
Registerer: prometheus.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
switch name {
case http_service.ServiceName:
return http_service.Data{
HTTPListenAddr: "localhost:12345",
MemoryListenAddr: "alloy.internal:1245",
BaseHTTPPath: "/",
DialFunc: (&net.Dialer{}).DialContext,
}, nil

case cluster.ServiceName:
return cluster.Mock(), nil
case labelstore.ServiceName:
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("service %q does not exist", name)
}
},
}

nilReceivers := []storage.Appendable{nil, nil}

var args operator.Arguments
args.SetToDefault()
args.ForwardTo = nilReceivers

// Create a Component
c, err := New(opts, args, "")
require.NoError(t, err)

stopRun := make(chan struct{})
c.crdManagerFactory = crdManagerFactoryHungRun{
stopRun: stopRun,
}

// Run the component
ctx, cancelFunc := context.WithCancel(context.Background())
cmpRunExited := atomic.Bool{}
cmpRunExited.Store(false)
go func() {
err := c.Run(ctx)
require.NoError(t, err)
cmpRunExited.Store(true)
fmt.Println("component.Run exited")
}()

// Stop the component.
// It shouldn't stop immediately, because the CRD Manager is hung.
cancelFunc()

// Make sure component.Run didn't exit for a few seconds
fmt.Println("start sleeping")
time.Sleep(5 * time.Second)
fmt.Println("finished sleeping")

if cmpRunExited.Load() {
require.Fail(t, "component.Run exited")
}

// Make crdManager.Run exit
close(stopRun)

// Make sure component.Run exits
require.Eventually(t, func() bool {
return cmpRunExited.Load()
}, 5*time.Second, 100*time.Millisecond, "component.Run didn't exit")
}
19 changes: 18 additions & 1 deletion internal/component/prometheus/operator/common/crdmanager.go
@@ -40,6 +40,23 @@ import (
// Generous timeout period for configuring all informers
const informerSyncTimeout = 10 * time.Second

type crdManagerInterface interface {
Run(ctx context.Context) error
ClusteringUpdated()
DebugInfo() interface{}
GetScrapeConfig(ns, name string) []*config.ScrapeConfig
}

type crdManagerFactory interface {
New(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) crdManagerInterface
}

type realCrdManagerFactory struct{}

func (realCrdManagerFactory) New(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) crdManagerInterface {
return newCrdManager(opts, cluster, logger, args, kind, ls)
}

// crdManager is all of the fields required to run a crd based component.
// on update, this entire thing should be recreated and restarted
type crdManager struct {
@@ -237,7 +254,7 @@ func (c *crdManager) DebugInfo() interface{}
return info
}

- func (c *crdManager) getScrapeConfig(ns, name string) []*config.ScrapeConfig {
+ func (c *crdManager) GetScrapeConfig(ns, name string) []*config.ScrapeConfig {
prefix := fmt.Sprintf("%s/%s/%s", c.kind, ns, name)
matches := []*config.ScrapeConfig{}
for k, v := range c.scrapeConfigs {
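The crdManagerInterface and crdManagerFactory added here are a test seam: production code keeps constructing the real manager through realCrdManagerFactory, while a test can hand the component a factory that returns a controllable fake, which is what crdManagerFactoryHungRun does in component_test.go above. A stripped-down sketch of the same idea, with simplified types rather than the real Alloy signatures:

```go
package main

import (
	"context"
	"fmt"
)

// The component depends only on these small interfaces, not on the concrete manager.
type manager interface {
	Run(ctx context.Context) error
}

type managerFactory interface {
	New() manager
}

// Test fake: Run blocks until the test releases it, simulating a hung manager.
type fakeManager struct{ release chan struct{} }

func (f fakeManager) Run(context.Context) error { <-f.release; return nil }

type fakeFactory struct{ release chan struct{} }

func (f fakeFactory) New() manager { return fakeManager{release: f.release} }

type component struct{ factory managerFactory }

func (c *component) run(ctx context.Context) error {
	return c.factory.New().Run(ctx)
}

func main() {
	release := make(chan struct{})
	c := &component{factory: fakeFactory{release: release}}

	done := make(chan error, 1)
	go func() { done <- c.run(context.Background()) }()

	close(release) // let the fake manager's Run return
	fmt.Println("component run returned:", <-done)
}
```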