From bd2d0839e8b7278abf2efe3a56165279829fb919 Mon Sep 17 00:00:00 2001 From: matt durham Date: Fri, 11 Oct 2024 11:08:45 -0400 Subject: [PATCH] PR feedback --- .../prometheus/remote/queue/component.go | 1 + .../prometheus/remote/queue/endpoint.go | 3 +-- .../remote/queue/network/manager.go | 20 +++++++++---------- .../remote/queue/serialization/serializer.go | 2 +- .../remote/queue/types/serializer.go | 2 ++ 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/internal/component/prometheus/remote/queue/component.go b/internal/component/prometheus/remote/queue/component.go index 083c3a2b5..f837f221d 100644 --- a/internal/component/prometheus/remote/queue/component.go +++ b/internal/component/prometheus/remote/queue/component.go @@ -96,6 +96,7 @@ func (s *Queue) Update(args component.Arguments) error { } s.args = newArgs // TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start. + // This will cause data in the endpoints and their children to be lost. if len(s.endpoints) > 0 { for _, ep := range s.endpoints { ep.Stop() diff --git a/internal/component/prometheus/remote/queue/endpoint.go b/internal/component/prometheus/remote/queue/endpoint.go index bc863a211..223dbafd3 100644 --- a/internal/component/prometheus/remote/queue/endpoint.go +++ b/internal/component/prometheus/remote/queue/endpoint.go @@ -47,7 +47,6 @@ func (ep *endpoint) Stop() { // Stop in order of data flow. This prevents errors around stopped mailboxes that can pop up. ep.serializer.Stop() ep.network.Stop() - ep.network.Stop() ep.self.Stop() } @@ -84,7 +83,7 @@ func (ep *endpoint) deserializeAndSend(ctx context.Context, meta map[string]stri level.Error(ep.log).Log("msg", "version not found for deserialization") return } - if version != "alloy.metrics.queue.v1" { + if version != types.AlloyFileVersion { level.Error(ep.log).Log("msg", "invalid version found for deserialization", "version", version) return } diff --git a/internal/component/prometheus/remote/queue/network/manager.go b/internal/component/prometheus/remote/queue/network/manager.go index fbe6e430e..277db5335 100644 --- a/internal/component/prometheus/remote/queue/network/manager.go +++ b/internal/component/prometheus/remote/queue/network/manager.go @@ -22,10 +22,10 @@ type manager struct { metaStats func(types.NetworkStats) } -// configCallback allows the config to be synchronous. +// configCallback allows actors to notify via `done` channel when they're done processing the config `cc`. Useful when synchronous processing is required. type configCallback struct { - cc types.ConnectionConfig - ch chan struct{} + cc types.ConnectionConfig + done chan struct{} } var _ types.NetworkClient = (*manager)(nil) @@ -76,16 +76,16 @@ func (s *manager) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary } func (s *manager) UpdateConfig(ctx context.Context, cc types.ConnectionConfig) error { - ch := make(chan struct{}) - defer close(ch) + done := make(chan struct{}) + defer close(done) err := s.configInbox.Send(ctx, configCallback{ - cc: cc, - ch: ch, + cc: cc, + done: done, }) if err != nil { return err } - <-ch + <-done return nil } @@ -99,7 +99,7 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus { } s.updateConfig(cfg.cc) // Notify the caller we have applied the config. - cfg.ch <- struct{}{} + cfg.done <- struct{}{} return actor.WorkerContinue default: } @@ -134,7 +134,7 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus { } s.updateConfig(cfg.cc) // Notify the caller we have applied the config. - cfg.ch <- struct{}{} + cfg.done <- struct{}{} return actor.WorkerContinue } } diff --git a/internal/component/prometheus/remote/queue/serialization/serializer.go b/internal/component/prometheus/remote/queue/serialization/serializer.go index f6a38c1b8..95a89cca5 100644 --- a/internal/component/prometheus/remote/queue/serialization/serializer.go +++ b/internal/component/prometheus/remote/queue/serialization/serializer.go @@ -192,7 +192,7 @@ func (s *serializer) flushToDisk(ctx actor.Context) error { out := snappy.Encode(buf) meta := map[string]string{ // product.signal_type.schema.version - "version": "alloy.metrics.queue.v1", + "version": types.AlloyFileVersion, "compression": "snappy", "series_count": strconv.Itoa(len(group.Series)), "meta_count": strconv.Itoa(len(group.Metadata)), diff --git a/internal/component/prometheus/remote/queue/types/serializer.go b/internal/component/prometheus/remote/queue/types/serializer.go index 6919f666f..d0041242c 100644 --- a/internal/component/prometheus/remote/queue/types/serializer.go +++ b/internal/component/prometheus/remote/queue/types/serializer.go @@ -5,6 +5,8 @@ import ( "time" ) +const AlloyFileVersion = "alloy.metrics.queue.v1" + type SerializerConfig struct { // MaxSignalsInBatch controls what the max batch size is. MaxSignalsInBatch uint32