Skip to content

Commit

Permalink
Changes and testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdurham committed Oct 8, 2024
1 parent f04edb4 commit 0d99288
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 106 deletions.
3 changes: 2 additions & 1 deletion internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ import (
_ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative
_ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval
_ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs
_ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval
_ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
Expand Down Expand Up @@ -134,6 +134,7 @@ import (
_ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors
_ "github.com/grafana/alloy/internal/component/prometheus/receive_http" // Import prometheus.receive_http
_ "github.com/grafana/alloy/internal/component/prometheus/relabel" // Import prometheus.relabel
_ "github.com/grafana/alloy/internal/component/prometheus/remote/queue" // Import prometheus.remote.queue
_ "github.com/grafana/alloy/internal/component/prometheus/remotewrite" // Import prometheus.remote_write
_ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf
Expand Down
39 changes: 14 additions & 25 deletions internal/component/prometheus/remote/queue/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func NewComponent(opts component.Options, args Arguments) (*Queue, error) {
}
s.opts.OnStateChange(Exports{Receiver: s})
err := s.createEndpoints()
for _, ep := range s.endpoints {
ep.Start()
}
if err != nil {
return nil, err
}
Expand All @@ -58,9 +61,6 @@ type Queue struct {
// suffers a fatal error. Run is guaranteed to be called exactly once per
// Component.
func (s *Queue) Run(ctx context.Context) error {
for _, ep := range s.endpoints {
ep.Start()
}
defer func() {
s.mut.Lock()
defer s.mut.Unlock()
Expand Down Expand Up @@ -94,7 +94,8 @@ func (s *Queue) Update(args component.Arguments) error {
return nil
}
s.args = newArgs
// TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start..
// TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start.
// TODO @mattdurham is there an issue/race condition with stopping these while the appender is still going on.
if len(s.endpoints) > 0 {
for _, ep := range s.endpoints {
ep.Stop()
Expand All @@ -112,41 +113,29 @@ func (s *Queue) Update(args component.Arguments) error {
}

func (s *Queue) createEndpoints() error {
for _, ep := range s.args.Connections {
// @mattdurham not in love with this code.
for _, ep := range s.args.Endpoints {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"endpoint": ep.Name}, s.opts.Registerer)
stats := types.NewStats("alloy", "queue_series", reg)
stats.BackwardsCompatibility(reg)
stats.SeriesBackwardsCompatibility(reg)
meta := types.NewStats("alloy", "queue_metadata", reg)
cfg := types.ConnectionConfig{
URL: ep.URL,
BatchCount: ep.BatchCount,
FlushFrequency: ep.FlushFrequency,
Timeout: ep.Timeout,
UserAgent: "alloy",
ExternalLabels: s.args.ExternalLabels,
Connections: ep.QueueCount,
}
if ep.BasicAuth != nil {
cfg.BasicAuth = &types.BasicAuth{
Username: ep.BasicAuth.Username,
Password: string(ep.BasicAuth.Password),
}
}
meta.MetaBackwardsCompatibility(reg)
cfg := ep.ToNativeType()
client, err := network.New(cfg, s.log, stats.UpdateNetwork, meta.UpdateNetwork)
if err != nil {
return err
}
end := NewEndpoint(client, nil, stats, meta, s.args.TTL, s.opts.Logger)
end := NewEndpoint(client, nil, s.args.TTL, s.opts.Logger)
fq, err := filequeue.NewQueue(filepath.Join(s.opts.DataPath, ep.Name, "wal"), func(ctx context.Context, dh types.DataHandle) {
_ = end.incoming.Send(ctx, dh)
}, s.opts.Logger)
if err != nil {
return err
}
serial, err := serialization.NewSerializer(types.SerializerConfig{
MaxSignalsInBatch: uint32(s.args.MaxSignalsToBatch),
FlushFrequency: s.args.BatchFrequency,
}, fq, stats.UpdateFileQueue, s.opts.Logger)
MaxSignalsInBatch: uint32(s.args.Serialization.MaxSignalsToBatch),
FlushFrequency: s.args.Serialization.BatchFrequency,
}, fq, stats.UpdateSerializer, s.opts.Logger)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions internal/component/prometheus/remote/queue/e2e_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Expo
Registerer: fakeRegistry{},
Tracer: nil,
}, Arguments{
TTL: 2 * time.Hour,
MaxSignalsToBatch: 100_000,
BatchFrequency: 1 * time.Second,
Connections: []ConnectionConfig{{
TTL: 2 * time.Hour,
Serialization: Serialization{
MaxSignalsToBatch: 100_000,
BatchFrequency: 1 * time.Second,
},
Endpoints: []EndpointConfig{{
Name: "test",
URL: url,
Timeout: 10 * time.Second,
Expand All @@ -105,7 +107,6 @@ func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Expo
FlushFrequency: 1 * time.Second,
QueueCount: 1,
}},
ExternalLabels: nil,
})
}

Expand Down
Loading

0 comments on commit 0d99288

Please sign in to comment.