diff --git a/CHANGELOG.md b/CHANGELOG.md index f13e515e75d2..4ff616c25480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ Main (unreleased) - Labels of `otel_scope_info` metrics other than `otel_scope_name` and `otel_scope_version` are added as scope attributes with the matching name and version. +- The `target` block in `prometheus.exporter.blackbox` requires a mandatory `name` + argument instead of a block label. (@hainenber) + ### Enhancements - Flow Windows service: Support environment variables. (@jkroepke) @@ -52,6 +55,8 @@ Main (unreleased) - The `remote.http` component can optionally define a request body. (@tpaschalis) +- Added support for `loki.write` to flush WAL on agent shutdown. (@thepalbi) + ### Bugfixes - Update `pyroscope.ebpf` to fix a logical bug causing to profile to many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev) @@ -64,6 +69,8 @@ Main (unreleased) - Fixes `loki.source.docker` a behavior that synced an incomplete list of targets to the tailer manager. (@FerdinandvHagen) +- Fixes `otelcol.connector.servicegraph` store ttl default value from 2ms to 2s. (@rlankfo) + ### Other changes - Bump github.com/IBM/sarama from v1.41.2 to v1.42.1 @@ -142,6 +149,8 @@ v0.38.0 (2023-11-21) - Added support for python profiling to `pyroscope.ebpf` component. (@korniltsev) +- Added support for native Prometheus histograms to `otelcol.exporter.prometheus` (@wildum) + - Windows Flow Installer: Add /CONFIG /DISABLEPROFILING and /DISABLEREPORTING flag (@jkroepke) - Add queueing logs remote write client for `loki.write` when WAL is enabled. (@thepalbi) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 6683e9e5772b..244aa587a81f 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,8 +35,9 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Stoppable interface { +type StoppableWatcher interface { Stop() + Drain() } type StoppableClient interface { @@ -44,6 +45,27 @@ type StoppableClient interface { StopNow() } +// watcherClientPair represents a pair of watcher and client, which are coupled together, or just a single client. +type watcherClientPair struct { + watcher StoppableWatcher + client StoppableClient +} + +// Stop will proceed to stop, in order, the possibly-nil watcher and the client. +func (p watcherClientPair) Stop(drain bool) { + // if the config has WAL disabled, there will be no watcher per client config + if p.watcher != nil { + // if drain enabled, drain the WAL + if drain { + p.watcher.Drain() + } + p.watcher.Stop() + } + + // subsequently stop the client + p.client.Stop() +} + // Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry // from the scrape targets, to the remote write clients themselves. // @@ -51,12 +73,10 @@ type StoppableClient interface { // work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client // types: Logger, Multi and WAL. 
type Manager struct {
-	name        string
-	clients     []Client
-	walWatchers []Stoppable
+	name string

-	// stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface
-	stoppableClients []StoppableClient
+	clients []Client
+	pairs   []watcherClientPair

 	entries chan loki.Entry
 	once    sync.Once
@@ -78,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
 	clientsCheck := make(map[string]struct{})
 	clients := make([]Client, 0, len(clientCfgs))
-	watchers := make([]Stoppable, 0, len(clientCfgs))
-	stoppableClients := make([]StoppableClient, 0, len(clientCfgs))
+	pairs := make([]watcherClientPair, 0, len(clientCfgs))
 	for _, cfg := range clientCfgs {
 		// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
 		clientName := GetClientName(cfg)
@@ -103,7 +122,6 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
 			if err != nil {
 				return nil, fmt.Errorf("error starting queue client: %w", err)
 			}
-			stoppableClients = append(stoppableClients, queue)

 			// subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo
 			// series cache whenever a segment is deleted.
@@ -116,7 +134,10 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
 			level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName)
 			watcher.Start()

-			watchers = append(watchers, watcher)
+			pairs = append(pairs, watcherClientPair{
+				watcher: watcher,
+				client:  queue,
+			})
 		} else {
 			client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
 			if err != nil {
@@ -124,14 +145,16 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
 			}

 			clients = append(clients, client)
-			stoppableClients = append(stoppableClients, client)
+
+			pairs = append(pairs, watcherClientPair{
+				client: client,
+			})
 		}
 	}

 	manager := &Manager{
-		clients:          clients,
-		stoppableClients: stoppableClients,
-		walWatchers:      watchers,
-		entries:          make(chan loki.Entry),
+		clients: clients,
+		pairs:   pairs,
+		entries: make(chan loki.Entry),
 	}
 	if walCfg.Enabled {
 		manager.name = buildManagerName("wal", clientCfgs...)
@@ -174,8 +197,8 @@ func (m *Manager) startWithForward() {
 }

 func (m *Manager) StopNow() {
-	for _, c := range m.stoppableClients {
-		c.StopNow()
+	for _, pair := range m.pairs {
+		pair.client.StopNow()
 	}
 }

@@ -187,18 +210,35 @@ func (m *Manager) Chan() chan<- loki.Entry {
 	return m.entries
 }

+// Stop stops the manager without draining the Write-Ahead Log, even when that mode is enabled.
 func (m *Manager) Stop() {
+	m.StopWithDrain(false)
+}
+
+// StopWithDrain stops the manager, its Write-Ahead Log watchers, and its clients. If drain is enabled,
+// the Watchers will attempt to drain the WAL completely.
+// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then
+// the clients themselves are shut down.
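For context before the `StopWithDrain` implementation that follows: the per-pair drain-then-stop ordering is the heart of this change. Below is a minimal, runnable sketch of that ordering; the stub types are hypothetical stand-ins for illustration only, not part of the patch:

```go
package main

import "fmt"

// Interfaces mirroring StoppableWatcher and StoppableClient from the patch.
type StoppableWatcher interface {
	Drain()
	Stop()
}

type StoppableClient interface {
	Stop()
	StopNow()
}

// Hypothetical stubs that only record the order in which they are called.
type stubWatcher struct{ calls *[]string }

func (w stubWatcher) Drain() { *w.calls = append(*w.calls, "watcher.Drain") }
func (w stubWatcher) Stop()  { *w.calls = append(*w.calls, "watcher.Stop") }

type stubClient struct{ calls *[]string }

func (c stubClient) Stop()    { *c.calls = append(*c.calls, "client.Stop") }
func (c stubClient) StopNow() { *c.calls = append(*c.calls, "client.StopNow") }

type pair struct {
	watcher StoppableWatcher // nil when the WAL is disabled for this client config
	client  StoppableClient
}

// stop mirrors watcherClientPair.Stop: optionally drain, stop the watcher,
// and only then stop the client, so buffered WAL data reaches the client first.
func (p pair) stop(drain bool) {
	if p.watcher != nil {
		if drain {
			p.watcher.Drain()
		}
		p.watcher.Stop()
	}
	p.client.Stop()
}

func main() {
	var calls []string
	p := pair{watcher: stubWatcher{&calls}, client: stubClient{&calls}}
	p.stop(true)
	fmt.Println(calls) // [watcher.Drain watcher.Stop client.Stop]
}
```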
+func (m *Manager) StopWithDrain(drain bool) {
 	// first stop the receiving channel
 	m.once.Do(func() { close(m.entries) })
 	m.wg.Wait()
-	// close wal watchers
-	for _, walWatcher := range m.walWatchers {
-		walWatcher.Stop()
-	}
-	// close clients
-	for _, c := range m.stoppableClients {
-		c.Stop()
+
+	var stopWG sync.WaitGroup
+
+	// Depending on whether drain is enabled, the maximum time stopping a watcher and its client can take is
+	// the drain time of the watcher plus the drain time of the client. To minimize this, and since we keep a separate
+	// WAL for each client config, each (watcher, client) pair is stopped concurrently.
+	for _, pair := range m.pairs {
+		stopWG.Add(1)
+		go func(pair watcherClientPair) {
+			defer stopWG.Done()
+			pair.Stop(drain)
+		}(pair)
 	}
+
+	// wait for all pairs to be stopped
+	stopWG.Wait()
 }

 // GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config,
diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go
index 871880cad17b..edd9c25bbd37 100644
--- a/component/common/loki/client/queue_client.go
+++ b/component/common/loki/client/queue_client.go
@@ -29,7 +29,7 @@ import (
 // StoppableWriteTo is a mixing of the WAL's WriteTo interface, that is Stoppable as well.
 type StoppableWriteTo interface {
 	agentWal.WriteTo
-	Stoppable
+	Stop()
 	StopNow()
 }

@@ -38,7 +38,7 @@ type StoppableWriteTo interface {
 type MarkerHandler interface {
 	UpdateReceivedData(segmentId, dataCount int) // Data queued for sending
 	UpdateSentData(segmentId, dataCount int)     // Data which was sent or given up on sending
-	Stoppable
+	Stop()
 }

 // queuedBatch is a batch specific to a tenant, that is considered ready to be sent.
diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go
index c0d6c7ae2752..7c22d747c13d 100644
--- a/component/common/loki/wal/config.go
+++ b/component/common/loki/wal/config.go
@@ -10,8 +10,9 @@ const (
 // DefaultWatchConfig is the opinionated defaults for operating the Watcher.
 var DefaultWatchConfig = WatchConfig{
-	MinReadFrequency: time.Millisecond * 250,
+	MinReadFrequency: 250 * time.Millisecond,
 	MaxReadFrequency: time.Second,
+	DrainTimeout:     15 * time.Second,
 }

 // Config contains all WAL-related settings.
@@ -49,6 +50,10 @@ type WatchConfig struct {
 	// MaxReadFrequency controls the maximum read frequency the Watcher polls the WAL for new records. As mentioned above
 	// it caps the polling frequency to a maximum, to prevent to exponential backoff from making it too high.
 	MaxReadFrequency time.Duration
+
+	// DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL.
+	// After that time, the Watcher is stopped immediately, dropping all the work in progress.
+	DrainTimeout time.Duration
 }

 // UnmarshalYAML implement YAML Unmarshaler
diff --git a/component/common/loki/wal/internal/watcher_state.go b/component/common/loki/wal/internal/watcher_state.go
new file mode 100644
index 000000000000..c81413dfd230
--- /dev/null
+++ b/component/common/loki/wal/internal/watcher_state.go
@@ -0,0 +1,88 @@
+package internal
+
+import (
+	"sync"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/agent/pkg/flow/logging/level"
+)
+
+const (
+	// StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed
+	// ones, and checking for new ones.
+	StateRunning = iota
+
+	// StateDraining is an intermediary state between running and stopping. The watcher will attempt to consume all the data
+	// found in the WAL, ignoring errors and assuming all segments found are "closed", that is, no longer being written.
+	StateDraining
+
+	// StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly.
+	StateStopping
+)
+
+// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking if it's stopping, getting
+// the current state, or blocking until it has stopped.
+type WatcherState struct {
+	current        int
+	mut            sync.RWMutex
+	stoppingSignal chan struct{}
+	logger         log.Logger
+}
+
+func NewWatcherState(l log.Logger) *WatcherState {
+	return &WatcherState{
+		current:        StateRunning,
+		stoppingSignal: make(chan struct{}),
+		logger:         l,
+	}
+}
+
+// Transition changes the state of WatcherState to next, reacting accordingly.
+func (s *WatcherState) Transition(next int) {
+	s.mut.Lock()
+	defer s.mut.Unlock()
+
+	level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next))
+
+	// only perform the channel close if the state is not already stopping;
+	// the current state is expected to be either draining or running for the close to happen
+	if next == StateStopping && s.current != next {
+		close(s.stoppingSignal)
+	}
+
+	// update state
+	s.current = next
+}
+
+// IsDraining evaluates to true if the current state is StateDraining.
+func (s *WatcherState) IsDraining() bool {
+	s.mut.RLock()
+	defer s.mut.RUnlock()
+	return s.current == StateDraining
+}
+
+// IsStopping evaluates to true if the current state is StateStopping.
+func (s *WatcherState) IsStopping() bool {
+	s.mut.RLock()
+	defer s.mut.RUnlock()
+	return s.current == StateStopping
+}
+
+// WaitForStopping returns a channel from which the caller can read, effectively waiting until the state changes to stopping.
+func (s *WatcherState) WaitForStopping() <-chan struct{} {
+	return s.stoppingSignal
+}
+
+// printState prints a user-friendly name of the possible Watcher states.
+func printState(state int) string {
+	switch state {
+	case StateRunning:
+		return "running"
+	case StateDraining:
+		return "draining"
+	case StateStopping:
+		return "stopping"
+	default:
+		return "unknown"
+	}
+}
diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go
index 0972f32f8f8a..f91e71b856dc 100644
--- a/component/common/loki/wal/watcher.go
+++ b/component/common/loki/wal/watcher.go
@@ -10,6 +10,7 @@ import (
 	"time"

 	"github.com/go-kit/log"
+	"github.com/grafana/agent/component/common/loki/wal/internal"
 	"github.com/grafana/agent/pkg/flow/logging/level"
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/prometheus/prometheus/tsdb/wlog"
@@ -80,7 +81,7 @@ type Watcher struct {
 	actions    WriteTo
 	readNotify chan struct{}
 	done       chan struct{}
-	quit       chan struct{}
+	state      *internal.WatcherState
 	walDir     string
 	logger     log.Logger
 	MaxSegment int
@@ -88,6 +89,7 @@ type Watcher struct {
 	metrics      *WatcherMetrics
 	minReadFreq  time.Duration
 	maxReadFreq  time.Duration
+	drainTimeout time.Duration
 	marker       Marker
 	savedSegment int
 }
@@ -99,7 +101,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log
 		id:         id,
 		actions:    writeTo,
 		readNotify: make(chan struct{}),
-		quit:       make(chan struct{}),
+		state:      internal.NewWatcherState(logger),
 		done:       make(chan struct{}),
 		MaxSegment: -1,
 		marker:     marker,
@@ -108,6 +110,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log
 		metrics:      metrics,
 		minReadFreq:  config.MinReadFrequency,
 		maxReadFreq:  config.MaxReadFrequency,
+		drainTimeout: config.DrainTimeout,
 	}
 }

@@ -121,18 +124,26 @@ func (w *Watcher) Start() {
 // retries.
 func (w *Watcher) mainLoop() {
 	defer close(w.done)
-	for !isClosed(w.quit) {
+	for !w.state.IsStopping() {
 		if w.marker != nil {
 			w.savedSegment = w.marker.LastMarkedSegment()
 			level.Debug(w.logger).Log("msg", "last saved segment", "segment", w.savedSegment)
 		}

-		if err := w.run(); err != nil {
+		err := w.run()
+		if err != nil {
 			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
 		}

+		if w.state.IsDraining() && errors.Is(err, os.ErrNotExist) {
+			level.Info(w.logger).Log("msg", "Reached non-existing segment while draining, assuming end of WAL")
+			// since we've reached the end of the WAL, and the Watcher is draining, promptly transition to stopping state
+			// so the watcher can stop early
+			w.state.Transition(internal.StateStopping)
+		}
+
 		select {
-		case <-w.quit:
+		case <-w.state.WaitForStopping():
 			return
 		case <-time.After(5 * time.Second):
 		}
@@ -160,9 +171,8 @@
 	}

 	level.Debug(w.logger).Log("msg", "Tailing WAL", "currentSegment", currentSegment, "lastSegment", lastSegment)
-	for !isClosed(w.quit) {
+	for !w.state.IsStopping() {
 		w.metrics.currentSegment.WithLabelValues(w.id).Set(float64(currentSegment))
-		level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)

 		// On start, we have a pointer to what is the latest segment. On subsequent calls to this function,
 		// currentSegment will have been incremented, and we should open that segment.
@@ -187,6 +197,8 @@
 // If tail is false, we know the segment we are "watching" over is closed (no further write will occur to it). Then, the
 // segment is read fully, any errors are logged as Warnings, and no error is returned.
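Before the `watch` implementation below, a note on the state machine: `mainLoop` polls `IsStopping`/`IsDraining` and blocks on `WaitForStopping`, and while draining it transitions itself to stopping once the WAL is exhausted. The following is a compact, runnable sketch of that pattern, using a stripped-down stand-in for `internal.WatcherState` (the real type lives in an internal package and cannot be imported from outside; logging is omitted):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	StateRunning = iota
	StateDraining
	StateStopping
)

// miniState is an illustrative stand-in for internal.WatcherState.
type miniState struct {
	mut      sync.RWMutex
	current  int
	stopping chan struct{}
}

func newMiniState() *miniState {
	return &miniState{current: StateRunning, stopping: make(chan struct{})}
}

// Transition mirrors WatcherState.Transition: moving to stopping closes the
// signal channel exactly once, waking anyone blocked in WaitForStopping.
func (s *miniState) Transition(next int) {
	s.mut.Lock()
	defer s.mut.Unlock()
	if next == StateStopping && s.current != StateStopping {
		close(s.stopping)
	}
	s.current = next
}

func (s *miniState) IsDraining() bool {
	s.mut.RLock()
	defer s.mut.RUnlock()
	return s.current == StateDraining
}

func (s *miniState) WaitForStopping() <-chan struct{} { return s.stopping }

func main() {
	st := newMiniState()
	done := make(chan struct{})

	// A toy "main loop": while draining, reaching the end of the (pretend) WAL
	// transitions the loop itself to stopping, just like mainLoop does when
	// run() returns os.ErrNotExist in the draining state.
	go func() {
		defer close(done)
		for i := 0; ; i++ {
			endOfWAL := i >= 3 // pretend we ran out of segments after 3 passes
			if st.IsDraining() && endOfWAL {
				st.Transition(StateStopping)
			}
			select {
			case <-st.WaitForStopping():
				return
			case <-time.After(10 * time.Millisecond):
			}
		}
	}()

	st.Transition(StateDraining) // Drain() performs this transition, then waits
	<-done
	fmt.Println("loop exited after draining")
}
```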
 func (w *Watcher) watch(segmentNum int, tail bool) error {
+	level.Debug(w.logger).Log("msg", "Watching WAL segment", "currentSegment", segmentNum, "tail", tail)
+
 	segment, err := wlog.OpenReadSegment(wlog.SegmentName(w.walDir, segmentNum))
 	if err != nil {
 		return err
@@ -215,7 +227,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {

 	for {
 		select {
-		case <-w.quit:
+		case <-w.state.WaitForStopping():
 			return nil

 		case <-segmentTicker.C:
@@ -224,24 +236,30 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 				return fmt.Errorf("segments: %w", err)
 			}

-			// Check if new segments exists.
-			if last <= segmentNum {
+			// Check if new segments exist, or we are draining the WAL, which means that either:
+			// - This is the last segment, and we can consume it fully because we are draining the WAL
+			// - There's a segment after the current one, and we can consume this segment fully as well
+			if last <= segmentNum && !w.state.IsDraining() {
 				continue
 			}

-			// Since we know last > segmentNum, there must be a new segment. Read the remaining from the segmentNum segment
-			// and return from `watch` to read the next one
+			if w.state.IsDraining() {
+				level.Debug(w.logger).Log("msg", "Draining segment completely", "segment", segmentNum, "lastSegment", last)
+			}
+
+			// We know that there's either a new segment (last > segmentNum), or we are draining the WAL. In either case,
+			// read the remaining data from segment segmentNum and return from `watch` to read the next one.
 			_, err = w.readSegment(reader, segmentNum)
 			if debug {
 				level.Warn(w.logger).Log("msg", "Error reading segment inside segmentTicker", "segment", segmentNum, "read", reader.Offset(), "err", err)
 			}

-			// io.EOF error are non-fatal since we are tailing the wal
+			// io.EOF errors are non-fatal since we are consuming the segment until the end
 			if errors.Unwrap(err) != io.EOF {
 				return err
 			}

-			// return after reading the whole segment for creating a new LiveReader from the newly created segment
+			// return after reading the whole segment
 			return nil

 		// the cases below will unlock the select block, and execute the block below
@@ -293,7 +311,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 func (w *Watcher) readSegment(r *wlog.LiveReader, segmentNum int) (bool, error) {
 	var readData bool

-	for r.Next() && !isClosed(w.quit) {
+	for r.Next() && !w.state.IsStopping() {
 		rec := r.Record()
 		w.metrics.recordsRead.WithLabelValues(w.id).Inc()
 		read, err := w.decodeAndDispatch(rec, segmentNum)
@@ -331,9 +349,24 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) {
 	return readData, firstErr
 }

+// Drain moves the Watcher to a draining state, which will assume no more data is being written to the WAL, and it will
+// attempt to read until the end of the last written segment. The calling routine of Drain will block until all data is
+// read, or a timeout occurs.
+func (w *Watcher) Drain() {
+	level.Info(w.logger).Log("msg", "Draining Watcher")
+	w.state.Transition(internal.StateDraining)
+	// wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly
+	select {
+	case <-time.NewTimer(w.drainTimeout).C:
+		level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping")
+	case <-w.state.WaitForStopping():
+	}
+}
+
+// Stop stops the Watcher, shutting down the main routine.
 func (w *Watcher) Stop() {
-	// first close the quit channel to order main mainLoop routine to stop
-	close(w.quit)
+	w.state.Transition(internal.StateStopping)
+
+	// upon calling stop, wait for the mainLoop routine to finish

 	<-w.done

@@ -397,16 +430,6 @@ func (w *Watcher) findNextSegmentFor(index int) (int, error) {
 	return -1, errors.New("failed to find segment for index")
 }

-// isClosed checks in a non-blocking manner if a channel is closed or not.
-func isClosed(c chan struct{}) bool {
-	select {
-	case <-c:
-		return true
-	default:
-		return false
-	}
-}
-
 // readSegmentNumbers reads the given directory and returns all segment identifiers, that is, the index of each segment
 // file.
 func readSegmentNumbers(dir string) ([]int, error) {
diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go
index 97de19748dee..15644d740a28 100644
--- a/component/common/loki/wal/watcher_test.go
+++ b/component/common/loki/wal/watcher_test.go
@@ -3,6 +3,7 @@ package wal
 import (
 	"fmt"
 	"os"
+	"strings"
 	"testing"
 	"time"

@@ -12,6 +13,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"

 	"github.com/grafana/agent/component/common/loki"
 	"github.com/grafana/agent/component/common/loki/utils"
@@ -568,3 +570,228 @@ func TestWatcher_Replay(t *testing.T) {
 		writeTo.AssertContainsLines(t, segment2Lines...)
 	})
 }
+
+// slowWriteTo mimics the combination of a WriteTo and a slow remote write client. This allows us to have a writer
+// that moves faster than the WAL watcher, and therefore to test the draining procedure.
+type slowWriteTo struct {
+	t                       *testing.T
+	entriesReceived         atomic.Uint64
+	sleepAfterAppendEntries time.Duration
+}
+
+func (s *slowWriteTo) SeriesReset(segmentNum int) {
+}
+
+func (s *slowWriteTo) StoreSeries(series []record.RefSeries, segmentNum int) {
+}
+
+func (s *slowWriteTo) AppendEntries(entries wal.RefEntries, segmentNum int) error {
+	// only log on development debug flag
+	if debug {
+		var allLines strings.Builder
+		for _, e := range entries.Entries {
+			allLines.WriteString(e.Line)
+			allLines.WriteString("/")
+		}
+		s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String())
+	}
+
+	s.entriesReceived.Add(uint64(len(entries.Entries)))
+	time.Sleep(s.sleepAfterAppendEntries)
+	return nil
+}
+
+func TestWatcher_StopAndDrainWAL(t *testing.T) {
+	labels := model.LabelSet{
+		"app": "test",
+	}
+	logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug())
+
+	// newTestingResources is a helper for bootstrapping all required testing resources
+	newTestingResources := func(t *testing.T, cfg WatchConfig) (*slowWriteTo, *Watcher, WAL) {
+		reg := prometheus.NewRegistry()
+		dir := t.TempDir()
+		metrics := NewWatcherMetrics(reg)
+
+		// the slow write to will take one second on each AppendEntries operation
+		writeTo := &slowWriteTo{
+			t:                       t,
+			sleepAfterAppendEntries: time.Second,
+		}
+
+		watcher := NewWatcher(dir, "test", metrics, writeTo, logger, cfg, mockMarker{
+			LastMarkedSegmentFunc: func() int {
+				// Ignore marker to read from last segment, which is none
+				return -1
+			},
+		})
+
+		// start watcher, and burn through WAL as we write to it
+		watcher.Start()
+
+		wl, err := New(Config{
+			Enabled: true,
+			Dir:     dir,
+		}, logger, reg)
+		require.NoError(t, err)
+		return writeTo, watcher, wl
+	}

+	t.Run("watcher drains WAL just in time", func(t *testing.T) {
+		cfg := DefaultWatchConfig
+		// considering the slow write to has a 1 second delay when appending an entry, and given that before draining
+		// begins the watcher will have consumed only 5 entries, this timeout gives the Watcher just enough time to
+		// fully drain the WAL.
+		cfg.DrainTimeout = time.Second * 16
+		writeTo, watcher, wl := newTestingResources(t, cfg)
+		defer wl.Close()
+
+		ew := newEntryWriter()
+
+		// helper to add context to each written line
+		var lineCounter atomic.Int64
+		writeNLines := func(t *testing.T, n int) {
+			for i := 0; i < n; i++ {
+				// First, write to segment 0. This will be the last "marked" segment
+				err := ew.WriteEntry(loki.Entry{
+					Labels: labels,
+					Entry: logproto.Entry{
+						Timestamp: time.Now(),
+						Line:      fmt.Sprintf("test line %d", lineCounter.Load()),
+					},
+				}, wl, logger)
+				lineCounter.Add(1)
+				require.NoError(t, err)
+			}
+		}
+
+		// The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test
+		// will wait for the Watcher to have read 5 lines. Afterwards, a new segment will be cut, another 10 lines written, and the
+		// Watcher stopped with drain. The test will expect all 20 lines in total to have been received.
+
+		writeNLines(t, 10)
+
+		require.Eventually(t, func() bool {
+			return writeTo.entriesReceived.Load() >= 5
+		}, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment")
+
+		_, err := wl.NextSegment()
+		require.NoError(t, err)
+		writeNLines(t, 10)
+		require.NoError(t, wl.Sync())
+
+		// Upon calling Stop with drain, the Watcher should finish burning through segment 0, and also consume segment 1
+		now := time.Now()
+		watcher.Drain()
+		watcher.Stop()
+
+		// expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account that the
+		// drain timeout has one extra second).
+		require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s")
+		require.Equal(t, 20, int(writeTo.entriesReceived.Load()), "expected the watcher to fully drain the WAL")
+	})
+
+	t.Run("watcher should exit promptly after draining completely", func(t *testing.T) {
+		cfg := DefaultWatchConfig
+		// the drain timeout is deliberately longer than needed for the amount of data remaining in the WAL (~15 entries)
+		cfg.DrainTimeout = time.Second * 30
+		writeTo, watcher, wl := newTestingResources(t, cfg)
+		defer wl.Close()
+
+		ew := newEntryWriter()
+
+		// helper to add context to each written line
+		var lineCounter atomic.Int64
+		writeNLines := func(t *testing.T, n int) {
+			for i := 0; i < n; i++ {
+				// First, write to segment 0. This will be the last "marked" segment
+				err := ew.WriteEntry(loki.Entry{
+					Labels: labels,
+					Entry: logproto.Entry{
+						Timestamp: time.Now(),
+						Line:      fmt.Sprintf("test line %d", lineCounter.Load()),
+					},
+				}, wl, logger)
+				lineCounter.Add(1)
+				require.NoError(t, err)
+			}
+		}
+
+		// The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test
+		// will wait for the Watcher to have read 5 lines. Afterwards, a new segment will be cut, another 10 lines written, and the
+		// Watcher stopped with drain. The test will expect all 20 lines in total to have been received.
+
+		writeNLines(t, 10)
+
+		require.Eventually(t, func() bool {
+			return writeTo.entriesReceived.Load() >= 5
+		}, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment")
+
+		_, err := wl.NextSegment()
+		require.NoError(t, err)
+		writeNLines(t, 10)
+		require.NoError(t, wl.Sync())
+
+		// Upon calling Stop with drain, the Watcher should finish burning through segment 0, and also consume segment 1
+		now := time.Now()
+		watcher.Drain()
+		watcher.Stop()
+
+		// expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s of tolerance; the Watcher should
+		// exit as soon as the drain completes, well before the 30s timeout.
+		require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s")
+		require.Equal(t, 20, int(writeTo.entriesReceived.Load()), "expected the watcher to fully drain the WAL")
+	})
+
+	t.Run("watcher drain timeout too short, should exit promptly", func(t *testing.T) {
+		cfg := DefaultWatchConfig
+		// a 10-second timeout should give the watcher enough time to consume only ~10 entries, leaving ~5 missing
+		// from the last segment
+		cfg.DrainTimeout = time.Second * 10
+		writeTo, watcher, wl := newTestingResources(t, cfg)
+		defer wl.Close()
+
+		ew := newEntryWriter()
+
+		// helper to add context to each written line
+		var lineCounter atomic.Int64
+		writeNLines := func(t *testing.T, n int) {
+			for i := 0; i < n; i++ {
+				// First, write to segment 0. This will be the last "marked" segment
+				err := ew.WriteEntry(loki.Entry{
+					Labels: labels,
+					Entry: logproto.Entry{
+						Timestamp: time.Now(),
+						Line:      fmt.Sprintf("test line %d", lineCounter.Load()),
+					},
+				}, wl, logger)
+				lineCounter.Add(1)
+				require.NoError(t, err)
+			}
+		}
+
+		// The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test
+		// will wait for the Watcher to have read 5 lines. Afterwards, a new segment will be cut, another 10 lines written, and the
+		// Watcher stopped with drain. Since the drain timeout is too short, the test expects the Watcher to time out
+		// before receiving all 20 lines.
+
+		writeNLines(t, 10)
+
+		require.Eventually(t, func() bool {
+			return writeTo.entriesReceived.Load() >= 5
+		}, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment")
+
+		_, err := wl.NextSegment()
+		require.NoError(t, err)
+		writeNLines(t, 10)
+		require.NoError(t, wl.Sync())
+
+		// Upon calling Stop with drain, the Watcher will start consuming the remaining data, but the short drain
+		// timeout will cut the procedure off before segment 1 is fully read
+		now := time.Now()
+		watcher.Drain()
+		watcher.Stop()
+
+		require.InDelta(t, time.Second*10, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 10s")
+		require.Less(t, int(writeTo.entriesReceived.Load()), 20, "expected the watcher to not have consumed the WAL fully")
+		require.InDelta(t, 15, int(writeTo.entriesReceived.Load()), 1.0, "expected the Watcher to consume within +/- 1 entry of the expected 15")
+	})
+}
diff --git a/component/loki/write/types.go b/component/loki/write/types.go
index 2959f1b681a1..dc240c675e98 100644
--- a/component/loki/write/types.go
+++ b/component/loki/write/types.go
@@ -82,7 +82,7 @@ type QueueConfig struct {
 func (q *QueueConfig) SetToDefault() {
 	*q = QueueConfig{
 		Capacity:     10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10
-		DrainTimeout: time.Minute,
+		DrainTimeout: 15 * time.Second,
 	}
 }

diff --git a/component/loki/write/write.go b/component/loki/write/write.go
index efda7b597c6f..a31cb0745976 100644
--- a/component/loki/write/write.go
+++ b/component/loki/write/write.go
@@ -41,6 +41,7 @@ type WalArguments struct {
 	MaxSegmentAge    time.Duration `river:"max_segment_age,attr,optional"`
 	MinReadFrequency time.Duration `river:"min_read_frequency,attr,optional"`
 	MaxReadFrequency time.Duration `river:"max_read_frequency,attr,optional"`
+	DrainTimeout     time.Duration `river:"drain_timeout,attr,optional"`
 }

 func (wa *WalArguments) Validate() error {
@@ -58,6 +59,7 @@ func (wa *WalArguments) SetToDefault() {
 		MaxSegmentAge:    wal.DefaultMaxSegmentAge,
 		MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency,
 		MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency,
+		DrainTimeout:     wal.DefaultWatchConfig.DrainTimeout,
 	}
 }

@@ -81,7 +83,7 @@ type Component struct {
 	receiver loki.LogsReceiver

 	// remote write components
-	clientManger client.Client
+	clientManger *client.Manager
 	walWriter    *wal.Writer

 	// sink is the place where log entries received by this component should be written to. If WAL
@@ -111,6 +113,18 @@ func New(o component.Options, args Arguments) (*Component, error) {

 // Run implements component.Component.
 func (c *Component) Run(ctx context.Context) error {
+	defer func() {
+		// when exiting Run, proceed to shut down first the writer component, and then
+		// the client manager, with the WAL and remote-write client inside
+		if c.walWriter != nil {
+			c.walWriter.Stop()
+		}
+		if c.clientManger != nil {
+			// drain, since the component is shutting down.
That means the agent is shutting down as well + c.clientManger.StopWithDrain(true) + } + }() + for { select { case <-ctx.Done(): @@ -140,6 +154,7 @@ func (c *Component) Update(args component.Arguments) error { c.walWriter.Stop() } if c.clientManger != nil { + // only drain on component shutdown c.clientManger.Stop() } @@ -150,6 +165,7 @@ func (c *Component) Update(args component.Arguments) error { WatchConfig: wal.WatchConfig{ MinReadFrequency: newArgs.WAL.MinReadFrequency, MaxReadFrequency: newArgs.WAL.MaxReadFrequency, + DrainTimeout: newArgs.WAL.DrainTimeout, }, } diff --git a/component/loki/write/write_test.go b/component/loki/write/write_test.go index 642b53703e0c..d77bebe21c0f 100644 --- a/component/loki/write/write_test.go +++ b/component/loki/write/write_test.go @@ -79,6 +79,7 @@ func TestUnmarshallWalAttrributes(t *testing.T) { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, }, }, "wal enabled with defaults": { @@ -90,6 +91,7 @@ func TestUnmarshallWalAttrributes(t *testing.T) { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, }, }, "wal enabled with some overrides": { @@ -97,12 +99,14 @@ func TestUnmarshallWalAttrributes(t *testing.T) { enabled = true max_segment_age = "10m" min_read_frequency = "11ms" + drain_timeout = "5m" `, expected: WalArguments{ Enabled: true, MaxSegmentAge: time.Minute * 10, MinReadFrequency: time.Millisecond * 11, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: time.Minute * 5, }, }, } { diff --git a/component/otelcol/connector/servicegraph/servicegraph.go b/component/otelcol/connector/servicegraph/servicegraph.go index e5370d89f620..c1713cca5ad5 100644 --- a/component/otelcol/connector/servicegraph/servicegraph.go +++ b/component/otelcol/connector/servicegraph/servicegraph.go @@ -91,7 +91,7 @@ var DefaultArguments = Arguments{ Dimensions: []string{}, Store: StoreConfig{ MaxItems: 1000, - TTL: 2 * time.Millisecond, + TTL: 2 * time.Second, }, CacheLoop: 1 * time.Minute, StoreExpirationLoop: 2 * time.Second, diff --git a/component/otelcol/connector/servicegraph/servicegraph_test.go b/component/otelcol/connector/servicegraph/servicegraph_test.go index 5f7204b2bb6b..952ac8fc06d5 100644 --- a/component/otelcol/connector/servicegraph/servicegraph_test.go +++ b/component/otelcol/connector/servicegraph/servicegraph_test.go @@ -44,7 +44,7 @@ func TestArguments_UnmarshalRiver(t *testing.T) { Dimensions: []string{}, Store: servicegraphprocessor.StoreConfig{ MaxItems: 1000, - TTL: 2 * time.Millisecond, + TTL: 2 * time.Second, }, CacheLoop: 1 * time.Minute, StoreExpirationLoop: 2 * time.Second, diff --git a/component/otelcol/exporter/prometheus/internal/convert/cache.go b/component/otelcol/exporter/prometheus/internal/convert/cache.go index ac0dc12087f1..3401e87e7f8d 100644 --- a/component/otelcol/exporter/prometheus/internal/convert/cache.go +++ b/component/otelcol/exporter/prometheus/internal/convert/cache.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/timestamp" @@ 
-117,6 +118,17 @@ func (series *memorySeries) WriteExemplarsTo(app storage.Appender, e exemplar.Ex return nil } +func (series *memorySeries) WriteNativeHistogramTo(app storage.Appender, ts time.Time, h *histogram.Histogram, fh *histogram.FloatHistogram) error { + series.Lock() + defer series.Unlock() + + if _, err := app.AppendHistogram(series.id, series.labels, timestamp.FromTime(ts), h, fh); err != nil { + return err + } + + return nil +} + type memoryMetadata struct { sync.Mutex diff --git a/component/otelcol/exporter/prometheus/internal/convert/convert.go b/component/otelcol/exporter/prometheus/internal/convert/convert.go index 0a7039e7195e..fc0f2287de01 100644 --- a/component/otelcol/exporter/prometheus/internal/convert/convert.go +++ b/component/otelcol/exporter/prometheus/internal/convert/convert.go @@ -287,6 +287,8 @@ func (conv *Converter) consumeMetric(app storage.Appender, memResource *memorySe conv.consumeHistogram(app, memResource, memScope, m, resAttrs) case pmetric.MetricTypeSummary: conv.consumeSummary(app, memResource, memScope, m, resAttrs) + case pmetric.MetricTypeExponentialHistogram: + conv.consumeExponentialHistogram(app, memResource, memScope, m, resAttrs) } } @@ -306,7 +308,7 @@ func (conv *Converter) consumeGauge(app storage.Appender, memResource *memorySer Help: m.Description(), }) if err := metricMD.WriteTo(app, time.Now()); err != nil { - level.Warn(conv.log).Log("msg", "failed to write metric family metadata, metric name", metricName, "err", err) + level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "metric name", metricName, "err", err) } for dpcount := 0; dpcount < m.Gauge().DataPoints().Len(); dpcount++ { @@ -438,7 +440,7 @@ func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySerie Help: m.Description(), }) if err := metricMD.WriteTo(app, time.Now()); err != nil { - level.Warn(conv.log).Log("msg", "failed to write metric family metadata, metric name", metricName, "err", err) + level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "metric name", metricName, "err", err) } for dpcount := 0; dpcount < m.Sum().DataPoints().Len(); dpcount++ { @@ -481,7 +483,7 @@ func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memor Help: m.Description(), }) if err := metricMD.WriteTo(app, time.Now()); err != nil { - level.Warn(conv.log).Log("msg", "failed to write metric family metadata, metric name", metricName, "err", err) + level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "metric name", metricName, "err", err) } for dpcount := 0; dpcount < m.Histogram().DataPoints().Len(); dpcount++ { @@ -497,7 +499,7 @@ func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memor sumMetricVal := dp.Sum() if err := writeSeries(app, sumMetric, dp, sumMetricVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write histogram sum sample, metric name", metricName, "err", err) + level.Error(conv.log).Log("msg", "failed to write histogram sum sample", "metric name", metricName, "err", err) } } @@ -507,7 +509,7 @@ func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memor countMetricVal := float64(dp.Count()) if err := writeSeries(app, countMetric, dp, countMetricVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write histogram count sample, metric name", metricName, "err", err) + level.Error(conv.log).Log("msg", "failed to write histogram count sample", "metric name", metricName, "err", err) } } @@ -562,13 +564,13 @@ 
func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memor bucketVal := float64(count) if err := writeSeries(app, bucket, dp, bucketVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write histogram bucket sample, metric name", metricName, "bucket", bucketLabel.Value, "err", err) + level.Error(conv.log).Log("msg", "failed to write histogram bucket sample", "metric name", metricName, "bucket", bucketLabel.Value, "err", err) } for ; exemplarInd < len(exemplars); exemplarInd++ { if exemplars[exemplarInd].DoubleValue() < bound { if err := conv.writeExemplar(app, bucket, exemplars[exemplarInd]); err != nil { - level.Error(conv.log).Log("msg", "failed to add exemplar, metric name", metricName, "bucket", bucketLabel.Value, "err", err) + level.Error(conv.log).Log("msg", "failed to add exemplar", "metric name", metricName, "bucket", bucketLabel.Value, "err", err) } } else { break @@ -588,19 +590,72 @@ func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memor infBucketVal := float64(dp.Count()) if err := writeSeries(app, infBucket, dp, infBucketVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write histogram bucket sample, metric name", metricName, "bucket", bucketLabel.Value, "err", err) + level.Error(conv.log).Log("msg", "failed to write histogram bucket sample", "metric name", metricName, "bucket", bucketLabel.Value, "err", err) } // Add remaining exemplars. for ; exemplarInd < len(exemplars); exemplarInd++ { if err := conv.writeExemplar(app, infBucket, exemplars[exemplarInd]); err != nil { - level.Error(conv.log).Log("msg", "failed to add exemplar, metric name", metricName, "bucket", bucketLabel.Value, "err", err) + level.Error(conv.log).Log("msg", "failed to add exemplar", "metric name", metricName, "bucket", bucketLabel.Value, "err", err) } } } } } +func (conv *Converter) consumeExponentialHistogram(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric, resAttrs pcommon.Map) { + metricName := prometheus.BuildCompliantName(m, "", conv.opts.AddMetricSuffixes) + + if m.ExponentialHistogram().AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + // Drop non-cumulative histograms for now, which is permitted by the spec. + return + } + + metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{ + Type: textparse.MetricTypeHistogram, + Unit: m.Unit(), + Help: m.Description(), + }) + if err := metricMD.WriteTo(app, time.Now()); err != nil { + level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "metric name", metricName, "err", err) + } + + for dpcount := 0; dpcount < m.ExponentialHistogram().DataPoints().Len(); dpcount++ { + dp := m.ExponentialHistogram().DataPoints().At(dpcount) + + if conv.getOpts().ResourceToTelemetryConversion { + joinAttributeMaps(resAttrs, dp.Attributes()) + } + + memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes()) + + ts := dp.Timestamp().AsTime() + if ts.Before(memSeries.Timestamp()) { + // Out-of-order; skip. 
+ continue + } + memSeries.SetTimestamp(ts) + + promHistogram, err := exponentialToNativeHistogram(dp) + + if err != nil { + level.Error(conv.log).Log("msg", "failed to convert exponential histogram to native histogram", "metric name", metricName, "err", err) + continue + } + + if err := memSeries.WriteNativeHistogramTo(app, ts, &promHistogram, nil); err != nil { + level.Error(conv.log).Log("msg", "failed to write native histogram", "metric name", metricName, "err", err) + continue + } + + for i := 0; i < dp.Exemplars().Len(); i++ { + if err := conv.writeExemplar(app, memSeries, dp.Exemplars().At(i)); err != nil { + level.Error(conv.log).Log("msg", "failed to add exemplar", "metric name", metricName, "err", err) + } + } + } +} + // Convert Otel Exemplar to Prometheus Exemplar. func (conv *Converter) convertExemplar(otelExemplar pmetric.Exemplar, ts time.Time) exemplar.Exemplar { exemplarLabels := make(labels.Labels, 0) @@ -637,7 +692,7 @@ func (conv *Converter) consumeSummary(app storage.Appender, memResource *memoryS Help: m.Description(), }) if err := metricMD.WriteTo(app, time.Now()); err != nil { - level.Warn(conv.log).Log("msg", "failed to write metric family metadata, metric name", metricName, "err", err) + level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "metric name", metricName, "err", err) } for dpcount := 0; dpcount < m.Summary().DataPoints().Len(); dpcount++ { @@ -653,7 +708,7 @@ func (conv *Converter) consumeSummary(app storage.Appender, memResource *memoryS sumMetricVal := dp.Sum() if err := writeSeries(app, sumMetric, dp, sumMetricVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write summary sum sample, metric name", metricName, "err", err) + level.Error(conv.log).Log("msg", "failed to write summary sum sample", "metric name", metricName, "err", err) } } @@ -663,7 +718,7 @@ func (conv *Converter) consumeSummary(app storage.Appender, memResource *memoryS countMetricVal := float64(dp.Count()) if err := writeSeries(app, countMetric, dp, countMetricVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write histogram count sample, metric name", metricName, "err", err) + level.Error(conv.log).Log("msg", "failed to write histogram count sample", "metric name", metricName, "err", err) } } @@ -680,7 +735,7 @@ func (conv *Converter) consumeSummary(app storage.Appender, memResource *memoryS quantileVal := qp.Value() if err := writeSeries(app, quantile, dp, quantileVal); err != nil { - level.Error(conv.log).Log("msg", "failed to write histogram quantile sample, metric name", metricName, "quantile", quantileLabel.Value, "err", err) + level.Error(conv.log).Log("msg", "failed to write histogram quantile sample", "metric name", metricName, "quantile", quantileLabel.Value, "err", err) } } } diff --git a/component/otelcol/exporter/prometheus/internal/convert/convert_test.go b/component/otelcol/exporter/prometheus/internal/convert/convert_test.go index dcace6574b6c..928e3fc2f813 100644 --- a/component/otelcol/exporter/prometheus/internal/convert/convert_test.go +++ b/component/otelcol/exporter/prometheus/internal/convert/convert_test.go @@ -2,6 +2,7 @@ package convert_test import ( "context" + "encoding/json" "testing" "github.com/grafana/agent/component/otelcol/exporter/prometheus/internal/convert" @@ -1137,6 +1138,201 @@ func TestConverter(t *testing.T) { } } +// Exponential histograms don't have a text format representation. +// In this test we are comparing the JSON format. 
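One note before the test below: the `positive_delta` and `negative_delta` arrays asserted in the expected JSON are the delta-encoded form of the dense `bucket_counts` arrays in the input. A small, runnable sketch of just that encoding (spans and gap handling, which `convertBucketsLayout` layers on top, are ignored here):

```go
package main

import "fmt"

// toDeltas converts absolute bucket counts to the delta encoding used by
// Prometheus native histograms: each element is the difference from the
// previous bucket's count.
func toDeltas(counts []uint64) []int64 {
	deltas := make([]int64, 0, len(counts))
	var prev int64
	for _, c := range counts {
		deltas = append(deltas, int64(c)-prev)
		prev = int64(c)
	}
	return deltas
}

func main() {
	// bucket_counts from the first test case below
	fmt.Println(toDeltas([]uint64{2, 1, 3, 2, 0, 0, 3}))
	// Output: [2 -1 2 -1 -2 0 3], matching the expected positive_delta
}
```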
+func TestConverterExponentialHistograms(t *testing.T) { + tt := []struct { + name string + input string + expect string + }{ + { + name: "Exponential Histogram", + input: `{ + "resource_metrics": [{ + "scope_metrics": [{ + "metrics": [{ + "name": "test_exponential_histogram", + "exponential_histogram": { + "aggregation_temporality": 2, + "data_points": [{ + "start_time_unix_nano": 1000000000, + "time_unix_nano": 1000000000, + "scale": 0, + "count": 11, + "sum": 158.63, + "positive": { + "offset": -1, + "bucket_counts": [2, 1, 3, 2, 0, 0, 3] + }, + "exemplars":[ + { + "time_unix_nano": 1000000001, + "as_double": 3.0, + "span_id": "aaaaaaaaaaaaaaaa", + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + }, + { + "time_unix_nano": 1000000003, + "as_double": 1.0, + "span_id": "cccccccccccccccc", + "trace_id": "cccccccccccccccccccccccccccccccc" + }, + { + "time_unix_nano": 1000000002, + "as_double": 1.5, + "span_id": "bbbbbbbbbbbbbbbb", + "trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + } + ] + }] + } + }] + }] + }] + }`, + // The tests only allow one exemplar/series because it uses a map[series]exemplar as storage. Therefore only the exemplar "bbbbbbbbbbbbbbbb" is stored. + expect: `{ + "bucket": [ + { + "exemplar": { + "label": [ + { + "name": "span_id", + "value": "bbbbbbbbbbbbbbbb" + }, + { + "name": "trace_id", + "value": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + } + ], + "value": 1.5 + } + } + ], + "positive_delta": [2, -1, 2, -1, -2, 0, 3], + "positive_span": [ + { + "length": 7, + "offset": 0 + } + ], + "sample_count": 11, + "sample_sum": 158.63, + "schema": 0, + "zero_count": 0, + "zero_threshold": 1e-128 + }`, + }, + { + name: "Exponential Histogram 2", + input: `{ + "resource_metrics": [{ + "scope_metrics": [{ + "metrics": [{ + "name": "test_exponential_histogram_2", + "exponential_histogram": { + "aggregation_temporality": 2, + "data_points": [{ + "start_time_unix_nano": 1000000000, + "time_unix_nano": 1000000000, + "scale": 2, + "count": 19, + "sum": 200, + "zero_count" : 5, + "zero_threshold": 0.1, + "positive": { + "offset": 3, + "bucket_counts": [0, 0, 0, 0, 2, 1, 1, 0, 3, 0, 0] + }, + "negative": { + "offset": 0, + "bucket_counts": [0, 4, 0, 2, 3, 0, 0, 3] + }, + "exemplars":[ + { + "time_unix_nano": 1000000001, + "as_double": 3.0, + "span_id": "aaaaaaaaaaaaaaaa", + "trace_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + } + ] + }] + } + }] + }] + }] + }`, + // zero_threshold is set to 1e-128 because dp.ZeroThreshold() is not yet available. 
+ expect: `{ + "bucket": [ + { + "exemplar": { + "label": [ + { + "name": "span_id", + "value": "aaaaaaaaaaaaaaaa" + }, + { + "name": "trace_id", + "value": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + } + ], + "value": 3 + } + } + ], + "negative_delta": [0, 4, -4, 2, 1, -3, 0, 3], + "negative_span": [ + { + "length": 8, + "offset": 1 + } + ], + "positive_delta": [2, -1, 0, -1, 3, -3, 0], + "positive_span": [ + { + "length": 0, + "offset": 4 + }, + { + "length": 7, + "offset": 4 + } + ], + "sample_count": 19, + "sample_sum": 200, + "schema": 2, + "zero_count": 5, + "zero_threshold": 1e-128 + }`, + }, + } + decoder := &pmetric.JSONUnmarshaler{} + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + payload, err := decoder.UnmarshalMetrics([]byte(tc.input)) + require.NoError(t, err) + + var app testappender.Appender + l := util.TestLogger(t) + conv := convert.New(l, appenderAppendable{Inner: &app}, convert.Options{}) + require.NoError(t, conv.ConsumeMetrics(context.Background(), payload)) + + families, err := app.MetricFamilies() + require.NoError(t, err) + + require.NotEmpty(t, families) + require.NotNil(t, families[0]) + require.NotEmpty(t, families[0].Metric) + require.NotNil(t, families[0].Metric[0].Histogram) + histJsonRep, err := json.Marshal(families[0].Metric[0].Histogram) + require.NoError(t, err) + require.JSONEq(t, string(histJsonRep), tc.expect) + }) + } +} + // appenderAppendable always returns the same Appender. type appenderAppendable struct { Inner storage.Appender diff --git a/component/otelcol/exporter/prometheus/internal/convert/histograms.go b/component/otelcol/exporter/prometheus/internal/convert/histograms.go new file mode 100644 index 000000000000..168eae1844fd --- /dev/null +++ b/component/otelcol/exporter/prometheus/internal/convert/histograms.go @@ -0,0 +1,152 @@ +// THIS CODE IS COPIED AND ADAPTED FROM opentelemetry-contrib (https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/cfeecd887979e6f372b4a370c4562da92a2baf34/pkg/translator/prometheusremotewrite/histograms.go) +// see https://www.youtube.com/watch?v=W2_TpDcess8 for more information on the conversion + +package convert + +import ( + "fmt" + "math" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/value" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +const defaultZeroThreshold = 1e-128 + +// exponentialToNativeHistogram translates OTel Exponential Histogram data point +// to Prometheus Native Histogram. 
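Before the implementation below, a note on the index arithmetic it relies on: an OTel exponential-histogram bucket with index i covers (base^i, base^(i+1)], while a Prometheus native-histogram bucket with index i covers (base^(i-1), base^i], hence the constant +1 adjustment; and since Prometheus caps the schema at 8, scales above 8 are reduced by right-shifting bucket indexes, merging 2^scaleDown adjacent buckets. A tiny, runnable sketch of just that mapping (the helper name is hypothetical):

```go
package main

import "fmt"

// promIndex maps an OTel exponential-histogram bucket index to the Prometheus
// native-histogram index after reducing the scale by scaleDown (merging
// 2^scaleDown adjacent buckets) and applying the +1 range adjustment:
// OTel index 0 covers (1, base], Prometheus index 1 covers (1, base].
func promIndex(otelIdx, scaleDown int32) int32 {
	return otelIdx>>scaleDown + 1
}

func main() {
	// No downscaling: OTel indexes -1..2 map to Prometheus indexes 0..3.
	for i := int32(-1); i <= 2; i++ {
		fmt.Printf("scaleDown=0: otel %d -> prom %d\n", i, promIndex(i, 0))
	}
	// scaleDown=1: pairs of OTel buckets collapse into one Prometheus bucket;
	// the arithmetic shift floors correctly for negative indexes.
	for i := int32(-2); i <= 3; i++ {
		fmt.Printf("scaleDown=1: otel %d -> prom %d\n", i, promIndex(i, 1))
	}
}
```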
+func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (histogram.Histogram, error) {
+	scale := p.Scale()
+	if scale < -4 {
+		return histogram.Histogram{},
+			fmt.Errorf("cannot convert exponential to native histogram."+
+				" Scale must be >= -4, was %d", scale)
+	}
+
+	var scaleDown int32
+	if scale > 8 {
+		scaleDown = scale - 8
+		scale = 8
+	}
+
+	pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown)
+	nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown)
+
+	h := histogram.Histogram{
+		Schema: scale,
+
+		ZeroCount: p.ZeroCount(),
+		// TODO use zero_threshold, if set, see
+		// https://github.com/open-telemetry/opentelemetry-proto/pull/441
+		ZeroThreshold: defaultZeroThreshold,
+
+		PositiveSpans:   pSpans,
+		PositiveBuckets: pDeltas,
+		NegativeSpans:   nSpans,
+		NegativeBuckets: nDeltas,
+	}
+
+	if p.Flags().NoRecordedValue() {
+		h.Sum = math.Float64frombits(value.StaleNaN)
+		h.Count = value.StaleNaN
+	} else {
+		if p.HasSum() {
+			h.Sum = p.Sum()
+		}
+		h.Count = p.Count()
+	}
+	return h, nil
+}
+
+// convertBucketsLayout translates OTel Exponential Histogram dense buckets
+// representation to Prometheus Native Histogram sparse bucket representation.
+//
+// The translation logic is taken from the client_golang `histogram.go#makeBuckets`
+// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
+// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket
+// index 0 corresponds to the range (1, base] while Prometheus bucket index 0
+// to the range (base^-1, 1].
+func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]histogram.Span, []int64) {
+	bucketCounts := buckets.BucketCounts()
+	if bucketCounts.Len() == 0 {
+		return nil, nil
+	}
+
+	var (
+		spans     []histogram.Span
+		deltas    []int64
+		count     int64
+		prevCount int64
+	)
+
+	appendDelta := func(count int64) {
+		spans[len(spans)-1].Length++
+		deltas = append(deltas, count-prevCount)
+		prevCount = count
+	}
+
+	// Let the compiler figure out that this is const during this function by
+	// moving it into a local variable.
+	numBuckets := bucketCounts.Len()
+
+	// The offset is scaled and adjusted by 1 as described above.
+	bucketIdx := buckets.Offset()>>scaleDown + 1
+	spans = append(spans, histogram.Span{
+		Offset: bucketIdx,
+		Length: 0,
+	})
+
+	for i := 0; i < numBuckets; i++ {
+		// The offset is scaled and adjusted by 1 as described above.
+		nextBucketIdx := (int32(i)+buckets.Offset())>>scaleDown + 1
+		if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet.
+			count += int64(bucketCounts.At(i))
+			continue
+		}
+		if count == 0 {
+			count = int64(bucketCounts.At(i))
+			continue
+		}
+
+		gap := nextBucketIdx - bucketIdx - 1
+		if gap > 2 {
+			// We have to create a new span, because we have found a gap
+			// of more than two buckets. The constant 2 is copied from the logic in
+			// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
+			spans = append(spans, histogram.Span{
+				Offset: gap,
+				Length: 0,
+			})
+		} else {
+			// We have found a small gap (or no gap at all).
+			// Insert empty buckets as needed.
+			for j := int32(0); j < gap; j++ {
+				appendDelta(0)
+			}
+		}
+		appendDelta(count)
+		count = int64(bucketCounts.At(i))
+		bucketIdx = nextBucketIdx
+	}
+	// Need to use the last item's index. The offset is scaled and adjusted by 1 as described above.
+ gap := (int32(numBuckets)+buckets.Offset()-1)>>scaleDown + 1 - bucketIdx + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, histogram.Span{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for j := int32(0); j < gap; j++ { + appendDelta(0) + } + } + appendDelta(count) + + return spans, deltas +} diff --git a/component/prometheus/exporter/blackbox/blackbox.go b/component/prometheus/exporter/blackbox/blackbox.go index 0388560443a8..62c3981e82a0 100644 --- a/component/prometheus/exporter/blackbox/blackbox.go +++ b/component/prometheus/exporter/blackbox/blackbox.go @@ -67,7 +67,7 @@ var DefaultArguments = Arguments{ // BlackboxTarget defines a target to be used by the exporter. type BlackboxTarget struct { - Name string `river:",label"` + Name string `river:"name,attr"` Target string `river:"address,attr"` Module string `river:"module,attr,optional"` Labels map[string]string `river:"labels,attr,optional"` diff --git a/component/prometheus/exporter/blackbox/blackbox_test.go b/component/prometheus/exporter/blackbox/blackbox_test.go index 6a4e9dab0d15..3016935ee0c2 100644 --- a/component/prometheus/exporter/blackbox/blackbox_test.go +++ b/component/prometheus/exporter/blackbox/blackbox_test.go @@ -16,11 +16,13 @@ import ( func TestUnmarshalRiver(t *testing.T) { riverCfg := ` config_file = "modules.yml" - target "target_a" { + target { + name = "target_a" address = "http://example.com" module = "http_2xx" } - target "target_b" { + target { + name = "target-b" address = "http://grafana.com" module = "http_2xx" } @@ -35,7 +37,7 @@ func TestUnmarshalRiver(t *testing.T) { require.Contains(t, "target_a", args.Targets[0].Name) require.Contains(t, "http://example.com", args.Targets[0].Target) require.Contains(t, "http_2xx", args.Targets[0].Module) - require.Contains(t, "target_b", args.Targets[1].Name) + require.Contains(t, "target-b", args.Targets[1].Name) require.Contains(t, "http://grafana.com", args.Targets[1].Target) require.Contains(t, "http_2xx", args.Targets[1].Module) } @@ -44,11 +46,13 @@ func TestUnmarshalRiverWithInlineConfig(t *testing.T) { riverCfg := ` config = "{ modules: { http_2xx: { prober: http, timeout: 5s } } }" - target "target_a" { + target { + name = "target_a" address = "http://example.com" module = "http_2xx" } - target "target_b" { + target { + name = "target-b" address = "http://grafana.com" module = "http_2xx" } @@ -68,7 +72,7 @@ func TestUnmarshalRiverWithInlineConfig(t *testing.T) { require.Contains(t, "target_a", args.Targets[0].Name) require.Contains(t, "http://example.com", args.Targets[0].Target) require.Contains(t, "http_2xx", args.Targets[0].Module) - require.Contains(t, "target_b", args.Targets[1].Name) + require.Contains(t, "target-b", args.Targets[1].Name) require.Contains(t, "http://grafana.com", args.Targets[1].Target) require.Contains(t, "http_2xx", args.Targets[1].Module) } @@ -77,11 +81,13 @@ func TestUnmarshalRiverWithInlineConfigYaml(t *testing.T) { riverCfg := ` config = "modules:\n http_2xx:\n prober: http\n timeout: 5s\n" - target "target_a" { + target { + name = "target_a" address = "http://example.com" module = "http_2xx" } - target "target_b" { + target { + name = "target-b" address = "http://grafana.com" module = 
"http_2xx" } @@ -101,7 +107,7 @@ func TestUnmarshalRiverWithInlineConfigYaml(t *testing.T) { require.Contains(t, "target_a", args.Targets[0].Name) require.Contains(t, "http://example.com", args.Targets[0].Target) require.Contains(t, "http_2xx", args.Targets[0].Module) - require.Contains(t, "target_b", args.Targets[1].Name) + require.Contains(t, "target-b", args.Targets[1].Name) require.Contains(t, "http://grafana.com", args.Targets[1].Target) require.Contains(t, "http_2xx", args.Targets[1].Module) } @@ -117,7 +123,8 @@ func TestUnmarshalRiverWithInvalidConfig(t *testing.T) { ` config = "{ modules: { http_2xx: { prober: http, timeout: 5s }" - target "target_a" { + target { + name = "target_a" address = "http://example.com" module = "http_2xx" } @@ -129,7 +136,8 @@ func TestUnmarshalRiverWithInvalidConfig(t *testing.T) { ` config = "{ module: { http_2xx: { prober: http, timeout: 5s } } }" - target "target_a" { + target { + name = "target_a" address = "http://example.com" module = "http_2xx" } @@ -142,7 +150,8 @@ func TestUnmarshalRiverWithInvalidConfig(t *testing.T) { config_file = "config" config = "{ modules: { http_2xx: { prober: http, timeout: 5s } } }" - target "target_a" { + target { + name = "target-a" address = "http://example.com" module = "http_2xx" } @@ -152,13 +161,24 @@ func TestUnmarshalRiverWithInvalidConfig(t *testing.T) { { "Define neither config nor config_file", ` - target "target_a" { + target { + name = "target-a" address = "http://example.com" module = "http_2xx" } `, `config or config_file must be set`, }, + { + "Specify label for target block instead of name attribute", + ` + target "target_a" { + address = "http://example.com" + module = "http_2xx" + } + `, + `2:4: block "target" does not support specifying labels`, + }, } for _, tt := range tests { t.Run(tt.testname, func(t *testing.T) { diff --git a/converter/internal/staticconvert/internal/build/app_agent_receiver.go b/converter/internal/staticconvert/internal/build/app_agent_receiver.go new file mode 100644 index 000000000000..d9bc0267c030 --- /dev/null +++ b/converter/internal/staticconvert/internal/build/app_agent_receiver.go @@ -0,0 +1,84 @@ +package build + +import ( + "fmt" + + "github.com/alecthomas/units" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/faro/receiver" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/converter/diag" + "github.com/grafana/agent/converter/internal/common" + app_agent_receiver_v2 "github.com/grafana/agent/pkg/integrations/v2/app_agent_receiver" + "github.com/grafana/river/rivertypes" + "github.com/grafana/river/scanner" +) + +func (b *IntegrationsConfigBuilder) appendAppAgentReceiverV2(config *app_agent_receiver_v2.Config) { + args := toAppAgentReceiverV2(config) + + compLabel, err := scanner.SanitizeIdentifier(b.formatJobName(config.Name(), nil)) + if err != nil { + b.diags.Add(diag.SeverityLevelCritical, fmt.Sprintf("failed to sanitize job name: %s", err)) + } + + b.f.Body().AppendBlock(common.NewBlockWithOverride( + []string{"faro", "receiver"}, + compLabel, + args, + )) +} + +func toAppAgentReceiverV2(config *app_agent_receiver_v2.Config) *receiver.Arguments { + var logLabels map[string]string + if config.LogsLabels != nil { + logLabels = config.LogsLabels + } + + logsReceiver := common.ConvertLogsReceiver{} + if config.LogsInstance != "" { + compLabel, err := scanner.SanitizeIdentifier("logs_" + config.LogsInstance) + if err != nil { + panic(fmt.Errorf("failed to sanitize job name: %s", err)) + } + + 
logsReceiver.Expr = fmt.Sprintf("loki.write.%s.receiver", compLabel) + } + + return &receiver.Arguments{ + LogLabels: logLabels, + Server: receiver.ServerArguments{ + Host: config.Server.Host, + Port: config.Server.Port, + CORSAllowedOrigins: config.Server.CORSAllowedOrigins, + APIKey: rivertypes.Secret(config.Server.APIKey), + MaxAllowedPayloadSize: units.Base2Bytes(config.Server.MaxAllowedPayloadSize), + RateLimiting: receiver.RateLimitingArguments{ + Enabled: config.Server.RateLimiting.Enabled, + Rate: config.Server.RateLimiting.RPS, + BurstSize: float64(config.Server.RateLimiting.Burstiness), + }, + }, + SourceMaps: receiver.SourceMapsArguments{ + Download: config.SourceMaps.Download, + DownloadFromOrigins: config.SourceMaps.DownloadFromOrigins, + DownloadTimeout: config.SourceMaps.DownloadTimeout, + Locations: toLocationArguments(config.SourceMaps.FileSystem), + }, + Output: receiver.OutputArguments{ + Logs: []loki.LogsReceiver{logsReceiver}, + Traces: []otelcol.Consumer{}, + }, + } +} + +func toLocationArguments(locations []app_agent_receiver_v2.SourceMapFileLocation) []receiver.LocationArguments { + args := make([]receiver.LocationArguments, len(locations)) + for i, location := range locations { + args[i] = receiver.LocationArguments{ + Path: location.Path, + MinifiedPathPrefix: location.MinifiedPathPrefix, + } + } + return args +} diff --git a/converter/internal/staticconvert/internal/build/blackbox_exporter.go b/converter/internal/staticconvert/internal/build/blackbox_exporter.go index 5282f9440b34..70007319ae1b 100644 --- a/converter/internal/staticconvert/internal/build/blackbox_exporter.go +++ b/converter/internal/staticconvert/internal/build/blackbox_exporter.go @@ -5,7 +5,6 @@ import ( "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/prometheus/exporter/blackbox" - "github.com/grafana/agent/converter/internal/common" "github.com/grafana/agent/pkg/integrations/blackbox_exporter" blackbox_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/blackbox_exporter" "github.com/grafana/river/rivertypes" @@ -57,7 +56,7 @@ func toBlackboxTargets(blackboxTargets []blackbox_exporter.BlackboxTarget) black func toBlackboxTarget(target blackbox_exporter.BlackboxTarget) blackbox.BlackboxTarget { return blackbox.BlackboxTarget{ - Name: common.SanitizeIdentifierPanics(target.Name), + Name: target.Name, Target: target.Target, Module: target.Module, } diff --git a/converter/internal/staticconvert/internal/build/builder.go b/converter/internal/staticconvert/internal/build/builder.go index 0053f545f5af..6528a533bc8c 100644 --- a/converter/internal/staticconvert/internal/build/builder.go +++ b/converter/internal/staticconvert/internal/build/builder.go @@ -39,10 +39,12 @@ import ( "github.com/grafana/agent/pkg/integrations/statsd_exporter" agent_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/agent" apache_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/apache_http" + app_agent_receiver_v2 "github.com/grafana/agent/pkg/integrations/v2/app_agent_receiver" blackbox_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/blackbox_exporter" common_v2 "github.com/grafana/agent/pkg/integrations/v2/common" "github.com/grafana/agent/pkg/integrations/v2/metricsutils" snmp_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/snmp_exporter" + vmware_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/vmware_exporter" "github.com/grafana/agent/pkg/integrations/windows_exporter" "github.com/grafana/river/scanner" 
"github.com/grafana/river/token/builder" @@ -217,12 +219,18 @@ func (b *IntegrationsConfigBuilder) appendV2Integrations() { case *apache_exporter_v2.Config: exports = b.appendApacheExporterV2(itg) commonConfig = itg.Common + case *app_agent_receiver_v2.Config: + b.appendAppAgentReceiverV2(itg) + commonConfig = itg.Common case *blackbox_exporter_v2.Config: exports = b.appendBlackboxExporterV2(itg) commonConfig = itg.Common case *snmp_exporter_v2.Config: exports = b.appendSnmpExporterV2(itg) commonConfig = itg.Common + case *vmware_exporter_v2.Config: + exports = b.appendVmwareExporterV2(itg) + commonConfig = itg.Common case *metricsutils.ConfigShim: commonConfig = itg.Common switch v1_itg := itg.Orig.(type) { diff --git a/converter/internal/staticconvert/internal/build/vmware_exporter.go b/converter/internal/staticconvert/internal/build/vmware_exporter.go new file mode 100644 index 000000000000..61b595330b6d --- /dev/null +++ b/converter/internal/staticconvert/internal/build/vmware_exporter.go @@ -0,0 +1,25 @@ +package build + +import ( + "github.com/grafana/agent/component/discovery" + "github.com/grafana/agent/component/prometheus/exporter/vsphere" + vmware_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/vmware_exporter" + "github.com/grafana/river/rivertypes" +) + +func (b *IntegrationsConfigBuilder) appendVmwareExporterV2(config *vmware_exporter_v2.Config) discovery.Exports { + args := toVmwareExporter(config) + return b.appendExporterBlock(args, config.Name(), nil, "vsphere") +} + +func toVmwareExporter(config *vmware_exporter_v2.Config) *vsphere.Arguments { + return &vsphere.Arguments{ + ChunkSize: config.ChunkSize, + CollectConcurrency: config.CollectConcurrency, + VSphereURL: config.VSphereURL, + VSphereUser: config.VSphereUser, + VSpherePass: rivertypes.Secret(config.VSpherePass), + ObjectDiscoveryInterval: config.ObjectDiscoveryInterval, + EnableExporterMetrics: config.EnableExporterMetrics, + } +} diff --git a/converter/internal/staticconvert/testdata-v2/integrations_v2.river b/converter/internal/staticconvert/testdata-v2/integrations_v2.river index 3580c4767151..c5489cbbb1a7 100644 --- a/converter/internal/staticconvert/testdata-v2/integrations_v2.river +++ b/converter/internal/staticconvert/testdata-v2/integrations_v2.river @@ -9,6 +9,13 @@ prometheus.remote_write "metrics_default" { } } +loki.write "logs_log_config" { + endpoint { + url = "http://localhost/loki/api/v1/push" + } + external_labels = {} +} + prometheus.exporter.azure "integrations_azure1" { subscriptions = ["subId"] resource_type = "Microsoft.Dashboard/grafana" @@ -471,10 +478,37 @@ prometheus.scrape "integrations_apache2" { job_name = "integrations/apache2" } +faro.receiver "integrations_app_agent_receiver" { + extra_log_labels = {} + + server { + listen_address = "localhost" + listen_port = 55678 + max_allowed_payload_size = "4MiB786KiB832B" + + rate_limiting { + enabled = true + rate = 100 + burst_size = 50 + } + } + + sourcemaps { + download_from_origins = ["*"] + download_timeout = "1s" + } + + output { + logs = [loki.write.logs_log_config.receiver] + traces = [] + } +} + prometheus.exporter.blackbox "integrations_blackbox" { config = "modules:\n http_2xx:\n prober: http\n timeout: 5s\n http:\n method: POST\n headers:\n Content-Type: application/json\n body: '{}'\n preferred_ip_protocol: ip4\n" - target "example" { + target { + name = "example" address = "http://example.com" module = "http_2xx" } @@ -508,3 +542,24 @@ prometheus.scrape "integrations_snmp" { forward_to = 
[prometheus.remote_write.metrics_default.receiver] job_name = "integrations/snmp" } + +prometheus.exporter.vsphere "integrations_vsphere" { + vsphere_url = "https://127.0.0.1:8989/sdk" + vsphere_user = "user" + vsphere_password = "pass" +} + +discovery.relabel "integrations_vsphere" { + targets = prometheus.exporter.vsphere.integrations_vsphere.targets + + rule { + target_label = "instance" + replacement = "vsphere" + } +} + +prometheus.scrape "integrations_vsphere" { + targets = discovery.relabel.integrations_vsphere.output + forward_to = [prometheus.remote_write.metrics_default.receiver] + job_name = "integrations/vsphere" +} diff --git a/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml b/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml index d533977c6a9f..2fde06c384ae 100644 --- a/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml +++ b/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml @@ -5,6 +5,13 @@ metrics: configs: - name: default +logs: + positions_directory: /path + configs: + - name: log_config + clients: + - url: http://localhost/loki/api/v1/push + integrations: agent: autoscrape: @@ -19,6 +26,12 @@ integrations: extra_labels: test_label: test_label_value test_label_2: test_label_value_2 + app_agent_receiver_configs: + - instance: "default" + logs_instance: "log_config" + server: + host: "localhost" + port: 55678 azure_configs: - instance: "azure1" subscriptions: @@ -204,4 +217,14 @@ integrations: scrape_timeout: 1m statsd: autoscrape: - metrics_instance: "default" \ No newline at end of file + metrics_instance: "default" + vsphere_configs: + - vsphere_url: https://127.0.0.1:8989/sdk + vsphere_user: user + vsphere_password: pass + request_chunk_size: 256 + collect_concurrency: 8 + instance: vsphere + autoscrape: + enable: true + metrics_instance: default \ No newline at end of file diff --git a/converter/internal/staticconvert/testdata-v2/unsupported.diags b/converter/internal/staticconvert/testdata-v2/unsupported.diags new file mode 100644 index 000000000000..6337d505766f --- /dev/null +++ b/converter/internal/staticconvert/testdata-v2/unsupported.diags @@ -0,0 +1,3 @@ +(Warning) Please review your agent command line flags and ensure they are set in your Flow mode config file where necessary. +(Error) The converter does not support converting the provided eventhandler integration. +(Error) The converter does not support converting the provided app_agent_receiver traces_instance config. 
\ No newline at end of file diff --git a/converter/internal/staticconvert/testdata-v2/unsupported.river b/converter/internal/staticconvert/testdata-v2/unsupported.river new file mode 100644 index 000000000000..4b78abf71b43 --- /dev/null +++ b/converter/internal/staticconvert/testdata-v2/unsupported.river @@ -0,0 +1,36 @@ +prometheus.remote_write "metrics_default" { + endpoint { + name = "default-8be96f" + url = "http://localhost:9009/api/prom/push" + + queue_config { } + + metadata_config { } + } +} + +faro.receiver "integrations_app_agent_receiver" { + extra_log_labels = {} + + server { + listen_address = "localhost" + listen_port = 55678 + max_allowed_payload_size = "4MiB786KiB832B" + + rate_limiting { + enabled = true + rate = 100 + burst_size = 50 + } + } + + sourcemaps { + download_from_origins = ["*"] + download_timeout = "1s" + } + + output { + logs = [] + traces = [] + } +} diff --git a/converter/internal/staticconvert/testdata-v2/unsupported.yaml b/converter/internal/staticconvert/testdata-v2/unsupported.yaml new file mode 100644 index 000000000000..13b6c44998ad --- /dev/null +++ b/converter/internal/staticconvert/testdata-v2/unsupported.yaml @@ -0,0 +1,17 @@ + +metrics: + global: + remote_write: + - url: http://localhost:9009/api/prom/push + configs: + - name: default + +integrations: + app_agent_receiver_configs: + - instance: "default" + traces_instance: "not_supported" + server: + host: "localhost" + port: 55678 + eventhandler: + cache_path: "/etc/eventhandler/eventhandler.cache" \ No newline at end of file diff --git a/converter/internal/staticconvert/testdata/integrations.river b/converter/internal/staticconvert/testdata/integrations.river index 68a04d198066..261ef76f228b 100644 --- a/converter/internal/staticconvert/testdata/integrations.river +++ b/converter/internal/staticconvert/testdata/integrations.river @@ -41,7 +41,8 @@ prometheus.scrape "integrations_apache_http" { prometheus.exporter.blackbox "integrations_blackbox" { config = "modules:\n http_2xx:\n prober: http\n timeout: 5s\n http:\n method: POST\n headers:\n Content-Type: application/json\n body: '{}'\n preferred_ip_protocol: ip4\n" - target "example" { + target { + name = "example" address = "http://example.com" module = "http_2xx" } diff --git a/converter/internal/staticconvert/validate.go b/converter/internal/staticconvert/validate.go index eed0b8c57717..fb714af9d555 100644 --- a/converter/internal/staticconvert/validate.go +++ b/converter/internal/staticconvert/validate.go @@ -35,9 +35,11 @@ import ( v2 "github.com/grafana/agent/pkg/integrations/v2" agent_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/agent" apache_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/apache_http" + app_agent_receiver_v2 "github.com/grafana/agent/pkg/integrations/v2/app_agent_receiver" blackbox_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/blackbox_exporter" "github.com/grafana/agent/pkg/integrations/v2/metricsutils" snmp_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/snmp_exporter" + vmware_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/vmware_exporter" "github.com/grafana/agent/pkg/integrations/windows_exporter" "github.com/grafana/agent/pkg/logs" "github.com/grafana/agent/pkg/metrics" @@ -166,8 +168,11 @@ func validateIntegrationsV2(integrationsConfig *v2.SubsystemOptions) diag.Diagno switch itg := integration.(type) { case *agent_exporter_v2.Config: case *apache_exporter_v2.Config: + case *app_agent_receiver_v2.Config: + 
diags.AddAll(common.ValidateSupported(common.NotEquals, itg.TracesInstance, "", "app_agent_receiver traces_instance", "")) case *blackbox_exporter_v2.Config: case *snmp_exporter_v2.Config: + case *vmware_exporter_v2.Config: case *metricsutils.ConfigShim: switch v1_itg := itg.Orig.(type) { case *azure_exporter.Config: diff --git a/docs/developer/writing-exporter-flow-components.md b/docs/developer/writing-exporter-flow-components.md index 20afa6264a35..fb0681ae043b 100644 --- a/docs/developer/writing-exporter-flow-components.md +++ b/docs/developer/writing-exporter-flow-components.md @@ -39,7 +39,8 @@ The river config would look like this: prometheus.exporter.blackbox "example" { config_file = "blackbox_modules.yml" - target "example" { + target { + name = "example" address = "http://example.com" module = "http_2xx" } diff --git a/docs/sources/flow/_index.md b/docs/sources/flow/_index.md index 67db78b24cb7..df5e6ffdec8f 100644 --- a/docs/sources/flow/_index.md +++ b/docs/sources/flow/_index.md @@ -14,7 +14,7 @@ cascade: PRODUCT_ROOT_NAME: Grafana Agent --- -# {{< param "PRODUCT_NAME" >}} +# {{% param "PRODUCT_NAME" %}} {{< param "PRODUCT_NAME" >}} is a _component-based_ revision of {{< param "PRODUCT_ROOT_NAME" >}} with a focus on ease-of-use, debuggability, and ability to adapt to the needs of power users. @@ -67,7 +67,7 @@ prometheus.remote_write "default" { } ``` -## {{< param "PRODUCT_ROOT_NAME" >}} configuration generator +## {{% param "PRODUCT_ROOT_NAME" %}} configuration generator The {{< param "PRODUCT_ROOT_NAME" >}} [configuration generator](https://grafana.github.io/agent-configurator/) will help you get a head start on creating flow code. diff --git a/docs/sources/flow/getting-started/_index.md b/docs/sources/flow/getting-started/_index.md index 58238b81671d..bc989084f42b 100644 --- a/docs/sources/flow/getting-started/_index.md +++ b/docs/sources/flow/getting-started/_index.md @@ -12,7 +12,7 @@ title: Get started with Grafana Agent Flow weight: 200 --- -# Get started with {{< param "PRODUCT_NAME" >}} +# Get started with {{% param "PRODUCT_NAME" %}} This section details guides for getting started with {{< param "PRODUCT_NAME" >}}. diff --git a/docs/sources/flow/getting-started/configure-agent-clustering.md b/docs/sources/flow/getting-started/configure-agent-clustering.md index fa9692eb36f7..eacc61407986 100644 --- a/docs/sources/flow/getting-started/configure-agent-clustering.md +++ b/docs/sources/flow/getting-started/configure-agent-clustering.md @@ -11,7 +11,7 @@ title: Configure Grafana Agent clustering in an existing installation weight: 400 --- -# Configure {{< param "PRODUCT_NAME" >}} clustering in an existing installation +# Configure {{% param "PRODUCT_NAME" %}} clustering in an existing installation You can configure {{< param "PRODUCT_NAME" >}} to run with [clustering][] so that individual {{< param "PRODUCT_ROOT_NAME" >}}s can work together for workload distribution and high availability. @@ -20,7 +20,7 @@ You can configure {{< param "PRODUCT_NAME" >}} to run with [clustering][] so tha This topic describes how to add clustering to an existing installation. -## Configure {{< param "PRODUCT_NAME" >}} clustering with Helm Chart +## Configure {{% param "PRODUCT_NAME" %}} clustering with Helm Chart This section guides you through enabling clustering when {{< param "PRODUCT_NAME" >}} is installed on Kubernetes using the {{< param "PRODUCT_ROOT_NAME" >}} [Helm chart][install-helm]. 
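A minimal sketch of how the new `target` block wires into a full pipeline, following the exporter pattern used in the converter testdata above; the component labels and remote-write URL are illustrative, not taken from this changeset:

```river
prometheus.exporter.blackbox "example" {
  config_file = "blackbox_modules.yml"

  target {
    name    = "example"
    address = "http://example.com"
    module  = "http_2xx"
  }
}

// Exporter components export a `targets` list that a scrape
// component can consume directly.
prometheus.scrape "blackbox" {
  targets    = prometheus.exporter.blackbox.example.targets
  forward_to = [prometheus.remote_write.default.receiver]
}

prometheus.remote_write "default" {
  endpoint {
    url = "http://localhost:9009/api/prom/push"
  }
}
```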
diff --git a/docs/sources/flow/getting-started/migrating-from-operator.md b/docs/sources/flow/getting-started/migrating-from-operator.md index a796ab1fad02..5985488c2915 100644 --- a/docs/sources/flow/getting-started/migrating-from-operator.md +++ b/docs/sources/flow/getting-started/migrating-from-operator.md @@ -9,7 +9,7 @@ title: Migrating from Grafana Agent Operator to Grafana Agent Flow weight: 320 --- -# Migrating from Grafana Agent Operator to {{< param "PRODUCT_NAME" >}} +# Migrating from Grafana Agent Operator to {{% param "PRODUCT_NAME" %}} With the release of {{< param "PRODUCT_NAME" >}}, Grafana Agent Operator is no longer the recommended way to deploy {{< param "PRODUCT_ROOT_NAME" >}} in Kubernetes. Some of the Operator functionality has moved into {{< param "PRODUCT_NAME" >}} itself, and the Helm Chart has replaced the remaining functionality. @@ -21,7 +21,7 @@ Some of the Operator functionality has moved into {{< param "PRODUCT_NAME" >}} i This guide provides some steps to get started with {{< param "PRODUCT_NAME" >}} for users coming from Grafana Agent Operator. -## Deploy {{< param "PRODUCT_NAME" >}} with Helm +## Deploy {{% param "PRODUCT_NAME" %}} with Helm 1. Create a `values.yaml` file, which contains options for deploying your {{< param "PRODUCT_ROOT_NAME" >}}. You can start with the [default values][] and customize as you see fit, or start with this snippet, which should be a good starting point for what the Operator does. @@ -64,7 +64,7 @@ This guide provides some steps to get started with {{< param "PRODUCT_NAME" >}} This command uses the `--set-file` flag to pass the configuration file as a Helm value so that you can continue to edit it as a regular River file. -## Convert `MetricsIntances` to {{< param "PRODUCT_NAME" >}} components +## Convert `MetricsInstances` to {{% param "PRODUCT_NAME" %}} components A `MetricsInstance` resource primarily defines: diff --git a/docs/sources/flow/getting-started/migrating-from-prometheus.md b/docs/sources/flow/getting-started/migrating-from-prometheus.md index ecd287b96116..75ca9b20935f 100644 --- a/docs/sources/flow/getting-started/migrating-from-prometheus.md +++ b/docs/sources/flow/getting-started/migrating-from-prometheus.md @@ -11,7 +11,7 @@ title: Migrate from Prometheus to Grafana Agent Flow weight: 320 --- -# Migrate from Prometheus to {{< param "PRODUCT_NAME" >}} +# Migrate from Prometheus to {{% param "PRODUCT_NAME" %}} The built-in {{< param "PRODUCT_ROOT_NAME" >}} convert command can migrate your [Prometheus][] configuration to a {{< param "PRODUCT_NAME" >}} configuration. diff --git a/docs/sources/flow/getting-started/migrating-from-promtail.md b/docs/sources/flow/getting-started/migrating-from-promtail.md index 28e3866bbfb7..ec7cf68ff817 100644 --- a/docs/sources/flow/getting-started/migrating-from-promtail.md +++ b/docs/sources/flow/getting-started/migrating-from-promtail.md @@ -11,7 +11,7 @@ title: Migrate from Promtail to Grafana Agent Flow weight: 330 --- -# Migrate from Promtail to {{< param "PRODUCT_NAME" >}} +# Migrate from Promtail to {{% param "PRODUCT_NAME" %}} The built-in {{< param "PRODUCT_ROOT_NAME" >}} convert command can migrate your [Promtail][] configuration to a {{< param "PRODUCT_NAME" >}} configuration. 
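The migration guides above all rely on the `convert` command. A rough sketch of the kind of River pipeline a minimal Promtail scrape config becomes; the component labels, log path, and push URL are illustrative assumptions, not actual converter output:

```river
// Discover log files matching a glob.
local.file_match "system" {
  path_targets = [{"__path__" = "/var/log/*.log"}]
}

// Tail the discovered files and forward entries to the Loki client.
loki.source.file "system" {
  targets    = local.file_match.system.targets
  forward_to = [loki.write.default.receiver]
}

loki.write "default" {
  endpoint {
    url = "http://localhost/loki/api/v1/push"
  }
}
```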
diff --git a/docs/sources/flow/getting-started/migrating-from-static.md b/docs/sources/flow/getting-started/migrating-from-static.md index 37a350c8d951..9ff6b56cc8e1 100644 --- a/docs/sources/flow/getting-started/migrating-from-static.md +++ b/docs/sources/flow/getting-started/migrating-from-static.md @@ -11,7 +11,7 @@ title: Migrate Grafana Agent Static to Grafana Agent Flow weight: 340 --- -# Migrate from {{< param "PRODUCT_ROOT_NAME" >}} Static to {{< param "PRODUCT_NAME" >}} +# Migrate from {{% param "PRODUCT_ROOT_NAME" %}} Static to {{% param "PRODUCT_NAME" %}} The built-in {{< param "PRODUCT_ROOT_NAME" >}} convert command can migrate your [Static][] configuration to a {{< param "PRODUCT_NAME" >}} configuration. diff --git a/docs/sources/flow/monitoring/_index.md b/docs/sources/flow/monitoring/_index.md index 975db20e2031..729bbaa8ff89 100644 --- a/docs/sources/flow/monitoring/_index.md +++ b/docs/sources/flow/monitoring/_index.md @@ -11,7 +11,7 @@ menuTitle: Monitoring weight: 500 --- -# Monitoring {{< param "PRODUCT_NAME" >}} +# Monitoring {{% param "PRODUCT_NAME" %}} This section details various ways to monitor and debug {{< param "PRODUCT_NAME" >}}. diff --git a/docs/sources/flow/monitoring/debugging.md b/docs/sources/flow/monitoring/debugging.md index 36b2f2636b97..98bbda9cee7e 100644 --- a/docs/sources/flow/monitoring/debugging.md +++ b/docs/sources/flow/monitoring/debugging.md @@ -17,7 +17,7 @@ Follow these steps to debug issues with {{< param "PRODUCT_NAME" >}}: 1. Use the {{< param "PRODUCT_NAME" >}} UI to debug issues. 1. If the {{< param "PRODUCT_NAME" >}} UI doesn't help with debugging an issue, logs can be examined instead. -## {{< param "PRODUCT_NAME" >}} UI +## {{% param "PRODUCT_NAME" %}} UI {{< param "PRODUCT_NAME" >}} includes an embedded UI viewable from the {{< param "PRODUCT_ROOT_NAME" >}} HTTP server, which defaults to listening at `http://localhost:12345`. diff --git a/docs/sources/flow/reference/_index.md b/docs/sources/flow/reference/_index.md index 8e8f31fc645b..5c4e88aac9cc 100644 --- a/docs/sources/flow/reference/_index.md +++ b/docs/sources/flow/reference/_index.md @@ -11,7 +11,7 @@ title: Grafana Agent Flow Reference weight: 600 --- -# {{< param "PRODUCT_NAME" >}} Reference +# {{% param "PRODUCT_NAME" %}} Reference This section provides reference-level documentation for the various parts of {{< param "PRODUCT_NAME" >}}: diff --git a/docs/sources/flow/reference/cli/_index.md b/docs/sources/flow/reference/cli/_index.md index 55fa9d9197e1..43fa4be774fd 100644 --- a/docs/sources/flow/reference/cli/_index.md +++ b/docs/sources/flow/reference/cli/_index.md @@ -11,7 +11,7 @@ title: The Grafana Agent command-line interface weight: 100 --- -# The {{< param "PRODUCT_ROOT_NAME" >}} command-line interface +# The {{% param "PRODUCT_ROOT_NAME" %}} command-line interface When in Flow mode, the `grafana-agent` binary exposes a command-line interface with subcommands to perform various operations. 
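The next file documents a new `drain_timeout` argument for the `loki.write` WAL. A minimal sketch of a component with the WAL enabled, assuming the arguments in the table below belong to the component's `wal` block; the endpoint URL is illustrative:

```river
loki.write "default" {
  endpoint {
    url = "http://localhost/loki/api/v1/push"
  }

  wal {
    enabled = true
    // Cap how long draining the WAL may delay shutdown (the documented default).
    drain_timeout = "30s"
  }
}
```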
diff --git a/docs/sources/flow/reference/components/loki.write.md b/docs/sources/flow/reference/components/loki.write.md index efbcdf34eabc..75aad04f3f2a 100644 --- a/docs/sources/flow/reference/components/loki.write.md +++ b/docs/sources/flow/reference/components/loki.write.md @@ -162,10 +162,11 @@ The following arguments are supported: Name | Type | Description | Default | Required --------------------- |------------|--------------------------------------------------------------------------------------------------------------------|-----------| -------- -`enabled` | `bool` | Whether to enable the WAL. | false | no +`enabled` | `bool` | Whether to enable the WAL. | false | no `max_segment_age` | `duration` | Maximum time a WAL segment should be allowed to live. Segments older than this setting will be eventually deleted. | `"1h"` | no `min_read_frequency` | `duration` | Minimum backoff time in the backup read mechanism. | `"250ms"` | no `max_read_frequency` | `duration` | Maximum backoff time in the backup read mechanism. | `"1s"` | no +`drain_timeout` | `duration` | Maximum time the WAL drain procedure can take, before being forcefully stopped. | `"30s"` | no [run]: {{< relref "../cli/run.md" >}} diff --git a/docs/sources/flow/reference/components/otelcol.connector.servicegraph.md b/docs/sources/flow/reference/components/otelcol.connector.servicegraph.md index e7aad20d8e29..ab3e55b5521f 100644 --- a/docs/sources/flow/reference/components/otelcol.connector.servicegraph.md +++ b/docs/sources/flow/reference/components/otelcol.connector.servicegraph.md @@ -135,7 +135,7 @@ The `store` block configures the in-memory store for spans. Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `max_items` | `number` | Maximum number of items to keep in the store. | `1000` | no -`ttl` | `duration` | The time to live for spans in the store. | `"2ms"` | no +`ttl` | `duration` | The time to live for spans in the store. | `"2s"` | no ### output block @@ -238,4 +238,4 @@ connection work correctly. Refer to the linked documentation for more details. {{% /admonition %}} - \ No newline at end of file + diff --git a/docs/sources/flow/reference/components/otelcol.exporter.prometheus.md b/docs/sources/flow/reference/components/otelcol.exporter.prometheus.md index f594934332e5..3008f22f4353 100644 --- a/docs/sources/flow/reference/components/otelcol.exporter.prometheus.md +++ b/docs/sources/flow/reference/components/otelcol.exporter.prometheus.md @@ -74,7 +74,6 @@ are forwarded to the `forward_to` argument. The following are dropped during the conversion process: * Metrics that use the delta aggregation temporality -* ExponentialHistogram data points ## Component health diff --git a/docs/sources/flow/reference/components/prometheus.exporter.blackbox.md b/docs/sources/flow/reference/components/prometheus.exporter.blackbox.md index f509061417dd..24fe248d5e23 100644 --- a/docs/sources/flow/reference/components/prometheus.exporter.blackbox.md +++ b/docs/sources/flow/reference/components/prometheus.exporter.blackbox.md @@ -18,7 +18,8 @@ The `prometheus.exporter.blackbox` component embeds ```river prometheus.exporter.blackbox "LABEL" { - target "example" { + target { + name = "example" address = "EXAMPLE_ADDRESS" } } @@ -60,10 +61,11 @@ The following blocks are supported inside the definition of ### target block The `target` block defines an individual blackbox target. -The `target` block may be specified multiple times to define multiple targets. 
The label of the block is required and will be used in the target's `job` label. +The `target` block may be specified multiple times to define multiple targets. The `name` attribute is required and will be used in the target's `job` label. | Name | Type | Description | Default | Required | | --------- | ---------------- | ----------------------------------- | ------- | -------- | +| `name` | `string` | The name of the target to probe. | | yes | | `address` | `string` | The address of the target to probe. | | yes | | `module` | `string` | Blackbox module to use to probe. | `""` | no | | `labels` | `map(string)` | Labels to add to the target. | | no | @@ -103,12 +105,14 @@ The `config_file` argument is used to define which `blackbox_exporter` modules t prometheus.exporter.blackbox "example" { config_file = "blackbox_modules.yml" - target "example" { + target { + name = "example" address = "http://example.com" module = "http_2xx" } - target "grafana" { + target { + name = "grafana" address = "http://grafana.com" module = "http_2xx" labels = { @@ -149,12 +153,14 @@ This example is the same above with using an embedded configuration: prometheus.exporter.blackbox "example" { config = "{ modules: { http_2xx: { prober: http, timeout: 5s } } }" - target "example" { + target { + name = "example" address = "http://example.com" module = "http_2xx" } - target "grafana" { + target { + name = "grafana" address = "http://grafana.com" module = "http_2xx" labels = { diff --git a/docs/sources/flow/release-notes.md b/docs/sources/flow/release-notes.md index 33a53fc85c4c..f8053bf3c0b3 100644 --- a/docs/sources/flow/release-notes.md +++ b/docs/sources/flow/release-notes.md @@ -12,7 +12,7 @@ title: Release notes for Grafana Agent Flow weight: 999 --- -# Release notes for {{< param "PRODUCT_NAME" >}} +# Release notes for {{% param "PRODUCT_NAME" %}} The release notes provide information about deprecations and breaking changes in {{< param "PRODUCT_NAME" >}}. @@ -38,6 +38,44 @@ Other release notes for the different {{< param "PRODUCT_ROOT_NAME" >}} variants * Labels for `otel_scope_info` metrics other than `otel_scope_name` and `otel_scope_version` are added as scope attributes with the matching name and version. +### Breaking change: label for `target` block in `prometheus.exporter.blackbox` is removed + +Previously in `prometheus.exporter.blackbox`, the `target` block required a label, which was used in the job's name. +In this version, you need to specify the `name` attribute instead, which allows less restrictive naming. + +Old configuration example: + +```river +prometheus.exporter.blackbox "example" { + config_file = "blackbox_modules.yml" + + target "grafana" { + address = "http://grafana.com" + module = "http_2xx" + labels = { + "env": "dev", + } + } +} +``` + +New configuration example: + +```river +prometheus.exporter.blackbox "example" { + config_file = "blackbox_modules.yml" + + target { + name = "grafana" + address = "http://grafana.com" + module = "http_2xx" + labels = { + "env": "dev", + } + } +} +``` + ## v0.38 ### Breaking change: `otelcol.exporter.jaeger` component removed @@ -340,7 +378,7 @@ The change was made in PR [#18070](https://github.com/open-telemetry/opentelemet The `remote_sampling` block in `otelcol.receiver.jaeger` has been an undocumented no-op configuration for some time, and has now been removed. Customers are advised to use `otelcol.extension.jaeger_remote_sampling` instead. 
-### Deprecation: `otelcol.exporter.jaeger` has been deprecated and will be removed in {{< param "PRODUCT_NAME" >}} v0.38.0. +### Deprecation: `otelcol.exporter.jaeger` has been deprecated and will be removed in {{% param "PRODUCT_NAME" %}} v0.38.0. This is because Jaeger supports OTLP directly and OpenTelemetry Collector is also removing its [Jaeger receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/jaegerexporter). diff --git a/docs/sources/flow/setup/_index.md b/docs/sources/flow/setup/_index.md index 0a1fbe189c2c..d639fa3eaea1 100644 --- a/docs/sources/flow/setup/_index.md +++ b/docs/sources/flow/setup/_index.md @@ -11,7 +11,7 @@ title: Set up Grafana Agent Flow weight: 50 --- -# Set up {{< param "PRODUCT_NAME" >}} +# Set up {{% param "PRODUCT_NAME" %}} This section includes information that helps you install and configure {{< param "PRODUCT_NAME" >}}. diff --git a/docs/sources/flow/setup/configure/_index.md b/docs/sources/flow/setup/configure/_index.md index 5b468138977a..b185bdac69a0 100644 --- a/docs/sources/flow/setup/configure/_index.md +++ b/docs/sources/flow/setup/configure/_index.md @@ -11,7 +11,7 @@ title: Configure Grafana Agent Flow weight: 150 --- -# Configure {{< param "PRODUCT_NAME" >}} +# Configure {{% param "PRODUCT_NAME" %}} You can configure {{< param "PRODUCT_NAME" >}} after it is installed. The default River configuration file for {{< param "PRODUCT_NAME" >}} is located at: diff --git a/docs/sources/flow/setup/configure/configure-kubernetes.md b/docs/sources/flow/setup/configure/configure-kubernetes.md index a68017c3c248..0eceedd5f89d 100644 --- a/docs/sources/flow/setup/configure/configure-kubernetes.md +++ b/docs/sources/flow/setup/configure/configure-kubernetes.md @@ -11,7 +11,7 @@ title: Configure Grafana Agent Flow on Kubernetes weight: 200 --- -# Configure {{< param "PRODUCT_NAME" >}} on Kubernetes +# Configure {{% param "PRODUCT_NAME" %}} on Kubernetes To configure {{< param "PRODUCT_NAME" >}} on Kubernetes, perform the following steps: diff --git a/docs/sources/flow/setup/configure/configure-linux.md b/docs/sources/flow/setup/configure/configure-linux.md index a7446dea9a7b..3964eb416070 100644 --- a/docs/sources/flow/setup/configure/configure-linux.md +++ b/docs/sources/flow/setup/configure/configure-linux.md @@ -11,7 +11,7 @@ title: Configure Grafana Agent Flow on Linux weight: 300 --- -# Configure {{< param "PRODUCT_NAME" >}} on Linux +# Configure {{% param "PRODUCT_NAME" %}} on Linux To configure {{< param "PRODUCT_NAME" >}} on Linux, perform the following steps: diff --git a/docs/sources/flow/setup/configure/configure-macos.md b/docs/sources/flow/setup/configure/configure-macos.md index fd664fc1149d..5261e75f9877 100644 --- a/docs/sources/flow/setup/configure/configure-macos.md +++ b/docs/sources/flow/setup/configure/configure-macos.md @@ -11,7 +11,7 @@ title: Configure Grafana Agent Flow on macOS weight: 400 --- -# Configure {{< param "PRODUCT_NAME" >}} on macOS +# Configure {{% param "PRODUCT_NAME" %}} on macOS To configure {{< param "PRODUCT_NAME" >}} on macOS, perform the following steps: @@ -23,7 +23,7 @@ To configure {{< param "PRODUCT_NAME" >}} on macOS, perform the following steps: brew services restart grafana-agent-flow ``` -## Configure the {{< param "PRODUCT_NAME" >}} service +## Configure the {{% param "PRODUCT_NAME" %}} service {{% admonition type="note" %}} Due to limitations in Homebrew, customizing the service used by diff --git a/docs/sources/flow/setup/configure/configure-windows.md 
b/docs/sources/flow/setup/configure/configure-windows.md index f62014caac83..010e6897b8ea 100644 --- a/docs/sources/flow/setup/configure/configure-windows.md +++ b/docs/sources/flow/setup/configure/configure-windows.md @@ -11,7 +11,7 @@ title: Configure Grafana Agent Flow on Windows weight: 500 --- -# Configure {{< param "PRODUCT_NAME" >}} on Windows +# Configure {{% param "PRODUCT_NAME" %}} on Windows To configure {{< param "PRODUCT_NAME" >}} on Windows, perform the following steps: diff --git a/docs/sources/flow/setup/install/_index.md b/docs/sources/flow/setup/install/_index.md index 4c6526600ac4..8305e7bf9a39 100644 --- a/docs/sources/flow/setup/install/_index.md +++ b/docs/sources/flow/setup/install/_index.md @@ -12,7 +12,7 @@ title: Install Grafana Agent Flow weight: 50 --- -# Install {{< param "PRODUCT_NAME" >}} +# Install {{% param "PRODUCT_NAME" %}} You can install {{< param "PRODUCT_NAME" >}} on Docker, Kubernetes, Linux, macOS, or Windows. diff --git a/docs/sources/flow/setup/install/binary.md b/docs/sources/flow/setup/install/binary.md index 91ceff6ba243..d491ad86cfe5 100644 --- a/docs/sources/flow/setup/install/binary.md +++ b/docs/sources/flow/setup/install/binary.md @@ -12,7 +12,7 @@ title: Install Grafana Agent Flow as a standalone binary weight: 600 --- -# Install {{< param "PRODUCT_NAME" >}} as a standalone binary +# Install {{% param "PRODUCT_NAME" %}} as a standalone binary {{< param "PRODUCT_NAME" >}} is distributed as a standalone binary for the following operating systems and architectures: @@ -21,7 +21,7 @@ weight: 600 * macOS: AMD64 (Intel), ARM64 (Apple Silicon) * FreeBSD: AMD64 -## Download {{< param "PRODUCT_ROOT_NAME" >}} +## Download {{% param "PRODUCT_ROOT_NAME" %}} To download {{< param "PRODUCT_NAME" >}} as a standalone binary, perform the following steps. diff --git a/docs/sources/flow/setup/install/docker.md b/docs/sources/flow/setup/install/docker.md index e7b40cc417d9..15f9e391e08e 100644 --- a/docs/sources/flow/setup/install/docker.md +++ b/docs/sources/flow/setup/install/docker.md @@ -12,7 +12,7 @@ title: Run Grafana Agent Flow in a Docker container weight: 100 --- -# Run {{< param "PRODUCT_NAME" >}} in a Docker container +# Run {{% param "PRODUCT_NAME" %}} in a Docker container {{< param "PRODUCT_NAME" >}} is available as a Docker container image on the following platforms: diff --git a/docs/sources/flow/setup/install/kubernetes.md b/docs/sources/flow/setup/install/kubernetes.md index 3e27b6e48b87..51ab96260ca3 100644 --- a/docs/sources/flow/setup/install/kubernetes.md +++ b/docs/sources/flow/setup/install/kubernetes.md @@ -12,7 +12,7 @@ title: Deploy Grafana Agent Flow on Kubernetes weight: 200 --- -# Deploy {{< param "PRODUCT_NAME" >}} on Kubernetes +# Deploy {{% param "PRODUCT_NAME" %}} on Kubernetes {{< param "PRODUCT_NAME" >}} can be deployed on Kubernetes by using the Helm chart for {{< param "PRODUCT_ROOT_NAME" >}}. diff --git a/docs/sources/flow/setup/install/linux.md b/docs/sources/flow/setup/install/linux.md index 563abdc503ee..56aae580299a 100644 --- a/docs/sources/flow/setup/install/linux.md +++ b/docs/sources/flow/setup/install/linux.md @@ -12,7 +12,7 @@ title: Install Grafana Agent Flow on Linux weight: 300 --- -# Install or uninstall {{< param "PRODUCT_NAME" >}} on Linux +# Install or uninstall {{% param "PRODUCT_NAME" %}} on Linux You can install {{< param "PRODUCT_NAME" >}} as a systemd service on Linux. 
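The install guides above leave the configuration file itself to the reader. As an illustrative starting point (not taken from this changeset), a near-empty River config that a fresh install can run is just a `logging` block:

```river
logging {
  level  = "info"
  format = "logfmt"
}
```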
diff --git a/docs/sources/flow/setup/install/macos.md b/docs/sources/flow/setup/install/macos.md index 8b276ce7bdc0..ad5174577aa3 100644 --- a/docs/sources/flow/setup/install/macos.md +++ b/docs/sources/flow/setup/install/macos.md @@ -12,7 +12,7 @@ title: Install Grafana Agent Flow on macOS weight: 400 --- -# Install {{< param "PRODUCT_NAME" >}} on macOS +# Install {{% param "PRODUCT_NAME" %}} on macOS You can install {{< param "PRODUCT_NAME" >}} on macOS with Homebrew . diff --git a/docs/sources/flow/setup/install/windows.md b/docs/sources/flow/setup/install/windows.md index 7dcc820309ba..765e9fdd3ac4 100644 --- a/docs/sources/flow/setup/install/windows.md +++ b/docs/sources/flow/setup/install/windows.md @@ -12,7 +12,7 @@ title: Install Grafana Agent Flow on Windows weight: 500 --- -# Install {{< param "PRODUCT_NAME" >}} on Windows +# Install {{% param "PRODUCT_NAME" %}} on Windows You can install {{< param "PRODUCT_NAME" >}} on Windows as a standard graphical install, or as a silent install. diff --git a/docs/sources/flow/setup/start-agent.md b/docs/sources/flow/setup/start-agent.md index c9bf6d9572b9..b63386cf1436 100644 --- a/docs/sources/flow/setup/start-agent.md +++ b/docs/sources/flow/setup/start-agent.md @@ -11,7 +11,7 @@ title: Start, restart, and stop Grafana Agent Flow weight: 800 --- -# Start, restart, and stop {{< param "PRODUCT_NAME" >}} +# Start, restart, and stop {{% param "PRODUCT_NAME" %}} You can start, restart, and stop {{< param "PRODUCT_NAME" >}} after it is installed. @@ -21,7 +21,7 @@ You can start, restart, and stop {{< param "PRODUCT_NAME" >}} after it is instal [systemd]: https://systemd.io/ -### Start {{< param "PRODUCT_NAME" >}} +### Start {{% param "PRODUCT_NAME" %}} To start {{< param "PRODUCT_NAME" >}}, run the following command in a terminal window: @@ -35,7 +35,7 @@ sudo systemctl start grafana-agent-flow sudo systemctl status grafana-agent-flow ``` -### Configure {{< param "PRODUCT_NAME" >}} to start at boot +### Configure {{% param "PRODUCT_NAME" %}} to start at boot To automatically run {{< param "PRODUCT_NAME" >}} when the system starts, run the following command in a terminal window: @@ -43,7 +43,7 @@ To automatically run {{< param "PRODUCT_NAME" >}} when the system starts, run th sudo systemctl enable grafana-agent-flow.service ``` -### Restart {{< param "PRODUCT_NAME" >}} +### Restart {{% param "PRODUCT_NAME" %}} To restart {{< param "PRODUCT_NAME" >}}, run the following command in a terminal window: @@ -51,7 +51,7 @@ To restart {{< param "PRODUCT_NAME" >}}, run the following command in a terminal sudo systemctl restart grafana-agent-flow ``` -### Stop {{< param "PRODUCT_NAME" >}} +### Stop {{% param "PRODUCT_NAME" %}} To stop {{< param "PRODUCT_NAME" >}}, run the following command in a terminal window: @@ -59,7 +59,7 @@ To stop {{< param "PRODUCT_NAME" >}}, run the following command in a terminal wi sudo systemctl stop grafana-agent-flow ``` -### View {{< param "PRODUCT_NAME" >}} logs on Linux +### View {{% param "PRODUCT_NAME" %}} logs on Linux To view {{< param "PRODUCT_NAME" >}} log files, run the following command in a terminal window: @@ -71,7 +71,7 @@ sudo journalctl -u grafana-agent-flow {{< param "PRODUCT_NAME" >}} is installed as a launchd service on macOS. 
-### Start {{< param "PRODUCT_NAME" >}} +### Start {{% param "PRODUCT_NAME" %}} To start {{< param "PRODUCT_NAME" >}}, run the following command in a terminal window: @@ -87,7 +87,7 @@ brew services start grafana-agent-flow brew services info grafana-agent-flow ``` -### Restart {{< param "PRODUCT_NAME" >}} +### Restart {{% param "PRODUCT_NAME" %}} To restart {{< param "PRODUCT_NAME" >}}, run the following command in a terminal window: @@ -95,7 +95,7 @@ To restart {{< param "PRODUCT_NAME" >}}, run the following command in a terminal brew services restart grafana-agent-flow ``` -### Stop {{< param "PRODUCT_NAME" >}} +### Stop {{% param "PRODUCT_NAME" %}} To stop {{< param "PRODUCT_NAME" >}}, run the following command in a terminal window: @@ -103,7 +103,7 @@ To stop {{< param "PRODUCT_NAME" >}}, run the following command in a terminal wi brew services stop grafana-agent-flow ``` -### View {{< param "PRODUCT_NAME" >}} logs on macOS +### View {{% param "PRODUCT_NAME" %}} logs on macOS By default, logs are written to `$(brew --prefix)/var/log/grafana-agent-flow.log` and `$(brew --prefix)/var/log/grafana-agent-flow.err.log`. @@ -125,7 +125,7 @@ To verify that {{< param "PRODUCT_NAME" >}} is running as a Windows Service: 1. Scroll down to find the **{{< param "PRODUCT_NAME" >}}** service and verify that the **Status** is **Running**. -### View {{< param "PRODUCT_NAME" >}} logs +### View {{% param "PRODUCT_NAME" %}} logs When running on Windows, {{< param "PRODUCT_NAME" >}} writes its logs to Windows Event Logs with an event source name of **{{< param "PRODUCT_NAME" >}}**. @@ -146,7 +146,7 @@ To view the logs, perform the following steps: If you downloaded the standalone binary, you must run {{< param "PRODUCT_NAME" >}} from a terminal or command window. -### Start {{< param "PRODUCT_NAME" >}} on Linux, macOS, or FreeBSD +### Start {{% param "PRODUCT_NAME" %}} on Linux, macOS, or FreeBSD To start {{< param "PRODUCT_NAME" >}} on Linux, macOS, or FreeBSD, run the following command in a terminal window: @@ -159,7 +159,7 @@ Replace the following: * _``_: The path to the {{< param "PRODUCT_NAME" >}} binary file. * _``_: The path to the {{< param "PRODUCT_NAME" >}} configuration file. -### Start {{< param "PRODUCT_NAME" >}} on Windows +### Start {{% param "PRODUCT_NAME" %}} on Windows To start {{< param "PRODUCT_NAME" >}} on Windows, run the following commands in a command prompt: @@ -173,7 +173,7 @@ Replace the following: * _``_: The path to the {{< param "PRODUCT_NAME" >}} binary file. * _``_: The path to the {{< param "PRODUCT_NAME" >}} configuration file. -### Set up {{< param "PRODUCT_NAME" >}} as a Linux systemd service +### Set up {{% param "PRODUCT_NAME" %}} as a Linux systemd service You can set up and manage the standalone binary for {{< param "PRODUCT_NAME" >}} as a Linux systemd service. 
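A condensed sketch of the OTLP-to-Prometheus pipeline the integration-test changes below exercise, showing how metrics converted by `otelcol.exporter.prometheus` reach a remote write with native histograms enabled; the component labels and URLs mirror the test config that follows:

```river
otelcol.receiver.otlp "default" {
  http {}

  output {
    metrics = [otelcol.exporter.prometheus.default.input]
  }
}

// Converts OTLP metrics, including exponential histograms per the docs
// change above, into Prometheus samples.
otelcol.exporter.prometheus "default" {
  forward_to = [prometheus.remote_write.default.receiver]
}

prometheus.remote_write "default" {
  endpoint {
    url                    = "http://localhost:9009/api/v1/push"
    send_native_histograms = true
  }
}
```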
diff --git a/integration-tests/tests/otlp-metrics/config.river b/integration-tests/tests/otlp-metrics/config.river index 26a6212a3c11..859181f9ccbb 100644 --- a/integration-tests/tests/otlp-metrics/config.river +++ b/integration-tests/tests/otlp-metrics/config.river @@ -2,7 +2,7 @@ otelcol.receiver.otlp "otlp_metrics" { http {} output { - metrics = [otelcol.processor.attributes.otlp_metrics.input] + metrics = [otelcol.processor.attributes.otlp_metrics.input, otelcol.exporter.prometheus.otlp_to_prom_metrics.input] } } @@ -27,3 +27,23 @@ otelcol.exporter.otlphttp "otlp_metrics" { } } } + +otelcol.exporter.prometheus "otlp_to_prom_metrics" { + forward_to = [prometheus.remote_write.otlp_to_prom_metrics.receiver] +} + +prometheus.remote_write "otlp_to_prom_metrics" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "otlp_to_prom_metrics", + } +} diff --git a/integration-tests/tests/otlp-metrics/otlp_metrics_test.go b/integration-tests/tests/otlp-metrics/otlp_metrics_test.go index a884cd702e84..6c3676a20604 100644 --- a/integration-tests/tests/otlp-metrics/otlp_metrics_test.go +++ b/integration-tests/tests/otlp-metrics/otlp_metrics_test.go @@ -13,11 +13,12 @@ import ( const promURL = "http://localhost:9009/prometheus/api/v1/query?query=" -func metricQuery(metricName string) string { - return fmt.Sprintf("%s%s{test_name='otlp_metrics'}", promURL, metricName) +func metricQuery(metricName string, testName string) string { + return fmt.Sprintf("%s%s{test_name='%s'}", promURL, metricName, testName) } func TestOTLPMetrics(t *testing.T) { + const testName = "otlp_metrics" tests := []struct { metric string }{ @@ -34,7 +35,7 @@ func TestOTLPMetrics(t *testing.T) { tt := tt t.Run(tt.metric, func(t *testing.T) { t.Parallel() - assertMetricData(t, metricQuery(tt.metric), tt.metric) + assertMetricData(t, metricQuery(tt.metric, testName), tt.metric, testName) }) } @@ -47,19 +48,19 @@ func TestOTLPMetrics(t *testing.T) { metric := metric t.Run(metric, func(t *testing.T) { t.Parallel() - assertHistogramData(t, metricQuery(metric), metric) + assertHistogramData(t, metricQuery(metric, testName), metric, testName) }) } } -func assertHistogramData(t *testing.T, query string, expectedMetric string) { +func assertHistogramData(t *testing.T, query string, expectedMetric string, testName string) { var metricResponse common.MetricResponse assert.EventuallyWithT(t, func(c *assert.CollectT) { err := common.FetchDataFromURL(query, &metricResponse) assert.NoError(c, err) if assert.NotEmpty(c, metricResponse.Data.Result) { assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) - assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "otlp_metrics") + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, testName) if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) { histogram := metricResponse.Data.Result[0].Histogram if assert.NotEmpty(c, histogram.Data.Count) { @@ -77,14 +78,14 @@ func assertHistogramData(t *testing.T, query string, expectedMetric string) { }, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit") } -func assertMetricData(t *testing.T, query, expectedMetric string) { +func assertMetricData(t *testing.T, query, expectedMetric string, testName string) { var metricResponse common.MetricResponse assert.EventuallyWithT(t, 
func(c *assert.CollectT) { err := common.FetchDataFromURL(query, &metricResponse) assert.NoError(c, err) if assert.NotEmpty(c, metricResponse.Data.Result) { assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) - assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "otlp_metrics") + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, testName) assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value) assert.Nil(c, metricResponse.Data.Result[0].Histogram) } diff --git a/integration-tests/tests/otlp-metrics/otlp_to_prom_metrics_test.go b/integration-tests/tests/otlp-metrics/otlp_to_prom_metrics_test.go new file mode 100644 index 000000000000..9a0c5d780975 --- /dev/null +++ b/integration-tests/tests/otlp-metrics/otlp_to_prom_metrics_test.go @@ -0,0 +1,42 @@ +//go:build !windows + +package main + +import ( + "testing" +) + +func TestOTLPToPromMetrics(t *testing.T) { + const testName = "otlp_to_prom_metrics" + tests := []struct { + metric string + }{ + {"example_counter_total"}, + {"example_float_counter_total"}, + {"example_updowncounter"}, + {"example_float_updowncounter"}, + {"example_histogram_bucket"}, + {"example_float_histogram_bucket"}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.metric, func(t *testing.T) { + t.Parallel() + assertMetricData(t, metricQuery(tt.metric, testName), tt.metric, testName) + }) + } + + histogramTests := []string{ + "example_exponential_histogram", + "example_exponential_float_histogram", + } + + for _, metric := range histogramTests { + metric := metric + t.Run(metric, func(t *testing.T) { + t.Parallel() + assertHistogramData(t, metricQuery(metric, testName), metric, testName) + }) + } +} diff --git a/operations/helm/charts/grafana-agent/CHANGELOG.md b/operations/helm/charts/grafana-agent/CHANGELOG.md index 6133ed8a8dd0..70b612d74572 100644 --- a/operations/helm/charts/grafana-agent/CHANGELOG.md +++ b/operations/helm/charts/grafana-agent/CHANGELOG.md @@ -10,6 +10,10 @@ internal API changes are not present. Unreleased ---------- +### Enhancements + +- Update `rbac` to include necessary rules for the `otelcol.processor.k8sattributes` component. 
(@rlankfo) + 0.29.0 (2023-11-30) ------------------- diff --git a/operations/helm/charts/grafana-agent/templates/rbac.yaml b/operations/helm/charts/grafana-agent/templates/rbac.yaml index fbbe733dcd92..af0d30665ad0 100644 --- a/operations/helm/charts/grafana-agent/templates/rbac.yaml +++ b/operations/helm/charts/grafana-agent/templates/rbac.yaml @@ -83,6 +83,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- diff --git a/operations/helm/tests/clustering/grafana-agent/templates/rbac.yaml b/operations/helm/tests/clustering/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/clustering/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/clustering/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/controller-volumes-extra/grafana-agent/templates/rbac.yaml b/operations/helm/tests/controller-volumes-extra/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/controller-volumes-extra/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/controller-volumes-extra/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/create-daemonset-hostnetwork/grafana-agent/templates/rbac.yaml b/operations/helm/tests/create-daemonset-hostnetwork/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/create-daemonset-hostnetwork/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/create-daemonset-hostnetwork/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/create-daemonset/grafana-agent/templates/rbac.yaml b/operations/helm/tests/create-daemonset/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/create-daemonset/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/create-daemonset/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: 
rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/create-deployment-autoscaling/grafana-agent/templates/rbac.yaml b/operations/helm/tests/create-deployment-autoscaling/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/create-deployment-autoscaling/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/create-deployment-autoscaling/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/create-deployment/grafana-agent/templates/rbac.yaml b/operations/helm/tests/create-deployment/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/create-deployment/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/create-deployment/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/create-statefulset-autoscaling/grafana-agent/templates/rbac.yaml b/operations/helm/tests/create-statefulset-autoscaling/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/create-statefulset-autoscaling/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/create-statefulset-autoscaling/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/create-statefulset/grafana-agent/templates/rbac.yaml b/operations/helm/tests/create-statefulset/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/create-statefulset/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/create-statefulset/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/custom-config/grafana-agent/templates/rbac.yaml b/operations/helm/tests/custom-config/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/custom-config/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/custom-config/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", 
"list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/default-values/grafana-agent/templates/rbac.yaml b/operations/helm/tests/default-values/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/default-values/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/default-values/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/enable-servicemonitor/grafana-agent/templates/rbac.yaml b/operations/helm/tests/enable-servicemonitor/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/enable-servicemonitor/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/enable-servicemonitor/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/envFrom/grafana-agent/templates/rbac.yaml b/operations/helm/tests/envFrom/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/envFrom/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/envFrom/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/existing-config/grafana-agent/templates/rbac.yaml b/operations/helm/tests/existing-config/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/existing-config/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/existing-config/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/extra-env/grafana-agent/templates/rbac.yaml b/operations/helm/tests/extra-env/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/extra-env/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/extra-env/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", 
"list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/extra-ports/grafana-agent/templates/rbac.yaml b/operations/helm/tests/extra-ports/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/extra-ports/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/extra-ports/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/faro-ingress/grafana-agent/templates/rbac.yaml b/operations/helm/tests/faro-ingress/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/faro-ingress/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/faro-ingress/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/global-image-pullsecrets/grafana-agent/templates/rbac.yaml b/operations/helm/tests/global-image-pullsecrets/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/global-image-pullsecrets/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/global-image-pullsecrets/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/global-image-registry/grafana-agent/templates/rbac.yaml b/operations/helm/tests/global-image-registry/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/global-image-registry/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/global-image-registry/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/initcontainers/grafana-agent/templates/rbac.yaml b/operations/helm/tests/initcontainers/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/initcontainers/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/initcontainers/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - 
apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/local-image-pullsecrets/grafana-agent/templates/rbac.yaml b/operations/helm/tests/local-image-pullsecrets/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/local-image-pullsecrets/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/local-image-pullsecrets/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/local-image-registry/grafana-agent/templates/rbac.yaml b/operations/helm/tests/local-image-registry/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/local-image-registry/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/local-image-registry/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/nodeselectors-and-tolerations/grafana-agent/templates/rbac.yaml b/operations/helm/tests/nodeselectors-and-tolerations/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/nodeselectors-and-tolerations/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/nodeselectors-and-tolerations/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/operations/helm/tests/static-mode/grafana-agent/templates/rbac.yaml b/operations/helm/tests/static-mode/grafana-agent/templates/rbac.yaml index 063aa099ae47..3765583fb64f 100644 --- a/operations/helm/tests/static-mode/grafana-agent/templates/rbac.yaml +++ b/operations/helm/tests/static-mode/grafana-agent/templates/rbac.yaml @@ -88,6 +88,13 @@ rules: - get - list - watch + # needed for otelcol.processor.k8sattributes + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: ["replicasets"] + verbs: ["get", "list", "watch"] --- # Source: grafana-agent/templates/rbac.yaml apiVersion: rbac.authorization.k8s.io/v1 diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index ec16333568ee..76d62bdb9cb2 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -240,7 +240,7 @@ func (f *Flow) Run(ctx context.Context) { // throughput - it prevents the situation where two components have the same dependency, and the first time // it's picked up by 
diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go
index ec16333568ee..76d62bdb9cb2 100644
--- a/pkg/flow/flow.go
+++ b/pkg/flow/flow.go
@@ -240,7 +240,7 @@ func (f *Flow) Run(ctx context.Context) {
 			// throughput - it prevents the situation where two components have the same dependency, and the first time
 			// it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations.
 			all := f.updateQueue.DequeueAll()
-			f.loader.EvaluateDependencies(ctx, all)
+			f.loader.EvaluateDependants(ctx, all)
 
 		case <-f.loadFinished:
 			level.Info(f.log).Log("msg", "scheduling loaded components and services")
diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go
index e968e8102fe0..10a6f37965ab 100644
--- a/pkg/flow/internal/controller/loader.go
+++ b/pkg/flow/internal/controller/loader.go
@@ -32,7 +32,7 @@ type Loader struct {
 	componentReg ComponentRegistry
 	workerPool   worker.Pool
 	// backoffConfig is used to backoff when an updated component's dependencies cannot be submitted to worker
-	// pool for evaluation in EvaluateDependencies, because the queue is full. This is an unlikely scenario, but when
+	// pool for evaluation in EvaluateDependants, because the queue is full. This is an unlikely scenario, but when
 	// it happens we should avoid retrying too often to give other goroutines a chance to progress. Having a backoff
 	// also prevents log spamming with errors.
 	backoffConfig backoff.Config
@@ -543,13 +543,13 @@ func (l *Loader) OriginalGraph() *dag.Graph {
 	return l.originalGraph.Clone()
 }
 
-// EvaluateDependencies sends components which depend directly on components in updatedNodes for evaluation to the
+// EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the
 // workerPool. It should be called whenever components update their exports.
-// It is beneficial to call EvaluateDependencies with a batch of components, as it will enqueue the entire batch before
+// It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before
 // the worker pool starts to evaluate them, resulting in smaller number of total evaluations when
-// node updates are frequent. If the worker pool's queue is full, EvaluateDependencies will retry with a backoff until
+// node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until
 // it succeeds or until the ctx is cancelled.
-func (l *Loader) EvaluateDependencies(ctx context.Context, updatedNodes []*ComponentNode) {
+func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) {
 	if len(updatedNodes) == 0 {
 		return
 	}
@@ -578,7 +578,7 @@ func (l *Loader) EvaluateDependencies(ctx context.Context, updatedNodes []*Compo
 	// Submit all dependencies for asynchronous evaluation.
 	// During evaluation, if a node's exports change, Flow will add it to updated nodes queue (controller.Queue) and
-	// the Flow controller will call EvaluateDependencies on it again. This results in a concurrent breadth-first
+	// the Flow controller will call EvaluateDependants on it again. This results in a concurrent breadth-first
 	// traversal of the nodes that need to be evaluated.
 	for n, parent := range dependenciesToParentsMap {
 		dependantCtx, span := tracer.Start(spanCtx, "SubmitForEvaluation", trace.WithSpanKind(trace.SpanKindInternal))
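The rename from `EvaluateDependencies` to `EvaluateDependants` matches what the method actually does: it schedules the components that depend on the updated nodes, not those nodes' own dependencies. The doc comment above describes enqueueing a whole batch and backing off while the worker pool's queue is full; the sketch below isolates that pattern. The `submit` helper and the channel-backed queue are stand-ins invented for the example, while `github.com/grafana/dskit/backoff` is the real package that `loader.go` uses.

```go
// A minimal sketch of submit-with-backoff: try a non-blocking enqueue and,
// while the queue is full, wait with exponential backoff before retrying.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

var errQueueFull = errors.New("queue full")

// submit is a stand-in for the worker pool's non-blocking enqueue.
func submit(task string, queue chan string) error {
	select {
	case queue <- task:
		return nil
	default:
		return errQueueFull
	}
}

func submitWithBackoff(ctx context.Context, task string, queue chan string) error {
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: time.Millisecond,
		MaxBackoff: 100 * time.Millisecond, // MaxRetries of 0 means retry until ctx is cancelled
	})
	for b.Ongoing() {
		if err := submit(task, queue); err == nil {
			return nil
		}
		b.Wait() // queue is full: back off so other goroutines can drain it
	}
	return b.Err()
}

func main() {
	queue := make(chan string, 1)
	if err := submitWithBackoff(context.Background(), "componentA", queue); err != nil {
		panic(err)
	}
	fmt.Println("enqueued:", <-queue)
}
```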
diff --git a/pkg/util/testappender/internal/dtobuilder/dtobuilder.go b/pkg/util/testappender/internal/dtobuilder/dtobuilder.go
index 75d1613cdbdf..3aa15bb9e5a7 100644
--- a/pkg/util/testappender/internal/dtobuilder/dtobuilder.go
+++ b/pkg/util/testappender/internal/dtobuilder/dtobuilder.go
@@ -4,11 +4,13 @@ import (
 	"math"
 	"sort"
 	"strconv"
+	"strings"
 	"time"
 
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/exemplar"
+	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/model/textparse"
@@ -33,18 +35,25 @@ type SeriesExemplar struct {
 	Exemplar exemplar.Exemplar
 }
 
+type SeriesHistogram struct {
+	Labels    labels.Labels
+	Histogram histogram.Histogram
+}
+
 // Build converts a series of written samples, exemplars, and metadata into a
 // slice of *dto.MetricFamily.
 func Build(
 	samples map[string]Sample,
 	exemplars map[string]SeriesExemplar,
+	histograms map[string]SeriesHistogram,
 	metadata map[string]metadata.Metadata,
 ) []*dto.MetricFamily {
 	b := builder{
-		Samples:   samples,
-		Exemplars: exemplars,
-		Metadata:  metadata,
+		Samples:    samples,
+		Exemplars:  exemplars,
+		Metadata:   metadata,
+		Histograms: histograms,
 		familyLookup: make(map[string]*dto.MetricFamily),
 	}
@@ -52,9 +61,10 @@ func Build(
 }
 
 type builder struct {
-	Samples   map[string]Sample
-	Exemplars map[string]SeriesExemplar
-	Metadata  map[string]metadata.Metadata
+	Samples    map[string]Sample
+	Exemplars  map[string]SeriesExemplar
+	Metadata   map[string]metadata.Metadata
+	Histograms map[string]SeriesHistogram
 
 	families     []*dto.MetricFamily
 	familyLookup map[string]*dto.MetricFamily
@@ -86,9 +96,11 @@ func (b *builder) Build() []*dto.MetricFamily {
 	// 1. Populate the families from metadata so we know what fields in
 	//    *dto.Metric to set.
 	// 2. Populate *dto.Metric values from provided samples.
-	// 3. Assign exemplars to *dto.Metrics as appropriate.
+	// 3. Build Histograms (used for Native Histograms).
+	// 4. Assign exemplars to *dto.Metrics as appropriate.
 	b.buildFamiliesFromMetadata()
 	b.buildMetricsFromSamples()
+	b.buildHistograms()
 	b.injectExemplars()
 
 	// Sort all the data before returning.
@@ -124,6 +136,12 @@ func (b *builder) buildFamiliesFromMetadata() {
 			b.familyLookup[familyName+"_sum"] = mf
 			b.familyLookup[familyName+"_count"] = mf
 		case dto.MetricType_HISTOGRAM:
+			// Metadata types do not differentiate between histogram and exponential histogram yet.
+			// As a temporary hack, a family whose name contains "exponential" is treated as an exponential histogram for testing.
+			if strings.Contains(familyName, "exponential") {
+				b.familyLookup[familyName] = mf
+				break
+			}
 			// Histograms include metrics for _bucket, _sum, and _count suffixes.
 			b.familyLookup[familyName+"_bucket"] = mf
 			b.familyLookup[familyName+"_sum"] = mf
@@ -153,6 +171,49 @@ func textParseToMetricType(tp textparse.MetricType) dto.MetricType {
 	}
 }
 
+func (b *builder) buildHistograms() {
+	for _, histogram := range b.Histograms {
+		metricName := histogram.Labels.Get(model.MetricNameLabel)
+		mf := b.getOrCreateMetricFamily(metricName)
+
+		m := getOrCreateMetric(mf, histogram.Labels)
+		sum := histogram.Histogram.Sum
+		count := histogram.Histogram.Count
+		schema := histogram.Histogram.Schema
+		zeroThreshold := histogram.Histogram.ZeroThreshold
+		zeroCount := histogram.Histogram.ZeroCount
+
+		m.Histogram = &dto.Histogram{
+			PositiveSpan:  convertSpans(histogram.Histogram.PositiveSpans),
+			NegativeSpan:  convertSpans(histogram.Histogram.NegativeSpans),
+			PositiveDelta: histogram.Histogram.PositiveBuckets,
+			NegativeDelta: histogram.Histogram.NegativeBuckets,
+			SampleSum:     &sum,
+			SampleCount:   &count,
+			Schema:        &schema,
+			ZeroThreshold: &zeroThreshold,
+			ZeroCount:     &zeroCount,
+		}
+	}
+}
+
+func convertSpans(spans []histogram.Span) []*dto.BucketSpan {
+	bucketSpan := make([]*dto.BucketSpan, len(spans))
+	for i, span := range spans {
+		bucketSpan[i] = convertSpan(span)
+	}
+	return bucketSpan
+}
+
+func convertSpan(span histogram.Span) *dto.BucketSpan {
+	offset := span.Offset
+	length := span.Length
+	return &dto.BucketSpan{
+		Offset: &offset,
+		Length: &length,
+	}
+}
+
 // buildMetricsFromSamples populates *dto.Metrics. If the MetricFamily doesn't
 // exist for a given sample, a new one is created.
 func (b *builder) buildMetricsFromSamples() {
@@ -368,6 +429,12 @@ func (b *builder) injectExemplars() {
 				continue
 			}
 			bucket.Exemplar = convertExemplar(dto.MetricType_HISTOGRAM, e.Exemplar)
+		// Exemplar support for native histograms is not yet available: https://github.com/prometheus/client_golang/issues/1126
+		// We need to add the exemplars to the classic histogram buckets.
+		case exemplarName == mf.GetName():
+			m.Histogram.Bucket = append(m.Histogram.Bucket, &dto.Bucket{
+				Exemplar: convertExemplar(dto.MetricType_HISTOGRAM, e.Exemplar),
+			})
 		}
 	}
 }
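`buildHistograms` copies the native histogram's spans and delta-encoded bucket counts into `dto.Histogram` unchanged, so it helps to know the encoding: each span contributes an offset (relative to the end of the previous span) plus a run of contiguous buckets, and the delta slice holds the first bucket's absolute count followed by successive differences. The standalone illustration below uses a local `span` struct rather than `histogram.Span`, but the arithmetic is the same:

```go
// Decoding sparse native-histogram buckets: spans give positions, deltas give counts.
package main

import "fmt"

func main() {
	type span struct{ offset, length int32 }

	// Two spans: two buckets starting at index 0, then a gap of three, then one bucket.
	spans := []span{{offset: 0, length: 2}, {offset: 3, length: 1}}

	// First value is absolute; each following value is a difference from the previous count.
	deltas := []int64{2, 1, -2} // absolute counts: 2, 3, 1

	var count int64
	idx, d := int32(0), 0
	for _, s := range spans {
		idx += s.offset // span offsets are relative to the end of the previous span
		for i := int32(0); i < s.length; i++ {
			count += deltas[d]
			fmt.Printf("bucket index %d has count %d\n", idx, count)
			idx++
			d++
		}
	}
	// Output:
	// bucket index 0 has count 2
	// bucket index 1 has count 3
	// bucket index 5 has count 1
}
```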
diff --git a/pkg/util/testappender/testappender.go b/pkg/util/testappender/testappender.go
index 5d7c28e8cb05..6d92642096f4 100644
--- a/pkg/util/testappender/testappender.go
+++ b/pkg/util/testappender/testappender.go
@@ -33,9 +33,10 @@ type Appender struct {
 	commitCalled, rollbackCalled bool
 
-	samples   map[string]dtobuilder.Sample         // metric labels -> sample
-	exemplars map[string]dtobuilder.SeriesExemplar // metric labels -> series exemplar
-	metadata  map[string]metadata.Metadata         // metric family name -> metadata
+	samples    map[string]dtobuilder.Sample          // metric labels -> sample
+	exemplars  map[string]dtobuilder.SeriesExemplar  // metric labels -> series exemplar
+	metadata   map[string]metadata.Metadata          // metric family name -> metadata
+	histograms map[string]dtobuilder.SeriesHistogram // metric labels -> series histogram
 
 	families []*dto.MetricFamily
 }
@@ -52,6 +53,9 @@ func (app *Appender) init() {
 	if app.metadata == nil {
 		app.metadata = make(map[string]metadata.Metadata)
 	}
+	if app.histograms == nil {
+		app.histograms = make(map[string]dtobuilder.SeriesHistogram)
+	}
 }
 
 // Append adds or updates a sample for a given metric, identified by labels. l
@@ -149,10 +153,25 @@ func (app *Appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m me
 	return 0, nil
 }
 
-// AppendHistogram implements storage.Appendable, but always returns an error
-// as native histograms are not supported.
+// AppendHistogram implements storage.Appendable.
 func (app *Appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
-	return 0, fmt.Errorf("native histograms are not supported")
+	if app.commitCalled || app.rollbackCalled {
+		return 0, fmt.Errorf("appender is closed")
+	}
+	app.init()
+
+	l = l.WithoutEmpty()
+	if len(l) == 0 {
+		return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
+	}
+	if lbl, dup := l.HasDuplicateLabelNames(); dup {
+		return 0, fmt.Errorf("label name %q is not unique: %w", lbl, tsdb.ErrInvalidSample)
+	}
+	app.histograms[l.String()] = dtobuilder.SeriesHistogram{
+		Labels:    l,
+		Histogram: *h,
+	}
+	return 0, nil
 }
 
 // Commit commits pending samples, exemplars, and metadata, converting them
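With `AppendHistogram` in place, tests can feed native histogram samples through the appender like any other sample. A hedged usage sketch follows; the metric name and histogram values are invented, and the name contains "exponential" so that the temporary metadata workaround in `dtobuilder` maps it to an unsuffixed family:

```go
// Sketch: append one native histogram and commit it. Values are illustrative.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/agent/pkg/util/testappender"
)

func main() {
	var app testappender.Appender // zero value is usable; maps are initialized lazily

	h := &histogram.Histogram{
		Count:           7, // ZeroCount plus all bucket counts
		Sum:             11.5,
		Schema:          0,
		ZeroThreshold:   0.001,
		ZeroCount:       2,
		PositiveSpans:   []histogram.Span{{Offset: 0, Length: 2}},
		PositiveBuckets: []int64{2, 1}, // delta-encoded: absolute counts 2 and 3
	}

	lbls := labels.FromStrings("__name__", "example_exponential_histogram")
	if _, err := app.AppendHistogram(0, lbls, 0, h, nil); err != nil {
		panic(err)
	}
	if err := app.Commit(); err != nil {
		panic(err)
	}
	fmt.Println("committed one native histogram")
}
```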
@@ -169,6 +188,7 @@ func (app *Appender) Commit() error {
 	app.families = dtobuilder.Build(
 		app.samples,
 		app.exemplars,
+		app.histograms,
 		app.metadata,
 	)
 	return nil
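`Commit` now threads the histograms map into `dtobuilder.Build` between the exemplars and metadata arguments; the parameters are positional, so every call site has to pass them in exactly this order. The new validation in `AppendHistogram` is also easy to pin down with small unit tests; the sketch below is illustrative (test names and the zero-value `Appender` usage are assumptions, not code from this patch):

```go
// Sketch: exercising AppendHistogram's error paths.
package testappender_test

import (
	"testing"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/agent/pkg/util/testappender"
)

func TestAppendHistogram_AfterCommit(t *testing.T) {
	var app testappender.Appender
	require.NoError(t, app.Commit())

	// Appending after Commit must fail: the appender is closed.
	_, err := app.AppendHistogram(0, labels.FromStrings("__name__", "h"), 0, &histogram.Histogram{}, nil)
	require.Error(t, err)
}

func TestAppendHistogram_DuplicateLabels(t *testing.T) {
	var app testappender.Appender

	// Two labels with the same name must be rejected as an invalid sample.
	dup := labels.New(
		labels.Label{Name: "a", Value: "1"},
		labels.Label{Name: "a", Value: "2"},
	)
	_, err := app.AppendHistogram(0, dup, 0, &histogram.Histogram{}, nil)
	require.Error(t, err)
}
```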