Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdurham committed Oct 11, 2024
1 parent 98872f8 commit bd2d083
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 13 deletions.
1 change: 1 addition & 0 deletions internal/component/prometheus/remote/queue/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (s *Queue) Update(args component.Arguments) error {
}
s.args = newArgs
// TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start.
// This will cause data in the endpoints and their children to be lost.
if len(s.endpoints) > 0 {
for _, ep := range s.endpoints {
ep.Stop()
Expand Down
3 changes: 1 addition & 2 deletions internal/component/prometheus/remote/queue/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (ep *endpoint) Stop() {
// Stop in order of data flow. This prevents errors around stopped mailboxes that can pop up.
ep.serializer.Stop()
ep.network.Stop()
ep.network.Stop()
ep.self.Stop()
}

Expand Down Expand Up @@ -84,7 +83,7 @@ func (ep *endpoint) deserializeAndSend(ctx context.Context, meta map[string]stri
level.Error(ep.log).Log("msg", "version not found for deserialization")
return
}
if version != "alloy.metrics.queue.v1" {
if version != types.AlloyFileVersion {
level.Error(ep.log).Log("msg", "invalid version found for deserialization", "version", version)
return
}
Expand Down
20 changes: 10 additions & 10 deletions internal/component/prometheus/remote/queue/network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type manager struct {
metaStats func(types.NetworkStats)
}

// configCallback allows the config to be synchronous.
// configCallback allows actors to notify via `done` channel when they're done processing the config `cc`. Useful when synchronous processing is required.
type configCallback struct {
cc types.ConnectionConfig
ch chan struct{}
cc types.ConnectionConfig
done chan struct{}
}

var _ types.NetworkClient = (*manager)(nil)
Expand Down Expand Up @@ -76,16 +76,16 @@ func (s *manager) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary
}

func (s *manager) UpdateConfig(ctx context.Context, cc types.ConnectionConfig) error {
ch := make(chan struct{})
defer close(ch)
done := make(chan struct{})
defer close(done)
err := s.configInbox.Send(ctx, configCallback{
cc: cc,
ch: ch,
cc: cc,
done: done,
})
if err != nil {
return err
}
<-ch
<-done
return nil
}

Expand All @@ -99,7 +99,7 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus {
}
s.updateConfig(cfg.cc)
// Notify the caller we have applied the config.
cfg.ch <- struct{}{}
cfg.done <- struct{}{}
return actor.WorkerContinue
default:
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus {
}
s.updateConfig(cfg.cc)
// Notify the caller we have applied the config.
cfg.ch <- struct{}{}
cfg.done <- struct{}{}
return actor.WorkerContinue
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *serializer) flushToDisk(ctx actor.Context) error {
out := snappy.Encode(buf)
meta := map[string]string{
// product.signal_type.schema.version
"version": "alloy.metrics.queue.v1",
"version": types.AlloyFileVersion,
"compression": "snappy",
"series_count": strconv.Itoa(len(group.Series)),
"meta_count": strconv.Itoa(len(group.Metadata)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const AlloyFileVersion = "alloy.metrics.queue.v1"

type SerializerConfig struct {
// MaxSignalsInBatch controls what the max batch size is.
MaxSignalsInBatch uint32
Expand Down

0 comments on commit bd2d083

Please sign in to comment.