diff --git a/CHANGELOG.md b/CHANGELOG.md index dbd86b0f8e2c..0815b97ab4cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,138 @@ This document contains a historical list of changes between releases. Only changes that impact end-user behavior are listed; changes to documentation or internal API changes are not present. +Main (unreleased) +----------------- + +### Breaking changes + +- Remove `otelcol.exporter.jaeger` component (@hainenber) + +- In the mysqld exporter integration, some metrics are removed and others are renamed. (@marctc) + - Removed metrics: + - "mysql_last_scrape_failed" (gauge) + - "mysql_exporter_scrapes_total" (counter) + - "mysql_exporter_scrape_errors_total" (counter) + - Metric names in the `info_schema.processlist` collector have been [changed](https://github.com/prometheus/mysqld_exporter/pull/603). + - Metric names in the `info_schema.replica_host` collector have been [changed](https://github.com/prometheus/mysqld_exporter/pull/496). + - Changes related to `replication_group_member_stats collector`: + - metric "transaction_in_queue" was Counter instead of Gauge + - renamed 3 metrics starting with `mysql_perf_schema_transaction_` to start with `mysql_perf_schema_transactions_` to be consistent with column names. + - exposing only server's own stats by matching `MEMBER_ID` with `@@server_uuid` resulting "member_id" label to be dropped. + +### Other changes + +- Bump `mysqld_exporter` version to v0.15.0. (@marctc) +- Bump `github-exporter` version to 1.0.6. (@marctc) + +### Features + +- Added a new `stage.decolorize` stage to `loki.process` component which + allows to strip ANSI color codes from the log lines. (@thampiotr) + +- Added a new `stage.sampling` stage to `loki.process` component which + allows to only process a fraction of logs and drop the rest. (@thampiotr) + +- Added a new `stage.eventlogmessage` stage to `loki.process` component which + allows to extract data from Windows Event Log. 
(@thampiotr) + +- Update version of River: + + - River now supports raw strings, which are strings surrounded by backticks + instead of double quotes. Raw strings can span multiple lines, and do not + support any escape sequences. (@erikbaranowski) + + - River now permits using `[]` to access non-existent keys in an object. + When this is done, the access evaluates to `null`, such that `{}["foo"] + == null` is true. (@rfratto) + +- Added support for python profiling to `pyroscope.ebpf` component. (@korniltsev) + +- Windows Flow Installer: Add /CONFIG /DISABLEPROFILING and /DISABLEREPORTING flag (@jkroepke) + +- Add queueing logs remote write client for `loki.write` when WAL is enabled. (@thepalbi) + +- New Grafana Agent Flow components: + + - `otelcol.processor.filter` - filters OTLP telemetry data using OpenTelemetry + Transformation Language (OTTL). (@hainenber) + +### Enhancements + +- The `loki.write` WAL now has snappy compression enabled by default. (@thepalbi) + +- Allow converting labels to structured metadata with Loki's structured_metadata stage. (@gonzalesraul) + +- Improved performance of `pyroscope.scrape` component when working with a large number of targets. (@cyriltovena) + +- Added support for comma-separated list of fields in `source` option and a + new `separator` option in `drop` stage of `loki.process`. (@thampiotr) + +- The `loki.source.docker` component now allows connecting to Docker daemons + over HTTP(S) and setting up TLS credentials. (@tpaschalis) + +- Added an `exclude_event_message` option to `loki.source.windowsevent` in flow mode, + which excludes the human-friendly event message from Windows event logs. (@ptodev) + +- Improve detection of rolled log files in `loki.source.kubernetes` and + `loki.source.podlogs` (@slim-bean). + +- Support clustering in `loki.source.kubernetes` (@slim-bean). + +- Support clustering in `loki.source.podlogs` (@rfratto). + +- Make component list sortable in web UI. 
(@hainenber) + +- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`, + and `mssql_available_commit_memory_bytes`) for `mssql` integration. + +- Grafana Agent Operator: `config-reloader` container no longer runs as root. + (@rootmout) + +- Added support for replaying not sent data for `loki.write` when WAL is enabled. (@thepalbi) + +- Added support for unicode strings in `pyroscope.ebpf` python profiles. (@korniltsev) + +- Improved resilience of graph evaluation in presence of slow components. (@thampiotr) + +### Bugfixes + +- Set exit code 1 on grafana-agentctl non-runnable command. (@fgouteroux) + +- Fixed an issue where `loki.process` validation for stage `metric.counter` was + allowing invalid combination of configuration options. (@thampiotr) + +- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to + permanently fail to load with `id already exists` error. (@mattdurham) + +- Allow the usage of encodings other than UTF8 to be used with environment variable expansion. (@mattdurham) + +- Fixed an issue where native histogram time series were being dropped silently. (@krajorama) + +- Fix validation issue with ServiceMonitors when scrape timeout is greater than interval. (@captncraig) + +- Static mode's spanmetrics processor will now prune histograms when the dimension cache is pruned. + Dimension cache was always pruned but histograms were not being pruned. This caused metric series + created by the spanmetrics processor to grow unbounded. Only static mode has this issue. Flow mode's + `otelcol.connector.spanmetrics` does not have this bug. (@nijave) + +- Prevent logging errors on normal shutdown in `loki.source.journal`. (@wildum) + +- Break on iterate journal failure in `loki.source.journal`. (@wildum) + +- Fix file descriptor leak in `loki.source.journal`. 
(@wildum) + +- Fixed a bug in River where passing a non-string key to an object (such as + `{}[true]`) would incorrectly report that a number type was expected instead. (@rfratto) + +- Include Faro Measurement `type` field in `faro.receiver` Flow component and legacy `app_agent_receiver` integration. (@rlankfo) + +- Mark `password` argument of `loki.source.kafka` as a `secret` rather than a `string`. (@harsiddhdave44) + +- Fixed a bug where UDP syslog messages were never processed (@joshuapare) + +- Updating configuration for `loki.write` no longer drops data. (@thepalbi) + v0.37.4 (2023-11-06) ----------------- diff --git a/pkg/flow/flow_updates_test.go b/pkg/flow/flow_updates_test.go index 90960f6eef8e..c2349928f06a 100644 --- a/pkg/flow/flow_updates_test.go +++ b/pkg/flow/flow_updates_test.go @@ -114,7 +114,7 @@ func TestController_Updates_WithQueueFull(t *testing.T) { ModuleRegistry: newModuleRegistry(), IsModule: false, // The small number of workers and small queue means that a lot of updates will need to be retried. - WorkerPool: worker.NewShardedWorkerPool(1, 1), + WorkerPool: worker.NewFixedWorkerPool(1, 1), }) // Use testUpdatesFile from graph_builder_test.go. @@ -376,6 +376,6 @@ func newTestController(t *testing.T) *Flow { ModuleRegistry: newModuleRegistry(), IsModule: false, // Make sure that we have consistent number of workers for tests to make them deterministic. - WorkerPool: worker.NewShardedWorkerPool(4, 100), + WorkerPool: worker.NewFixedWorkerPool(4, 100), }) } diff --git a/pkg/flow/internal/worker/worker_pool.go b/pkg/flow/internal/worker/worker_pool.go index b5ff904b2024..25a8eeebe536 100644 --- a/pkg/flow/internal/worker/worker_pool.go +++ b/pkg/flow/internal/worker/worker_pool.go @@ -2,8 +2,6 @@ package worker import ( "fmt" - "hash/fnv" - "math/rand" "runtime" "sync" ) @@ -12,102 +10,72 @@ type Pool interface { // Stop stops the worker pool. 
It does not wait to drain any internal queues, but it does wait for the currently // running tasks to complete. It must only be called once. Stop() - // Submit submits a function to be executed by the worker pool on a random worker. Error is returned if the pool - // is unable to accept extra work. - Submit(func()) error - // SubmitWithKey submits a function to be executed by the worker pool, ensuring that only one - // job with given key can be queued at the time. Adding a job with a key that is already queued is a no-op (even if - // the submitted function is different). Error is returned if the pool is unable to accept extra work - the caller - // can decide how to handle this situation. + // SubmitWithKey submits a function to be executed by the worker pool, ensuring that: + // * Only one job with given key can be waiting to be executed at the time. This is desired if we don't want to + // run the same task multiple times, e.g. if it's a component update that we only need to run once. + // * Only one job with given key can be running at the time. This is desired when we don't want to duplicate work, + // and we want to protect the pool from a slow task hogging all the workers. + // + // Note that it is possible to have two tasks with the same key in the pool at the same time: one waiting to be + // executed and another one running. This ensures that a request to re-run a task with the same key is not lost. + // + // Adding a job with a key that is already queued is a no-op (even if the submitted function is different). + // Error is returned if the pool is unable to accept extra work - the caller can decide how to handle this situation. SubmitWithKey(string, func()) error - // QueueSize returns the number of tasks currently queued. + // QueueSize returns the number of tasks currently queued or running. 
QueueSize() int } -// shardedWorkerPool is a Pool that distributes work across a fixed number of workers, using a hash of the key to -// determine which worker to use. This, to an extent, defends the pool from a slow task hogging all the workers. -type shardedWorkerPool struct { +// fixedWorkerPool is a Pool that distributes work across a fixed number of workers. It uses workQueue to ensure +// that SubmitWithKey guarantees are met. +type fixedWorkerPool struct { workersCount int - workQueues []chan func() + workQueue *workQueue quit chan struct{} allStopped sync.WaitGroup - - lock sync.Mutex - set map[string]struct{} } -var _ Pool = (*shardedWorkerPool)(nil) +var _ Pool = (*fixedWorkerPool)(nil) func NewDefaultWorkerPool() Pool { - return NewShardedWorkerPool(runtime.NumCPU(), 1024) + return NewFixedWorkerPool(runtime.NumCPU(), 1024) } -// NewShardedWorkerPool creates a new worker pool with the given number of workers and queue size for each worker. -// The queued tasks are sharded across the workers using a hash of the key. The pool is automatically started and -// ready to accept work. To prevent resource leak, Stop() must be called when the pool is no longer needed. -func NewShardedWorkerPool(workersCount int, queueSize int) Pool { +// NewFixedWorkerPool creates a new Pool with the given number of workers and given max queue size. +// The max queue size is the maximum number of tasks that can be queued OR running at the same time. +// The tasks can run on a random worker, but workQueue ensures only one task with given key is running at a time. +// The pool is automatically started and ready to accept work. To prevent resource leak, Stop() must be called when the +// pool is no longer needed. 
+func NewFixedWorkerPool(workersCount int, maxQueueSize int) Pool { if workersCount <= 0 { panic(fmt.Sprintf("workersCount must be positive, got %d", workersCount)) } - queues := make([]chan func(), workersCount) - for i := 0; i < workersCount; i++ { - queues[i] = make(chan func(), queueSize) - } - pool := &shardedWorkerPool{ + pool := &fixedWorkerPool{ workersCount: workersCount, - workQueues: queues, + workQueue: newWorkQueue(maxQueueSize), quit: make(chan struct{}), - set: make(map[string]struct{}), } pool.start() return pool } -func (w *shardedWorkerPool) Submit(f func()) error { - return w.SubmitWithKey(fmt.Sprintf("%d", rand.Int()), f) -} - -func (w *shardedWorkerPool) SubmitWithKey(key string, f func()) error { - wrapped := func() { - // NOTE: we intentionally remove from the queue before executing the function. This means that while a task is - // executing, another task with the same key can be added to the queue, potentially even by the task itself. - w.lock.Lock() - delete(w.set, key) - w.lock.Unlock() - - f() - } - queue := w.workQueues[w.workerFor(key)] - - w.lock.Lock() - defer w.lock.Unlock() - if _, exists := w.set[key]; exists { - return nil // only queue one job for given key - } - - select { - case queue <- wrapped: - w.set[key] = struct{}{} - return nil - default: - return fmt.Errorf("worker queue is full") - } +func (w *fixedWorkerPool) SubmitWithKey(key string, f func()) error { + _, err := w.workQueue.tryEnqueue(key, f) + return err } -func (w *shardedWorkerPool) QueueSize() int { - w.lock.Lock() - defer w.lock.Unlock() - return len(w.set) +// QueueSize returns the number of tasks in the queue - waiting or currently running. 
+func (w *fixedWorkerPool) QueueSize() int { + return w.workQueue.queueSize() } -func (w *shardedWorkerPool) Stop() { +func (w *fixedWorkerPool) Stop() { close(w.quit) w.allStopped.Wait() } -func (w *shardedWorkerPool) start() { +func (w *fixedWorkerPool) start() { for i := 0; i < w.workersCount; i++ { - queue := w.workQueues[i] w.allStopped.Add(1) go func() { defer w.allStopped.Done() @@ -115,7 +83,7 @@ func (w *shardedWorkerPool) start() { select { case <-w.quit: return - case f := <-queue: + case f := <-w.workQueue.tasksToRun: f() } } @@ -123,8 +91,99 @@ func (w *shardedWorkerPool) start() { } } -func (w *shardedWorkerPool) workerFor(s string) int { - h := fnv.New32a() - _, _ = h.Write([]byte(s)) - return int(h.Sum32()) % w.workersCount +type workQueue struct { + maxSize int + tasksToRun chan func() + + lock sync.Mutex + waitingOrder []string + waiting map[string]func() + running map[string]struct{} +} + +func newWorkQueue(maxSize int) *workQueue { + return &workQueue{ + maxSize: maxSize, + tasksToRun: make(chan func(), maxSize), + waiting: make(map[string]func()), + running: make(map[string]struct{}), + } +} + +func (w *workQueue) tryEnqueue(key string, f func()) (bool, error) { + w.lock.Lock() + defer w.lock.Unlock() + + // Don't enqueue if same task already waiting + if _, exists := w.waiting[key]; exists { + return false, nil + } + + // Don't exceed queue size + queueSize := len(w.waitingOrder) + len(w.running) + if queueSize >= w.maxSize { + return false, fmt.Errorf("worker queue is full") + } + + // Else enqueue + w.waitingOrder = append(w.waitingOrder, key) + w.waiting[key] = f + + // A task may have become runnable now, emit it + w.emitNextTask() + + return true, nil +} + +func (w *workQueue) taskDone(key string) { + w.lock.Lock() + defer w.lock.Unlock() + delete(w.running, key) + // A task may have become runnable now, emit it + w.emitNextTask() +} + +// emitNextTask emits the next eligible task to be run if there is one. 
It must be called whenever the queue state +// changes (e.g. a task is added or a task finishes). The lock must be held when calling this function. +func (w *workQueue) emitNextTask() { + var ( + task func() + key string + index int + found = false + ) + + // Find the first key in waitingOrder that is not yet running + for i, k := range w.waitingOrder { + if _, alreadyRunning := w.running[k]; !alreadyRunning { + found, key, index = true, k, i + break + } + } + + // Return if we didn't find any task ready to run + if !found { + return + } + + // Remove the task from waiting and add it to running set + w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...) + task = w.waiting[key] + delete(w.waiting, key) + w.running[key] = struct{}{} + + // Wrap the actual task to make sure we mark it as done when it finishes + wrapped := func() { + defer w.taskDone(key) + task() + } + + // Emit the task to be run. There will always be space in this buffered channel, because we limit queue size. 
+ w.tasksToRun <- wrapped +} + +func (w *workQueue) queueSize() int { + w.lock.Lock() + defer w.lock.Unlock() + return len(w.waitingOrder) + len(w.running) } diff --git a/pkg/flow/internal/worker/worker_pool_test.go b/pkg/flow/internal/worker/worker_pool_test.go index 27b5c24b7ebc..3a594cba2622 100644 --- a/pkg/flow/internal/worker/worker_pool_test.go +++ b/pkg/flow/internal/worker/worker_pool_test.go @@ -1,6 +1,7 @@ package worker import ( + "fmt" "testing" "time" @@ -13,7 +14,7 @@ func TestWorkerPool(t *testing.T) { t.Run("worker pool", func(t *testing.T) { t.Run("should start and stop cleanly", func(t *testing.T) { defer goleak.VerifyNone(t) - pool := NewShardedWorkerPool(4, 1) + pool := NewFixedWorkerPool(4, 1) require.Equal(t, 0, pool.QueueSize()) defer pool.Stop() }) @@ -22,28 +23,28 @@ func TestWorkerPool(t *testing.T) { defer goleak.VerifyNone(t) require.Panics(t, func() { - NewShardedWorkerPool(0, 0) + NewFixedWorkerPool(0, 0) }) require.Panics(t, func() { - NewShardedWorkerPool(-1, 0) + NewFixedWorkerPool(-1, 0) }) }) t.Run("should reject invalid buffer size", func(t *testing.T) { defer goleak.VerifyNone(t) require.Panics(t, func() { - NewShardedWorkerPool(1, -1) + NewFixedWorkerPool(1, -1) }) }) t.Run("should process a single task", func(t *testing.T) { defer goleak.VerifyNone(t) done := make(chan struct{}) - pool := NewShardedWorkerPool(4, 1) + pool := NewFixedWorkerPool(4, 1) defer pool.Stop() - err := pool.Submit(func() { + err := pool.SubmitWithKey("123", func() { done <- struct{}{} }) require.NoError(t, err) @@ -59,7 +60,7 @@ func TestWorkerPool(t *testing.T) { t.Run("should process a single task with key", func(t *testing.T) { defer goleak.VerifyNone(t) done := make(chan struct{}) - pool := NewShardedWorkerPool(4, 1) + pool := NewFixedWorkerPool(4, 1) defer pool.Stop() err := pool.SubmitWithKey("testKey", func() { @@ -77,7 +78,7 @@ func TestWorkerPool(t *testing.T) { t.Run("should not queue duplicated keys", func(t *testing.T) { defer 
goleak.VerifyNone(t) - pool := NewShardedWorkerPool(4, 10) + pool := NewFixedWorkerPool(4, 10) defer pool.Stop() tasksDone := atomic.Int32{} @@ -90,17 +91,18 @@ func TestWorkerPool(t *testing.T) { tasksDone.Inc() }) require.NoError(t, err) + defer func() { close(blockFirstTask) }() // Wait for the first task to be running already and blocking the worker <-firstTaskRunning - require.Equal(t, 0, pool.QueueSize()) + require.Equal(t, 1, pool.QueueSize()) // Second task will be queued err = pool.SubmitWithKey("k1", func() { tasksDone.Inc() }) require.NoError(t, err) - require.Equal(t, 1, pool.QueueSize()) + require.Equal(t, 2, pool.QueueSize()) // Third task will be skipped, as we already have k1 in the queue err = pool.SubmitWithKey("k1", func() { @@ -115,17 +117,17 @@ func TestWorkerPool(t *testing.T) { blockFirstTask <- struct{}{} require.Eventually(t, func() bool { return tasksDone.Load() == 2 - }, 3*time.Second, 5*time.Millisecond) + }, 3*time.Second, 1*time.Millisecond) require.Equal(t, 0, pool.QueueSize()) // No more tasks should be done, verify again with some delay - time.Sleep(100 * time.Millisecond) + time.Sleep(10 * time.Millisecond) require.Equal(t, int32(2), tasksDone.Load()) }) t.Run("should concurrently process for different keys", func(t *testing.T) { defer goleak.VerifyNone(t) - pool := NewShardedWorkerPool(4, 10) + pool := NewFixedWorkerPool(4, 10) defer pool.Stop() tasksDone := atomic.Int32{} @@ -138,6 +140,7 @@ func TestWorkerPool(t *testing.T) { tasksDone.Inc() }) require.NoError(t, err) + defer func() { close(blockFirstTask) }() // Wait for the first task to be running already and blocking the worker <-firstTaskRunning @@ -164,7 +167,7 @@ func TestWorkerPool(t *testing.T) { t.Run("should reject when queue is full", func(t *testing.T) { defer goleak.VerifyNone(t) // Pool with one worker and queue size of 1 - all work goes to one queue - pool := NewShardedWorkerPool(1, 1) + pool := NewFixedWorkerPool(1, 2) defer pool.Stop() tasksDone := 
atomic.Int32{} @@ -177,21 +180,102 @@ func TestWorkerPool(t *testing.T) { tasksDone.Inc() }) require.NoError(t, err) - defer func() { blockFirstTask <- struct{}{} }() + defer func() { close(blockFirstTask) }() // Wait for the first task to be running already and blocking the worker <-firstTaskRunning - require.Equal(t, 0, pool.QueueSize()) + require.Equal(t, 1, pool.QueueSize()) // Second task will be queued err = pool.SubmitWithKey("k2", func() { tasksDone.Inc() }) require.NoError(t, err) - require.Equal(t, 1, pool.QueueSize()) + require.Equal(t, 2, pool.QueueSize()) // Third task cannot be accepted, because the queue is full err = pool.SubmitWithKey("k3", func() { tasksDone.Inc() }) require.ErrorContains(t, err, "queue is full") + require.Equal(t, 2, pool.QueueSize()) + }) + + t.Run("should not block when one task is stuck", func(t *testing.T) { + defer goleak.VerifyNone(t) + tasksCount := 1000 + + // Queue size is sufficient to queue all tasks + pool := NewFixedWorkerPool(3, tasksCount+1) + defer pool.Stop() + tasksDone := atomic.Int32{} + + // First task will block + blockFirstTask := make(chan struct{}) + firstTaskRunning := make(chan struct{}) + err := pool.SubmitWithKey("k-blocking", func() { + firstTaskRunning <- struct{}{} + <-blockFirstTask + tasksDone.Inc() + }) + require.NoError(t, err) + defer func() { close(blockFirstTask) }() + + // Wait for the first task to be running already and blocking the worker + <-firstTaskRunning + require.Equal(t, 1, pool.QueueSize()) + + // Submit a lot of tasks with random keys - no task should be blocked by the above one. 
+ for i := 0; i < tasksCount; i++ { + err = pool.SubmitWithKey(fmt.Sprintf("t%d", i), func() { tasksDone.Inc() }) + require.NoError(t, err) + } + + // Ensure all non-blocking tasks are done (the blocking task is still stuck) + require.Eventually(t, func() bool { + return tasksDone.Load() == int32(tasksCount) + }, 3*time.Second, 1*time.Millisecond) + }) + + t.Run("should NOT run concurrently tasks with the same key", func(t *testing.T) { + defer goleak.VerifyNone(t) + tasksCount := 1000 + + // Queue size of 10 is enough: with a single key, at most one running and one waiting task occupy the queue + pool := NewFixedWorkerPool(10, 10) + defer pool.Stop() + tasksDone := atomic.Int32{} + + // First task will block + blockFirstTask := make(chan struct{}) + firstTaskRunning := make(chan struct{}) + err := pool.SubmitWithKey("k1", func() { + firstTaskRunning <- struct{}{} + <-blockFirstTask + tasksDone.Inc() + }) + require.NoError(t, err) + defer func() { close(blockFirstTask) }() + + // Wait for the first task to be running already and blocking the worker + <-firstTaskRunning require.Equal(t, 1, pool.QueueSize()) + + // Enqueue one more task with the same key - it should be allowed + err = pool.SubmitWithKey("k1", func() { tasksDone.Inc() }) + require.NoError(t, err) + + // Submit a lot of tasks with same key - all should be a no-op, since this task is already in queue + for i := 0; i < tasksCount; i++ { + err = pool.SubmitWithKey("k1", func() { tasksDone.Inc() }) + require.NoError(t, err) + } + + require.Equal(t, int32(0), tasksDone.Load()) + + // Unblock the first task + blockFirstTask <- struct{}{} + + // The first task and the second one should be the only ones that complete + require.Eventually(t, func() bool { + return tasksDone.Load() == 2 + }, 3*time.Second, 1*time.Millisecond) }) }) } diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index d877bc4ba51c..2cc3eae1ce5b 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -236,7 +236,7 @@ func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { DataPath: t.TempDir(), Reg:
prometheus.NewRegistry(), ModuleRegistry: newModuleRegistry(), - WorkerPool: worker.NewShardedWorkerPool(1, 100), + WorkerPool: worker.NewFixedWorkerPool(1, 100), } }
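
The `SubmitWithKey` guarantees described in `worker_pool.go` above (at most one waiting and at most one running task per key, with the queue size counting both) can be illustrated with a minimal sketch. It is not the implementation from this diff: `keyedQueue`, `newKeyedQueue`, `next`, `done`, and `size` are hypothetical names, and the sketch assumes a single goroutine, so it omits the mutex and the `tasksToRun` channel and leaves it to the caller to pull the next task instead of emitting it automatically.

```go
package main

import (
	"errors"
	"fmt"
)

// keyedQueue is a hypothetical, single-goroutine simplification of the
// workQueue in the diff: no locking and no channel, only the bookkeeping.
type keyedQueue struct {
	maxSize int
	order   []string            // keys waiting, in submission order
	waiting map[string]func()   // at most one waiting task per key
	running map[string]struct{} // at most one running task per key
}

func newKeyedQueue(maxSize int) *keyedQueue {
	return &keyedQueue{
		maxSize: maxSize,
		waiting: make(map[string]func()),
		running: make(map[string]struct{}),
	}
}

// tryEnqueue reports whether the task was accepted. A key that is already
// waiting is a no-op; a full queue is an error.
func (q *keyedQueue) tryEnqueue(key string, f func()) (bool, error) {
	if _, dup := q.waiting[key]; dup {
		return false, nil
	}
	if len(q.order)+len(q.running) >= q.maxSize {
		return false, errors.New("worker queue is full")
	}
	q.order = append(q.order, key)
	q.waiting[key] = f
	return true, nil
}

// next moves the first waiting task whose key is not running into the
// running set and returns it. ok is false when nothing is eligible.
func (q *keyedQueue) next() (key string, task func(), ok bool) {
	for i, k := range q.order {
		if _, busy := q.running[k]; busy {
			continue
		}
		q.order = append(q.order[:i], q.order[i+1:]...)
		task = q.waiting[k]
		delete(q.waiting, k)
		q.running[k] = struct{}{}
		return k, task, true
	}
	return "", nil, false
}

// done marks the running task for key as finished.
func (q *keyedQueue) done(key string) { delete(q.running, key) }

// size counts waiting plus running tasks, like QueueSize in the diff.
func (q *keyedQueue) size() int { return len(q.order) + len(q.running) }

func main() {
	q := newKeyedQueue(2)
	q.tryEnqueue("k1", func() {})
	key, _, _ := q.next()                     // k1 is now running
	again, _ := q.tryEnqueue("k1", func() {}) // accepted: one running, one waiting
	dup, _ := q.tryEnqueue("k1", func() {})   // no-op: k1 is already waiting
	_, err := q.tryEnqueue("k2", func() {})   // rejected: size is already 2
	_, _, eligible := q.next()                // false: waiting k1 must not overlap running k1
	fmt.Println(key, again, dup, err, eligible, q.size())
}
```

In this sketch the waiting `k1` only becomes eligible after `done("k1")` is called, which mirrors the role `taskDone` plays in the diff. Separating `next` from `tryEnqueue` is a choice made here for readability; the diff instead calls `emitNextTask` whenever the queue state changes.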