diff --git a/internal/component/mimir/rules/kubernetes/debug.go b/internal/component/mimir/rules/kubernetes/debug.go index 59b5103858dc..1505092ad101 100644 --- a/internal/component/mimir/rules/kubernetes/debug.go +++ b/internal/component/mimir/rules/kubernetes/debug.go @@ -22,40 +22,32 @@ type DebugMimirNamespace struct { func (c *Component) DebugInfo() interface{} { var output DebugInfo - for ns := range c.currentState { - if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, ns) { + + currentState := c.eventProcessor.getMimirState() + for namespace := range currentState { + if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, namespace) { continue } output.MimirRuleNamespaces = append(output.MimirRuleNamespaces, DebugMimirNamespace{ - Name: ns, - NumRuleGroups: len(c.currentState[ns]), + Name: namespace, + NumRuleGroups: len(currentState[namespace]), }) } // This should load from the informer cache, so it shouldn't fail under normal circumstances. - managedK8sNamespaces, err := c.namespaceLister.List(c.namespaceSelector) + rulesByNamespace, err := c.eventProcessor.getKubernetesState() if err != nil { - return DebugInfo{ - Error: fmt.Sprintf("failed to list namespaces: %v", err), - } + return DebugInfo{Error: fmt.Sprintf("failed to list rules: %v", err)} } - for _, n := range managedK8sNamespaces { - // This should load from the informer cache, so it shouldn't fail under normal circumstances. - rules, err := c.ruleLister.PrometheusRules(n.Name).List(c.ruleSelector) - if err != nil { - return DebugInfo{ - Error: fmt.Sprintf("failed to list rules: %v", err), - } - } - - for _, r := range rules { + for namespace, rules := range rulesByNamespace { + for _, rule := range rules { output.PrometheusRules = append(output.PrometheusRules, DebugK8sPrometheusRule{ - Namespace: n.Name, - Name: r.Name, - UID: string(r.UID), - NumRuleGroups: len(r.Spec.Groups), + Namespace: namespace, + Name: rule.Name, + UID: string(rule.UID), + NumRuleGroups: len(rule.Spec.Groups), }) } } diff --git a/internal/component/mimir/rules/kubernetes/events.go b/internal/component/mimir/rules/kubernetes/events.go index 7752077d9730..2ff388d65063 100644 --- a/internal/component/mimir/rules/kubernetes/events.go +++ b/internal/component/mimir/rules/kubernetes/events.go @@ -4,108 +4,156 @@ import ( "context" "fmt" "regexp" + "sync" "time" + "github.com/go-kit/log" "github.com/grafana/agent/internal/component/common/kubernetes" "github.com/grafana/agent/internal/flow/logging/level" + mimirClient "github.com/grafana/agent/internal/mimir/client" "github.com/hashicorp/go-multierror" promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1" "github.com/prometheus/prometheus/model/rulefmt" + "k8s.io/apimachinery/pkg/labels" + coreListers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/yaml" // Used for CRD compatibility instead of gopkg.in/yaml.v2 ) -const eventTypeSyncMimir kubernetes.EventType = "sync-mimir" +const ( + eventTypeSyncMimir kubernetes.EventType = "sync-mimir" +) + +type healthReporter interface { + reportUnhealthy(err error) + reportHealthy() +} + +type eventProcessor struct { + queue workqueue.RateLimitingInterface + stopChan chan struct{} + health healthReporter + + mimirClient mimirClient.Interface + namespaceLister coreListers.NamespaceLister + ruleLister promListers.PrometheusRuleLister + namespaceSelector labels.Selector + ruleSelector labels.Selector + namespacePrefix string -func (c *Component) eventLoop(ctx context.Context) { + metrics *metrics + logger log.Logger + + currentState kubernetes.RuleGroupsByNamespace + currentStateMtx sync.Mutex +} + +func (e *eventProcessor) run(ctx context.Context) { for { - eventInterface, shutdown := c.queue.Get() + eventInterface, shutdown := e.queue.Get() if shutdown { - level.Info(c.log).Log("msg", "shutting down event loop") + level.Info(e.logger).Log("msg", "shutting down event loop") return } evt := eventInterface.(kubernetes.Event) - c.metrics.eventsTotal.WithLabelValues(string(evt.Typ)).Inc() - err := c.processEvent(ctx, evt) + e.metrics.eventsTotal.WithLabelValues(string(evt.Typ)).Inc() + err := e.processEvent(ctx, evt) if err != nil { - retries := c.queue.NumRequeues(evt) + retries := e.queue.NumRequeues(evt) if retries < 5 { - c.metrics.eventsRetried.WithLabelValues(string(evt.Typ)).Inc() - c.queue.AddRateLimited(evt) - level.Error(c.log).Log( + e.metrics.eventsRetried.WithLabelValues(string(evt.Typ)).Inc() + e.queue.AddRateLimited(evt) + level.Error(e.logger).Log( "msg", "failed to process event, will retry", "retries", fmt.Sprintf("%d/5", retries), "err", err, ) continue } else { - c.metrics.eventsFailed.WithLabelValues(string(evt.Typ)).Inc() - level.Error(c.log).Log( + e.metrics.eventsFailed.WithLabelValues(string(evt.Typ)).Inc() + level.Error(e.logger).Log( "msg", "failed to process event, max retries exceeded", "retries", fmt.Sprintf("%d/5", retries), "err", err, ) - c.reportUnhealthy(err) + e.health.reportUnhealthy(err) } } else { - c.reportHealthy() + e.health.reportHealthy() } - c.queue.Forget(evt) + e.queue.Forget(evt) } } -func (c *Component) processEvent(ctx context.Context, e kubernetes.Event) error { - defer c.queue.Done(e) +func (e *eventProcessor) stop() { + close(e.stopChan) + e.queue.ShutDownWithDrain() +} + +func (e *eventProcessor) processEvent(ctx context.Context, event kubernetes.Event) error { + defer e.queue.Done(event) - switch e.Typ { + switch event.Typ { case kubernetes.EventTypeResourceChanged: - level.Info(c.log).Log("msg", "processing event", "type", e.Typ, "key", e.ObjectKey) + level.Info(e.logger).Log("msg", "processing event", "type", event.Typ, "key", event.ObjectKey) case eventTypeSyncMimir: - level.Debug(c.log).Log("msg", "syncing current state from ruler") - err := c.syncMimir(ctx) + level.Debug(e.logger).Log("msg", "syncing current state from ruler") + err := e.syncMimir(ctx) if err != nil { return err } default: - return fmt.Errorf("unknown event type: %s", e.Typ) + return fmt.Errorf("unknown event type: %s", event.Typ) } - return c.reconcileState(ctx) + return e.reconcileState(ctx) } -func (c *Component) syncMimir(ctx context.Context) error { - rulesByNamespace, err := c.mimirClient.ListRules(ctx, "") +func (e *eventProcessor) enqueueSyncMimir() { + e.queue.Add(kubernetes.Event{ + Typ: eventTypeSyncMimir, + }) +} + +func (e *eventProcessor) syncMimir(ctx context.Context) error { + rulesByNamespace, err := e.mimirClient.ListRules(ctx, "") if err != nil { - level.Error(c.log).Log("msg", "failed to list rules from mimir", "err", err) + level.Error(e.logger).Log("msg", "failed to list rules from mimir", "err", err) return err } for ns := range rulesByNamespace { - if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, ns) { + if !isManagedMimirNamespace(e.namespacePrefix, ns) { delete(rulesByNamespace, ns) } } - c.currentState = rulesByNamespace + e.currentStateMtx.Lock() + e.currentState = rulesByNamespace + e.currentStateMtx.Unlock() return nil } -func (c *Component) reconcileState(ctx context.Context) error { +func (e *eventProcessor) reconcileState(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - desiredState, err := c.loadStateFromK8s() + desiredState, err := e.desiredStateFromKubernetes() if err != nil { return err } - diffs := kubernetes.DiffRuleState(desiredState, c.currentState) + currentState := e.getMimirState() + diffs := kubernetes.DiffRuleState(desiredState, currentState) + var result error for ns, diff := range diffs { - err = c.applyChanges(ctx, ns, diff) + err = e.applyChanges(ctx, ns, diff) if err != nil { result = multierror.Append(result, err) continue @@ -115,23 +163,19 @@ func (c *Component) reconcileState(ctx context.Context) error { return result } -func (c *Component) loadStateFromK8s() (kubernetes.RuleGroupsByNamespace, error) { - matchedNamespaces, err := c.namespaceLister.List(c.namespaceSelector) +// desiredStateFromKubernetes loads PrometheusRule resources from Kubernetes and converts +// them to corresponding Mimir rule groups, indexed by Mimir namespace. +func (e *eventProcessor) desiredStateFromKubernetes() (kubernetes.RuleGroupsByNamespace, error) { + kubernetesState, err := e.getKubernetesState() if err != nil { - return nil, fmt.Errorf("failed to list namespaces: %w", err) + return nil, err } desiredState := make(kubernetes.RuleGroupsByNamespace) - for _, ns := range matchedNamespaces { - crdState, err := c.ruleLister.PrometheusRules(ns.Name).List(c.ruleSelector) - if err != nil { - return nil, fmt.Errorf("failed to list rules: %w", err) - } - - for _, pr := range crdState { - mimirNs := mimirNamespaceForRuleCRD(c.args.MimirNameSpacePrefix, pr) - - groups, err := convertCRDRuleGroupToRuleGroup(pr.Spec) + for _, rules := range kubernetesState { + for _, rule := range rules { + mimirNs := mimirNamespaceForRuleCRD(e.namespacePrefix, rule) + groups, err := convertCRDRuleGroupToRuleGroup(rule.Spec) if err != nil { return nil, fmt.Errorf("failed to convert rule group: %w", err) } @@ -157,7 +201,7 @@ func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]rulefmt.Ru return groups.Groups, nil } -func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []kubernetes.RuleGroupDiff) error { +func (e *eventProcessor) applyChanges(ctx context.Context, namespace string, diffs []kubernetes.RuleGroupDiff) error { if len(diffs) == 0 { return nil } @@ -165,30 +209,63 @@ func (c *Component) applyChanges(ctx context.Context, namespace string, diffs [] for _, diff := range diffs { switch diff.Kind { case kubernetes.RuleGroupDiffKindAdd: - err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired) + err := e.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired) if err != nil { return err } - level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name) + level.Info(e.logger).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name) case kubernetes.RuleGroupDiffKindRemove: - err := c.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name) + err := e.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name) if err != nil { return err } - level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name) + level.Info(e.logger).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name) case kubernetes.RuleGroupDiffKindUpdate: - err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired) + err := e.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired) if err != nil { return err } - level.Info(c.log).Log("msg", "updated rule group", "namespace", namespace, "group", diff.Desired.Name) + level.Info(e.logger).Log("msg", "updated rule group", "namespace", namespace, "group", diff.Desired.Name) default: - level.Error(c.log).Log("msg", "unknown rule group diff kind", "kind", diff.Kind) + level.Error(e.logger).Log("msg", "unknown rule group diff kind", "kind", diff.Kind) } } // resync mimir state after applying changes - return c.syncMimir(ctx) + return e.syncMimir(ctx) +} + +// getMimirState returns the cached Mimir ruler state, rule groups indexed by Mimir namespace. +func (e *eventProcessor) getMimirState() kubernetes.RuleGroupsByNamespace { + e.currentStateMtx.Lock() + defer e.currentStateMtx.Unlock() + + out := make(kubernetes.RuleGroupsByNamespace, len(e.currentState)) + for ns, groups := range e.currentState { + out[ns] = groups + } + + return out +} + +// getKubernetesState returns PrometheusRule resources indexed by Kubernetes namespace. +func (e *eventProcessor) getKubernetesState() (map[string][]*promv1.PrometheusRule, error) { + namespaces, err := e.namespaceLister.List(e.namespaceSelector) + if err != nil { + return nil, fmt.Errorf("failed to list namespaces: %w", err) + } + + out := make(map[string][]*promv1.PrometheusRule) + for _, namespace := range namespaces { + rules, err := e.ruleLister.PrometheusRules(namespace.Name).List(e.ruleSelector) + if err != nil { + return nil, fmt.Errorf("failed to list rules: %w", err) + } + + out[namespace.Name] = append(out[namespace.Name], rules...) + } + + return out, nil } // mimirNamespaceForRuleCRD returns the namespace that the rule CRD should be diff --git a/internal/component/mimir/rules/kubernetes/events_test.go b/internal/component/mimir/rules/kubernetes/events_test.go index e177e41bd13f..4a197eda31c9 100644 --- a/internal/component/mimir/rules/kubernetes/events_test.go +++ b/internal/component/mimir/rules/kubernetes/events_test.go @@ -84,6 +84,12 @@ func (m *fakeMimirClient) ListRules(ctx context.Context, namespace string) (map[ return output, nil } +type fakeHealthReporter struct{} + +func (f fakeHealthReporter) reportUnhealthy(error) {} + +func (f fakeHealthReporter) reportHealthy() {} + func TestEventLoop(t *testing.T) { nsIndexer := cache.NewIndexer( cache.DeletionHandlingMetaNamespaceKeyFunc, @@ -125,61 +131,64 @@ func TestEventLoop(t *testing.T) { }, } - component := Component{ - log: log.NewLogfmtLogger(os.Stdout), + processor := &eventProcessor{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + stopChan: make(chan struct{}), + health: &fakeHealthReporter{}, + mimirClient: newFakeMimirClient(), namespaceLister: nsLister, - namespaceSelector: labels.Everything(), ruleLister: ruleLister, + namespaceSelector: labels.Everything(), ruleSelector: labels.Everything(), - mimirClient: newFakeMimirClient(), - args: Arguments{MimirNameSpacePrefix: "agent"}, + namespacePrefix: "agent", metrics: newMetrics(), + logger: log.With(log.NewLogfmtLogger(os.Stdout), "ts", log.DefaultTimestampUTC), } - eventHandler := kubernetes.NewQueuedEventHandler(component.log, component.queue) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() + + // Do an initial sync of the Mimir ruler state before starting the event processing loop. + require.NoError(t, processor.syncMimir(ctx)) + go processor.run(ctx) + defer processor.stop() - go component.eventLoop(ctx) + eventHandler := kubernetes.NewQueuedEventHandler(processor.logger, processor.queue) // Add a namespace and rule to kubernetes - nsIndexer.Add(ns) - ruleIndexer.Add(rule) + require.NoError(t, nsIndexer.Add(ns)) + require.NoError(t, ruleIndexer.Add(rule)) eventHandler.OnAdd(rule, false) // Wait for the rule to be added to mimir require.Eventually(t, func() bool { - rules, err := component.mimirClient.ListRules(ctx, "") + rules, err := processor.mimirClient.ListRules(ctx, "") require.NoError(t, err) return len(rules) == 1 }, time.Second, 10*time.Millisecond) - component.queue.AddRateLimited(kubernetes.Event{Typ: eventTypeSyncMimir}) // Update the rule in kubernetes rule.Spec.Groups[0].Rules = append(rule.Spec.Groups[0].Rules, v1.Rule{ Alert: "alert2", Expr: intstr.FromString("expr2"), }) - ruleIndexer.Update(rule) + require.NoError(t, ruleIndexer.Update(rule)) eventHandler.OnUpdate(rule, rule) // Wait for the rule to be updated in mimir require.Eventually(t, func() bool { - allRules, err := component.mimirClient.ListRules(ctx, "") + allRules, err := processor.mimirClient.ListRules(ctx, "") require.NoError(t, err) rules := allRules[mimirNamespaceForRuleCRD("agent", rule)][0].Rules return len(rules) == 2 }, time.Second, 10*time.Millisecond) - component.queue.AddRateLimited(kubernetes.Event{Typ: eventTypeSyncMimir}) // Remove the rule from kubernetes - ruleIndexer.Delete(rule) + require.NoError(t, ruleIndexer.Delete(rule)) eventHandler.OnDelete(rule) // Wait for the rule to be removed from mimir require.Eventually(t, func() bool { - rules, err := component.mimirClient.ListRules(ctx, "") + rules, err := processor.mimirClient.ListRules(ctx, "") require.NoError(t, err) return len(rules) == 0 }, time.Second, 10*time.Millisecond) diff --git a/internal/component/mimir/rules/kubernetes/rules.go b/internal/component/mimir/rules/kubernetes/rules.go index afd28c72fda3..e658437fa893 100644 --- a/internal/component/mimir/rules/kubernetes/rules.go +++ b/internal/component/mimir/rules/kubernetes/rules.go @@ -21,7 +21,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" coreListers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" _ "k8s.io/component-base/metrics/prometheus/workqueue" controller "sigs.k8s.io/controller-runtime" @@ -47,24 +46,15 @@ type Component struct { opts component.Options args Arguments - mimirClient mimirClient.Interface - k8sClient kubernetes.Interface - promClient promVersioned.Interface - ruleLister promListers.PrometheusRuleLister - ruleInformer cache.SharedIndexInformer - - namespaceLister coreListers.NamespaceLister - namespaceInformer cache.SharedIndexInformer - informerStopChan chan struct{} - ticker *time.Ticker - - queue workqueue.RateLimitingInterface - configUpdates chan ConfigUpdate - + mimirClient mimirClient.Interface + k8sClient kubernetes.Interface + promClient promVersioned.Interface namespaceSelector labels.Selector ruleSelector labels.Selector - currentState commonK8s.RuleGroupsByNamespace + eventProcessor *eventProcessor + configUpdates chan ConfigUpdate + ticker *time.Ticker metrics *metrics healthMut sync.RWMutex @@ -203,34 +193,40 @@ func (c *Component) Run(ctx context.Context) error { c.shutdown() return nil case <-c.ticker.C: - c.queue.Add(commonK8s.Event{ - Typ: eventTypeSyncMimir, - }) + c.eventProcessor.enqueueSyncMimir() } } } // startup launches the informers and starts the event loop. func (c *Component) startup(ctx context.Context) error { - c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "mimir.rules.kubernetes") - c.informerStopChan = make(chan struct{}) + queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "mimir.rules.kubernetes") + informerStopChan := make(chan struct{}) - if err := c.startNamespaceInformer(); err != nil { + namespaceLister, err := c.startNamespaceInformer(queue, informerStopChan) + if err != nil { return err } - if err := c.startRuleInformer(); err != nil { + + ruleLister, err := c.startRuleInformer(queue, informerStopChan) + if err != nil { return err } - if err := c.syncMimir(ctx); err != nil { + + c.eventProcessor = c.newEventProcessor(queue, informerStopChan, namespaceLister, ruleLister) + if err = c.eventProcessor.syncMimir(ctx); err != nil { return err } - go c.eventLoop(ctx) + + go c.eventProcessor.run(ctx) return nil } func (c *Component) shutdown() { - close(c.informerStopChan) - c.queue.ShutDownWithDrain() + if c.eventProcessor != nil { + c.eventProcessor.stop() + c.eventProcessor = nil + } } func (c *Component) Update(newConfig component.Arguments) error { @@ -289,7 +285,7 @@ func (c *Component) init() error { return nil } -func (c *Component) startNamespaceInformer() error { +func (c *Component) startNamespaceInformer(queue workqueue.RateLimitingInterface, stopChan chan struct{}) (coreListers.NamespaceLister, error) { factory := informers.NewSharedInformerFactoryWithOptions( c.k8sClient, 24*time.Hour, @@ -299,19 +295,19 @@ func (c *Component) startNamespaceInformer() error { ) namespaces := factory.Core().V1().Namespaces() - c.namespaceLister = namespaces.Lister() - c.namespaceInformer = namespaces.Informer() - _, err := c.namespaceInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, c.queue)) + namespaceLister := namespaces.Lister() + namespaceInformer := namespaces.Informer() + _, err := namespaceInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, queue)) if err != nil { - return err + return nil, err } - factory.Start(c.informerStopChan) - factory.WaitForCacheSync(c.informerStopChan) - return nil + factory.Start(stopChan) + factory.WaitForCacheSync(stopChan) + return namespaceLister, nil } -func (c *Component) startRuleInformer() error { +func (c *Component) startRuleInformer(queue workqueue.RateLimitingInterface, stopChan chan struct{}) (promListers.PrometheusRuleLister, error) { factory := promExternalVersions.NewSharedInformerFactoryWithOptions( c.promClient, 24*time.Hour, @@ -321,14 +317,30 @@ func (c *Component) startRuleInformer() error { ) promRules := factory.Monitoring().V1().PrometheusRules() - c.ruleLister = promRules.Lister() - c.ruleInformer = promRules.Informer() - _, err := c.ruleInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, c.queue)) + ruleLister := promRules.Lister() + ruleInformer := promRules.Informer() + _, err := ruleInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, queue)) if err != nil { - return err + return nil, err } - factory.Start(c.informerStopChan) - factory.WaitForCacheSync(c.informerStopChan) - return nil + factory.Start(stopChan) + factory.WaitForCacheSync(stopChan) + return ruleLister, nil +} + +func (c *Component) newEventProcessor(queue workqueue.RateLimitingInterface, stopChan chan struct{}, namespaceLister coreListers.NamespaceLister, ruleLister promListers.PrometheusRuleLister) *eventProcessor { + return &eventProcessor{ + queue: queue, + stopChan: stopChan, + health: c, + mimirClient: c.mimirClient, + namespaceLister: namespaceLister, + ruleLister: ruleLister, + namespaceSelector: c.namespaceSelector, + ruleSelector: c.ruleSelector, + namespacePrefix: c.args.MimirNameSpacePrefix, + metrics: c.metrics, + logger: c.log, + } }