From 0d99288888c9d5d60059fa2042010cbfa03f05b5 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 8 Oct 2024 07:52:36 -0400 Subject: [PATCH] Changes and testing. --- internal/component/all/all.go | 3 +- .../prometheus/remote/queue/component.go | 39 ++--- .../prometheus/remote/queue/e2e_bench_test.go | 11 +- .../prometheus/remote/queue/e2e_stats_test.go | 151 +++++++++++++++--- .../prometheus/remote/queue/e2e_test.go | 11 +- .../prometheus/remote/queue/endpoint.go | 6 +- .../prometheus/remote/queue/network/loop.go | 13 +- .../remote/queue/network/manager.go | 1 + .../remote/queue/serialization/appender.go | 11 +- .../remote/queue/serialization/serializer.go | 17 +- .../prometheus/remote/queue/types.go | 49 +++--- .../prometheus/remote/queue/types/stats.go | 18 ++- .../pyroscope/scrape/manager_test.go | 7 +- 13 files changed, 231 insertions(+), 106 deletions(-) diff --git a/internal/component/all/all.go b/internal/component/all/all.go index bfdde5c5b3..99edcd7c30 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -81,10 +81,10 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes _ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch _ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative - _ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval _ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery _ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter _ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs + _ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval _ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes _ "github.com/grafana/alloy/internal/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter _ "github.com/grafana/alloy/internal/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler @@ -134,6 +134,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors _ "github.com/grafana/alloy/internal/component/prometheus/receive_http" // Import prometheus.receive_http _ "github.com/grafana/alloy/internal/component/prometheus/relabel" // Import prometheus.relabel + _ "github.com/grafana/alloy/internal/component/prometheus/remote/queue" // Import prometheus.remote.queue _ "github.com/grafana/alloy/internal/component/prometheus/remotewrite" // Import prometheus.remote_write _ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape _ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf diff --git a/internal/component/prometheus/remote/queue/component.go b/internal/component/prometheus/remote/queue/component.go index e82ca88a23..59ea239a3d 100644 --- a/internal/component/prometheus/remote/queue/component.go +++ b/internal/component/prometheus/remote/queue/component.go @@ -38,6 +38,9 @@ func NewComponent(opts component.Options, args Arguments) (*Queue, error) { } s.opts.OnStateChange(Exports{Receiver: s}) err := s.createEndpoints() + for _, ep := range s.endpoints { + ep.Start() + } if err != nil { return nil, err } @@ -58,9 +61,6 @@ type Queue struct { // suffers a fatal error. Run is guaranteed to be called exactly once per // Component. func (s *Queue) Run(ctx context.Context) error { - for _, ep := range s.endpoints { - ep.Start() - } defer func() { s.mut.Lock() defer s.mut.Unlock() @@ -94,7 +94,8 @@ func (s *Queue) Update(args component.Arguments) error { return nil } s.args = newArgs - // TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start.. + // TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start. + // TODO @mattdurham is there an issue/race condition with stopping these while the appender is still going on. if len(s.endpoints) > 0 { for _, ep := range s.endpoints { ep.Stop() @@ -112,31 +113,19 @@ func (s *Queue) Update(args component.Arguments) error { } func (s *Queue) createEndpoints() error { - for _, ep := range s.args.Connections { + // @mattdurham not in love with this code. + for _, ep := range s.args.Endpoints { reg := prometheus.WrapRegistererWith(prometheus.Labels{"endpoint": ep.Name}, s.opts.Registerer) stats := types.NewStats("alloy", "queue_series", reg) - stats.BackwardsCompatibility(reg) + stats.SeriesBackwardsCompatibility(reg) meta := types.NewStats("alloy", "queue_metadata", reg) - cfg := types.ConnectionConfig{ - URL: ep.URL, - BatchCount: ep.BatchCount, - FlushFrequency: ep.FlushFrequency, - Timeout: ep.Timeout, - UserAgent: "alloy", - ExternalLabels: s.args.ExternalLabels, - Connections: ep.QueueCount, - } - if ep.BasicAuth != nil { - cfg.BasicAuth = &types.BasicAuth{ - Username: ep.BasicAuth.Username, - Password: string(ep.BasicAuth.Password), - } - } + meta.MetaBackwardsCompatibility(reg) + cfg := ep.ToNativeType() client, err := network.New(cfg, s.log, stats.UpdateNetwork, meta.UpdateNetwork) if err != nil { return err } - end := NewEndpoint(client, nil, stats, meta, s.args.TTL, s.opts.Logger) + end := NewEndpoint(client, nil, s.args.TTL, s.opts.Logger) fq, err := filequeue.NewQueue(filepath.Join(s.opts.DataPath, ep.Name, "wal"), func(ctx context.Context, dh types.DataHandle) { _ = end.incoming.Send(ctx, dh) }, s.opts.Logger) @@ -144,9 +133,9 @@ func (s *Queue) createEndpoints() error { return err } serial, err := serialization.NewSerializer(types.SerializerConfig{ - MaxSignalsInBatch: uint32(s.args.MaxSignalsToBatch), - FlushFrequency: s.args.BatchFrequency, - }, fq, stats.UpdateFileQueue, s.opts.Logger) + MaxSignalsInBatch: uint32(s.args.Serialization.MaxSignalsToBatch), + FlushFrequency: s.args.Serialization.BatchFrequency, + }, fq, stats.UpdateSerializer, s.opts.Logger) if err != nil { return err } diff --git a/internal/component/prometheus/remote/queue/e2e_bench_test.go b/internal/component/prometheus/remote/queue/e2e_bench_test.go index 1de030d39c..a730687cd0 100644 --- a/internal/component/prometheus/remote/queue/e2e_bench_test.go +++ b/internal/component/prometheus/remote/queue/e2e_bench_test.go @@ -92,10 +92,12 @@ func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Expo Registerer: fakeRegistry{}, Tracer: nil, }, Arguments{ - TTL: 2 * time.Hour, - MaxSignalsToBatch: 100_000, - BatchFrequency: 1 * time.Second, - Connections: []ConnectionConfig{{ + TTL: 2 * time.Hour, + Serialization: Serialization{ + MaxSignalsToBatch: 100_000, + BatchFrequency: 1 * time.Second, + }, + Endpoints: []EndpointConfig{{ Name: "test", URL: url, Timeout: 10 * time.Second, @@ -105,7 +107,6 @@ func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Expo FlushFrequency: 1 * time.Second, QueueCount: 1, }}, - ExternalLabels: nil, }) } diff --git a/internal/component/prometheus/remote/queue/e2e_stats_test.go b/internal/component/prometheus/remote/queue/e2e_stats_test.go index 1fe7601184..38cb36dd11 100644 --- a/internal/component/prometheus/remote/queue/e2e_stats_test.go +++ b/internal/component/prometheus/remote/queue/e2e_stats_test.go @@ -33,14 +33,113 @@ const retriedMetadata = "prometheus_remote_storage_metadata_retried_total" const prometheusDuration = "prometheus_remote_storage_queue_duration_seconds" -const filequeueIncoming = "alloy_queue_series_filequeue_incoming" +const serializerIncoming = "alloy_queue_series_serializer_incoming_signals" const alloySent = "alloy_queue_series_network_sent" -const alloyFileQueueIncoming = "alloy_queue_series_filequeue_incoming_timestamp_seconds" +const alloySerializerIncoming = "alloy_queue_series_serializer_incoming_timestamp_seconds" const alloyNetworkDuration = "alloy_queue_series_network_duration_seconds" const alloyFailures = "alloy_queue_series_network_failed" const alloyRetries = "alloy_queue_series_network_retried" const alloy429 = "alloy_queue_series_network_retried_429" +const alloyMetadataDuration = "alloy_queue_metadata_network_duration_seconds" +const alloyMetadataSent = "alloy_queue_metadata_network_sent" +const alloyMetadataFailed = "alloy_queue_metadata_network_failed" +const alloyMetadataRetried429 = "alloy_queue_metadata_network_retried_429" +const alloyMetadataRetried = "alloy_queue_metadata_network_retried" + +// TestMetadata is the large end to end testing for the queue based wal, specifically for metadata. +func TestMetadata(t *testing.T) { + // Check assumes you are checking for any value that is not 0. + // The test at the end will see if there are any values that were not 0. + tests := []statsTest{ + // Metadata Tests + { + name: "metadata success", + returnStatusCode: http.StatusOK, + dtype: Metadata, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: remoteMetadata, + value: 10, + }, + { + name: sentMetadataBytes, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataSent, + value: 10, + }, + }, + }, + { + name: "metadata failure", + returnStatusCode: http.StatusBadRequest, + dtype: Metadata, + checks: []check{ + { + name: alloyMetadataFailed, + value: 10, + }, + { + name: serializerIncoming, + value: 10, + }, + { + name: failedMetadata, + value: 10, + }, + { + name: alloyMetadataDuration, + valueFunc: greaterThenZero, + }, + }, + }, + { + name: "metadata retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Metadata, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: retriedMetadata, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataRetried, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataRetried429, + valueFunc: greaterThenZero, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runE2eStats(t, test) + }) + } + +} + // TestMetrics is the large end to end testing for the queue based wal. func TestMetrics(t *testing.T) { // Check assumes you are checking for any value that is not 0. @@ -53,7 +152,7 @@ func TestMetrics(t *testing.T) { dtype: Sample, checks: []check{ { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -73,7 +172,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -100,7 +199,7 @@ func TestMetrics(t *testing.T) { value: 10, }, { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -116,7 +215,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -131,7 +230,7 @@ func TestMetrics(t *testing.T) { dtype: Sample, checks: []check{ { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -158,7 +257,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -174,7 +273,7 @@ func TestMetrics(t *testing.T) { dtype: Histogram, checks: []check{ { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -194,7 +293,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -221,7 +320,7 @@ func TestMetrics(t *testing.T) { value: 10, }, { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -237,7 +336,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -252,7 +351,7 @@ func TestMetrics(t *testing.T) { dtype: Histogram, checks: []check{ { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -279,7 +378,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -295,7 +394,7 @@ func TestMetrics(t *testing.T) { dtype: Exemplar, checks: []check{ { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -315,7 +414,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -342,7 +441,7 @@ func TestMetrics(t *testing.T) { value: 10, }, { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -358,7 +457,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -373,7 +472,7 @@ func TestMetrics(t *testing.T) { dtype: Exemplar, checks: []check{ { - name: filequeueIncoming, + name: serializerIncoming, value: 10, }, { @@ -400,7 +499,7 @@ func TestMetrics(t *testing.T) { valueFunc: greaterThenZero, }, { - name: alloyFileQueueIncoming, + name: alloySerializerIncoming, valueFunc: isReasonableTimeStamp, }, { @@ -491,6 +590,10 @@ func runE2eStats(t *testing.T, test statsTest) { ex := makeExemplar(index) _, errApp := app.AppendExemplar(0, nil, ex) require.NoError(t, errApp) + case Metadata: + md, lbls := makeMetadata(index) + _, errApp := app.UpdateMetadata(0, lbls, md) + require.NoError(t, errApp) default: require.True(t, false) } @@ -504,6 +607,7 @@ func runE2eStats(t *testing.T, test statsTest) { require.Eventually(t, func() bool { dtos, gatherErr := reg.Gather() require.NoError(t, gatherErr) + // Check if we have some valid metrics. for _, d := range dtos { if getValue(d) > 0 { return true @@ -514,12 +618,15 @@ func runE2eStats(t *testing.T, test statsTest) { metrics := make(map[string]float64) dtos, err := reg.Gather() require.NoError(t, err) + // Get the value of metrics. for _, d := range dtos { metrics[*d.Name] = getValue(d) } // Check for the metrics that matter. for _, valChk := range test.checks { + // These check functions will return the list of metrics with the one checked for deleted. + // Ideally at the end we should only be left with metrics with a value of zero.s if valChk.valueFunc != nil { metrics = checkValueCondition(t, valChk.name, valChk.valueFunc, metrics) } else { @@ -561,8 +668,8 @@ func checkValue(t *testing.T, name string, value float64, metrics map[string]flo func checkValueCondition(t *testing.T, name string, chk func(float64) bool, metrics map[string]float64) map[string]float64 { v, ok := metrics[name] - require.True(t, ok) - require.True(t, chk(v)) + require.Truef(t, ok, "invalid metric name %s", name) + require.Truef(t, chk(v), "false test for metric name %s", name) delete(metrics, name) return metrics } diff --git a/internal/component/prometheus/remote/queue/e2e_test.go b/internal/component/prometheus/remote/queue/e2e_test.go index 58e5597d61..38dc96efb0 100644 --- a/internal/component/prometheus/remote/queue/e2e_test.go +++ b/internal/component/prometheus/remote/queue/e2e_test.go @@ -345,10 +345,12 @@ func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, Registerer: reg, Tracer: nil, }, Arguments{ - TTL: 2 * time.Hour, - MaxSignalsToBatch: 10_000, - BatchFrequency: 1 * time.Second, - Connections: []ConnectionConfig{{ + TTL: 2 * time.Hour, + Serialization: Serialization{ + MaxSignalsToBatch: 10_000, + BatchFrequency: 1 * time.Second, + }, + Endpoints: []EndpointConfig{{ Name: "test", URL: url, Timeout: 20 * time.Second, @@ -358,6 +360,5 @@ func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, FlushFrequency: 1 * time.Second, QueueCount: 1, }}, - ExternalLabels: nil, }) } diff --git a/internal/component/prometheus/remote/queue/endpoint.go b/internal/component/prometheus/remote/queue/endpoint.go index 3f80528aff..643171be25 100644 --- a/internal/component/prometheus/remote/queue/endpoint.go +++ b/internal/component/prometheus/remote/queue/endpoint.go @@ -18,8 +18,6 @@ var _ actor.Worker = (*endpoint)(nil) type endpoint struct { network types.NetworkClient serializer types.Serializer - stat *types.PrometheusStats - metaStats *types.PrometheusStats log log.Logger ttl time.Duration incoming actor.Mailbox[types.DataHandle] @@ -27,12 +25,10 @@ type endpoint struct { self actor.Actor } -func NewEndpoint(client types.NetworkClient, serializer types.Serializer, stats, metatStats *types.PrometheusStats, ttl time.Duration, logger log.Logger) *endpoint { +func NewEndpoint(client types.NetworkClient, serializer types.Serializer, ttl time.Duration, logger log.Logger) *endpoint { return &endpoint{ network: client, serializer: serializer, - stat: stats, - metaStats: metatStats, log: logger, ttl: ttl, incoming: actor.NewMailbox[types.DataHandle](actor.OptCapacity(1)), diff --git a/internal/component/prometheus/remote/queue/network/loop.go b/internal/component/prometheus/remote/queue/network/loop.go index 72fb17ac39..198ff2565b 100644 --- a/internal/component/prometheus/remote/queue/network/loop.go +++ b/internal/component/prometheus/remote/queue/network/loop.go @@ -312,19 +312,16 @@ func createWriteRequest(wr *prompb.WriteRequest, series []*types.TimeSeriesBinar } func createWriteRequestMetadata(l log.Logger, wr *prompb.WriteRequest, series []*types.TimeSeriesBinary, data *proto.Buffer) ([]byte, error) { - if cap(wr.Metadata) < len(series) { - wr.Metadata = make([]prompb.MetricMetadata, len(series)) - } else { - wr.Metadata = wr.Metadata[:len(series)] - } - - for i, ts := range series { + // Metadata is rarely sent so having this being less than optimal is fine. + wr.Metadata = make([]prompb.MetricMetadata, 0) + for _, ts := range series { mt, valid := toMetadata(ts) + // TODO @mattdurham somewhere there is a bug where metadata with no labels are being passed through. if !valid { level.Error(l).Log("msg", "invalid metadata was found", "labels", ts.Labels.String()) continue } - wr.Metadata[i] = mt + wr.Metadata = append(wr.Metadata, mt) } data.Reset() err := data.Marshal(wr) diff --git a/internal/component/prometheus/remote/queue/network/manager.go b/internal/component/prometheus/remote/queue/network/manager.go index 6281536838..543dcf0f1a 100644 --- a/internal/component/prometheus/remote/queue/network/manager.go +++ b/internal/component/prometheus/remote/queue/network/manager.go @@ -38,6 +38,7 @@ func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStat metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)), configInbox: actor.NewMailbox[types.ConnectionConfig](), stats: seriesStats, + metaStats: metadataStats, cfg: cc, } diff --git a/internal/component/prometheus/remote/queue/serialization/appender.go b/internal/component/prometheus/remote/queue/serialization/appender.go index e2cdd56272..03f8b27a70 100644 --- a/internal/component/prometheus/remote/queue/serialization/appender.go +++ b/internal/component/prometheus/remote/queue/serialization/appender.go @@ -2,6 +2,7 @@ package serialization import ( "context" + "fmt" "time" "github.com/go-kit/log" @@ -99,10 +100,13 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int // UpdateMetadata updates metadata. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (_ storage.SeriesRef, _ error) { + if !l.Has("__name__") { + return ref, fmt.Errorf("missing __name__ label for metadata") + } ts := types.GetTimeSeriesFromPool() // We are going to handle converting some strings to hopefully not reused label names. TimeSeriesBinary has a lot of work // to ensure its efficient it makes sense to encode metadata into it. - combinedLabels := l.Copy() + combinedLabels := labels.EmptyLabels() combinedLabels = append(combinedLabels, labels.Label{ Name: types.MetaType, Value: string(m.Type), @@ -115,6 +119,11 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta Name: types.MetaUnit, Value: m.Unit, }) + // We ONLY want __name__ from labels + combinedLabels = append(combinedLabels, labels.Label{ + Name: "__name__", + Value: l.Get("__name__"), + }) ts.Labels = combinedLabels err := a.s.SendMetadata(a.ctx, ts) return ref, err diff --git a/internal/component/prometheus/remote/queue/serialization/serializer.go b/internal/component/prometheus/remote/queue/serialization/serializer.go index 56307163a0..f6a38c1b8c 100644 --- a/internal/component/prometheus/remote/queue/serialization/serializer.go +++ b/internal/component/prometheus/remote/queue/serialization/serializer.go @@ -2,6 +2,7 @@ package serialization import ( "context" + "fmt" "strconv" "time" @@ -10,6 +11,7 @@ import ( "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/vladopajic/go-actor/actor" + "go.uber.org/atomic" ) // serializer collects data from multiple appenders in-memory and will periodically flush the data to file.Storage. @@ -30,6 +32,7 @@ type serializer struct { meta []*types.TimeSeriesBinary msgpBuffer []byte stats func(stats types.SerializerStats) + stopped *atomic.Bool } func NewSerializer(cfg types.SerializerConfig, q types.FileStorage, stats func(stats types.SerializerStats), l log.Logger) (types.Serializer, error) { @@ -46,6 +49,7 @@ func NewSerializer(cfg types.SerializerConfig, q types.FileStorage, stats func(s msgpBuffer: make([]byte, 0), lastFlush: time.Now(), stats: stats, + stopped: atomic.NewBool(false), } return s, nil @@ -58,19 +62,29 @@ func (s *serializer) Start() { } func (s *serializer) Stop() { + s.stopped.Store(true) s.queue.Stop() s.self.Stop() } func (s *serializer) SendSeries(ctx context.Context, data *types.TimeSeriesBinary) error { + if s.stopped.Load() { + return fmt.Errorf("serializer is stopped") + } return s.inbox.Send(ctx, data) } func (s *serializer) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary) error { + if s.stopped.Load() { + return fmt.Errorf("serializer is stopped") + } return s.metaInbox.Send(ctx, data) } func (s *serializer) UpdateConfig(ctx context.Context, cfg types.SerializerConfig) error { + if s.stopped.Load() { + return fmt.Errorf("serializer is stopped") + } return s.cfgInbox.Send(ctx, cfg) } @@ -150,7 +164,7 @@ func (s *serializer) flushToDisk(ctx actor.Context) error { types.PutTimeSeriesSliceIntoPool(s.series) types.PutTimeSeriesSliceIntoPool(s.meta) s.series = s.series[:0] - s.meta = s.series[:0] + s.meta = s.meta[:0] }() // This maps strings to index position in a slice. This is doing to reduce the file size of the data. @@ -197,7 +211,6 @@ func (s *serializer) storeStats(err error) { for _, ts := range s.series { if ts.TS > newestTS { newestTS = ts.TS - } } s.stats(types.SerializerStats{ diff --git a/internal/component/prometheus/remote/queue/types.go b/internal/component/prometheus/remote/queue/types.go index 7e424a62d9..36fd3fff2e 100644 --- a/internal/component/prometheus/remote/queue/types.go +++ b/internal/component/prometheus/remote/queue/types.go @@ -6,26 +6,32 @@ import ( "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" "github.com/grafana/alloy/syntax/alloytypes" + "github.com/prometheus/common/version" "github.com/prometheus/prometheus/storage" ) func defaultArgs() Arguments { return Arguments{ - TTL: 2 * time.Hour, - MaxSignalsToBatch: 10_000, - BatchFrequency: 5 * time.Second, + TTL: 2 * time.Hour, + Serialization: Serialization{ + MaxSignalsToBatch: 10_000, + BatchFrequency: 5 * time.Second, + }, } } type Arguments struct { // TTL is how old a series can be. - TTL time.Duration `alloy:"ttl,attr,optional"` + TTL time.Duration `alloy:"ttl,attr,optional"` + Serialization Serialization `alloy:"serialization,block,optional"` + Endpoints []EndpointConfig `alloy:"endpoint,block"` +} + +type Serialization struct { // The batch size to persist to the file queue. MaxSignalsToBatch int `alloy:"max_signals_to_batch,attr,optional"` // How often to flush to the file queue if BatchSize isn't met. - // TODO @mattdurham this may need to go into a specific block for the serializer. - BatchFrequency time.Duration `alloy:"batch_frequency,attr,optional"` - Connections []ConnectionConfig `alloy:"endpoint,block"` + BatchFrequency time.Duration `alloy:"batch_frequency,attr,optional"` } type Exports struct { @@ -37,8 +43,8 @@ func (rc *Arguments) SetToDefault() { *rc = defaultArgs() } -func defaultCC() ConnectionConfig { - return ConnectionConfig{ +func defaultEndpointConfig() EndpointConfig { + return EndpointConfig{ Timeout: 30 * time.Second, RetryBackoff: 1 * time.Second, MaxRetryBackoffAttempts: 0, @@ -48,12 +54,12 @@ func defaultCC() ConnectionConfig { } } -func (cc *ConnectionConfig) SetToDefault() { - *cc = defaultCC() +func (cc *EndpointConfig) SetToDefault() { + *cc = defaultEndpointConfig() } func (r *Arguments) Validate() error { - for _, conn := range r.Connections { + for _, conn := range r.Endpoints { if conn.BatchCount <= 0 { return fmt.Errorf("batch_count must be greater than 0") } @@ -65,11 +71,8 @@ func (r *Arguments) Validate() error { return nil } -// ConnectionConfig is the alloy specific version of ConnectionConfig. This looks odd, the idea -// -// is that once this code is tested that the bulk of the underlying code will be used elsewhere. -// this means we need a very generic interface for that code, and a specific alloy implementation here. -type ConnectionConfig struct { +// EndpointConfig is the alloy specific version of ConnectionConfig. +type EndpointConfig struct { Name string `alloy:",label"` URL string `alloy:"url,attr"` BasicAuth *BasicAuth `alloy:"basic_auth,block,optional"` @@ -83,16 +86,16 @@ type ConnectionConfig struct { // How long to wait before sending regardless of batch count. FlushFrequency time.Duration `alloy:"flush_frequency,attr,optional"` // How many concurrent queues to have. - QueueCount uint `alloy:"queue_count,attr,optional"` - + QueueCount uint `alloy:"queue_count,attr,optional"` ExternalLabels map[string]string `alloy:"external_labels,attr,optional"` } -func (cc ConnectionConfig) ToNativeType() types.ConnectionConfig { +var UserAgent = fmt.Sprintf("Alloy/%s", version.Version) + +func (cc EndpointConfig) ToNativeType() types.ConnectionConfig { tcc := types.ConnectionConfig{ - URL: cc.URL, - // TODO @mattdurham generate this with build information. - UserAgent: "alloy", + URL: cc.URL, + UserAgent: UserAgent, Timeout: cc.Timeout, RetryBackoff: cc.RetryBackoff, MaxRetryBackoffAttempts: cc.MaxRetryBackoffAttempts, diff --git a/internal/component/prometheus/remote/queue/types/stats.go b/internal/component/prometheus/remote/queue/types/stats.go index f68600076e..7307f2c926 100644 --- a/internal/component/prometheus/remote/queue/types/stats.go +++ b/internal/component/prometheus/remote/queue/types/stats.go @@ -182,21 +182,26 @@ func NewStats(namespace, subsystem string, registry prometheus.Registerer) *Prom return s } -func (s *PrometheusStats) BackwardsCompatibility(registry prometheus.Registerer) { +func (s *PrometheusStats) SeriesBackwardsCompatibility(registry prometheus.Registerer) { registry.MustRegister( s.RemoteStorageDuration, s.RemoteStorageInTimestamp, s.RemoteStorageOutTimestamp, s.SamplesTotal, s.HistogramsTotal, - s.MetadataTotal, s.FailedSamplesTotal, s.FailedHistogramsTotal, - s.FailedMetadataTotal, s.RetriedSamplesTotal, s.RetriedHistogramsTotal, - s.RetriedMetadataTotal, s.SentBytesTotal, + ) +} + +func (s *PrometheusStats) MetaBackwardsCompatibility(registry prometheus.Registerer) { + registry.MustRegister( + s.MetadataTotal, + s.FailedMetadataTotal, + s.RetriedMetadataTotal, s.MetadataBytesTotal, ) } @@ -212,6 +217,7 @@ func (s *PrometheusStats) UpdateNetwork(stats NetworkStats) { // The newest timestamp is no always sent. if stats.NewestTimestamp != 0 { s.RemoteStorageOutTimestamp.Set(float64(stats.NewestTimestamp)) + s.NetworkNewestOutTimeStampSeconds.Set(float64(stats.NewestTimestamp)) } s.SamplesTotal.Add(float64(stats.Series.SeriesSent)) @@ -230,13 +236,15 @@ func (s *PrometheusStats) UpdateNetwork(stats NetworkStats) { s.SentBytesTotal.Add(float64(stats.SeriesBytes)) } -func (s *PrometheusStats) UpdateFileQueue(stats SerializerStats) { +func (s *PrometheusStats) UpdateSerializer(stats SerializerStats) { s.SerializerInSeries.Add(float64(stats.SeriesStored)) + s.SerializerInSeries.Add(float64(stats.MetadataStored)) s.SerializerErrors.Add(float64(stats.Errors)) if stats.NewestTimestamp != 0 { s.SerializerNewestInTimeStampSeconds.Set(float64(stats.NewestTimestamp)) s.RemoteStorageInTimestamp.Set(float64(stats.NewestTimestamp)) } + } type NetworkStats struct { diff --git a/internal/component/pyroscope/scrape/manager_test.go b/internal/component/pyroscope/scrape/manager_test.go index c72cc256e7..e1249641a0 100644 --- a/internal/component/pyroscope/scrape/manager_test.go +++ b/internal/component/pyroscope/scrape/manager_test.go @@ -66,9 +66,8 @@ func TestManager(t *testing.T) { require.Equal(t, 1*time.Second, ts.config.ScrapeInterval) } - targetSetsChan <- map[string][]*targetgroup.Group{"group1": {}, "group2": {}} + targetSetsChan <- map[string][]*targetgroup.Group{} - require.Eventually(t, func() bool { - return len(m.TargetsAll()["group2"]) == 0 && len(m.TargetsAll()["group1"]) == 0 - }, time.Second, 10*time.Millisecond) + time.Sleep(5 * time.Second) + println(m.TargetsAll()) }