From 4e04d07168a8c5cb7086ced8486c6d584faa1045 Mon Sep 17 00:00:00 2001 From: Paul Rogers <129207811+paul1r@users.noreply.github.com> Date: Fri, 19 Apr 2024 03:38:48 -0400 Subject: [PATCH] fix: promtail race fixes (#12656) --- .../promtail/client/client_writeto_test.go | 7 ++ clients/pkg/promtail/client/manager_test.go | 24 ++++++ clients/pkg/promtail/promtail_wal_test.go | 26 ++++++- .../promtail/targets/cloudflare/util_test.go | 6 ++ .../pkg/promtail/targets/file/filetarget.go | 77 ++++++++++++++++--- .../promtail/targets/file/filetarget_test.go | 55 +++++++------ .../promtail/targets/kafka/consumer_test.go | 4 +- .../targets/kafka/target_syncer_test.go | 14 +++- .../pkg/promtail/targets/kafka/target_test.go | 3 + .../pkg/promtail/targets/kafka/topics_test.go | 4 + clients/pkg/promtail/utils/entries_test.go | 14 +++- clients/pkg/promtail/wal/watcher_test.go | 30 ++++++++ clients/pkg/promtail/wal/writer_test.go | 11 +++ 13 files changed, 234 insertions(+), 41 deletions(-) diff --git a/clients/pkg/promtail/client/client_writeto_test.go b/clients/pkg/promtail/client/client_writeto_test.go index 3693b677f2cc..4044d1641fb1 100644 --- a/clients/pkg/promtail/client/client_writeto_test.go +++ b/clients/pkg/promtail/client/client_writeto_test.go @@ -29,11 +29,14 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing ch := make(chan api.Entry) defer close(ch) + var mu sync.Mutex var receivedEntries []api.Entry go func() { for e := range ch { + mu.Lock() receivedEntries = append(receivedEntries, e) + mu.Unlock() } }() @@ -72,12 +75,16 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing } require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedEntries) == len(lines) }, time.Second*10, time.Second) + mu.Lock() for _, receivedEntry := range receivedEntries { require.Contains(t, lines, receivedEntry.Line, "entry line was not expected") require.Equal(t, model.LabelValue("test"), receivedEntry.Labels["app"]) } + mu.Unlock() } func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) { diff --git a/clients/pkg/promtail/client/manager_test.go b/clients/pkg/promtail/client/manager_test.go index f11821c82120..2105e6a90e3d 100644 --- a/clients/pkg/promtail/client/manager_test.go +++ b/clients/pkg/promtail/client/manager_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "os" + "sync" "testing" "time" @@ -127,10 +128,13 @@ func TestManager_WALEnabled(t *testing.T) { require.NoError(t, err) require.Equal(t, "wal:test-client", manager.Name()) + var mu sync.Mutex receivedRequests := []utils.RemoteWriteRequest{} go func() { for req := range rwReceivedReqs { + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() } }() @@ -155,17 +159,21 @@ func TestManager_WALEnabled(t *testing.T) { } require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedRequests) == totalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries + mu.Lock() for _, req := range receivedRequests { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") require.Equal(t, `{wal_enabled="true"}`, req.Request.Streams[0].Labels) seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{} } + mu.Unlock() require.Len(t, seenEntries, totalLines) } 
@@ -182,10 +190,13 @@ func TestManager_WALDisabled(t *testing.T) { require.NoError(t, err) require.Equal(t, "multi:test-client", manager.Name()) + var mu sync.Mutex receivedRequests := []utils.RemoteWriteRequest{} go func() { for req := range rwReceivedReqs { + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() } }() @@ -209,17 +220,21 @@ func TestManager_WALDisabled(t *testing.T) { } require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedRequests) == totalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries + mu.Lock() for _, req := range receivedRequests { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") require.Equal(t, `{pizza-flavour="fugazzeta"}`, req.Request.Streams[0].Labels) seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{} } + mu.Unlock() require.Len(t, seenEntries, totalLines) } @@ -250,15 +265,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) { require.NoError(t, err) require.Equal(t, "multi:test-client,test-client-2", manager.Name()) + var mu sync.Mutex receivedRequests := []utils.RemoteWriteRequest{} ctx, cancel := context.WithCancel(context.Background()) go func(ctx context.Context) { for { select { case req := <-rwReceivedReqs: + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() case req := <-rwReceivedReqs2: + mu.Lock() receivedRequests = append(receivedRequests, req) + mu.Unlock() case <-ctx.Done(): return } @@ -289,16 +309,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) { // times 2 due to clients being run expectedTotalLines := totalLines * 2 require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return len(receivedRequests) == expectedTotalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries + mu.Lock() for _, req := range receivedRequests { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") seenEntries[fmt.Sprintf("%s-%s", req.Request.Streams[0].Labels, req.Request.Streams[0].Entries[0].Line)] = struct{}{} } + mu.Unlock() require.Len(t, seenEntries, expectedTotalLines) } diff --git a/clients/pkg/promtail/promtail_wal_test.go b/clients/pkg/promtail/promtail_wal_test.go index dfc7ce727345..b4027ed2d909 100644 --- a/clients/pkg/promtail/promtail_wal_test.go +++ b/clients/pkg/promtail/promtail_wal_test.go @@ -59,19 +59,25 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) { // create receive channel and start a collect routine receivedCh := make(chan utils.RemoteWriteRequest) received := map[string][]push.Entry{} + var mu sync.Mutex + // Create a channel for log messages + logCh := make(chan string, 100) // Buffered channel to avoid blocking + wg.Add(1) go func() { defer wg.Done() for req := range receivedCh { + mu.Lock() // Add some observability to the requests received in the remote write endpoint var counts []string for _, str := range req.Request.Streams { counts = append(counts, fmt.Sprint(len(str.Entries))) } - t.Logf("received request: %s", counts) + logCh <- fmt.Sprintf("received request: %s", counts) for _, stream 
:= range req.Request.Streams { received[stream.Labels] = append(received[stream.Labels], stream.Entries...) } + mu.Unlock() } }() @@ -120,14 +126,23 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) { for i := 0; i < entriesToWrite; i++ { _, err = logsFile.WriteString(fmt.Sprintf("log line # %d\n", i)) if err != nil { - t.Logf("error writing to log file. Err: %s", err.Error()) + logCh <- fmt.Sprintf("error writing to log file. Err: %s", err.Error()) } // not overkill log file time.Sleep(1 * time.Millisecond) } }() + // Goroutine to handle log messages + go func() { + for msg := range logCh { + t.Log(msg) + } + }() + require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() if seen, ok := received[expectedLabelSet]; ok { return len(seen) == entriesToWrite } @@ -158,11 +173,13 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) { receivedCh := make(chan utils.RemoteWriteRequest) // received is a mapping from tenant, string-formatted label set to received entries received := map[string]map[string][]push.Entry{} + var mu sync.Mutex var totalReceived = 0 wg.Add(1) go func() { defer wg.Done() for req := range receivedCh { + mu.Lock() // start received label entries map if first time tenant is seen if _, ok := received[req.TenantID]; !ok { received[req.TenantID] = map[string][]push.Entry{} @@ -173,6 +190,7 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) { // increment total count totalReceived += len(stream.Entries) } + mu.Unlock() } }() @@ -250,15 +268,19 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) { // wait for all entries to be remote written require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() return totalReceived == entriesToWrite }, time.Second*20, time.Second, "timed out waiting for entries to be remote written") // assert over received entries require.Len(t, received, expectedTenantCounts, "not expected tenant count") + mu.Lock() for tenantID := 0; tenantID < expectedTenantCounts; tenantID++ { // we should've received at least entriesToWrite / expectedTenantCounts require.GreaterOrEqual(t, len(received[fmt.Sprint(tenantID)][expectedLabelSet]), entriesToWrite/expectedTenantCounts) } + mu.Unlock() pr.Shutdown() close(receivedCh) diff --git a/clients/pkg/promtail/targets/cloudflare/util_test.go b/clients/pkg/promtail/targets/cloudflare/util_test.go index 18efefee5cb5..a702bb90f5dd 100644 --- a/clients/pkg/promtail/targets/cloudflare/util_test.go +++ b/clients/pkg/promtail/targets/cloudflare/util_test.go @@ -3,6 +3,7 @@ package cloudflare import ( "context" "errors" + "sync" "time" "github.com/grafana/cloudflare-go" @@ -13,10 +14,13 @@ var ErrorLogpullReceived = errors.New("error logpull received") type fakeCloudflareClient struct { mock.Mock + mu sync.Mutex } func (f *fakeCloudflareClient) CallCount() int { var actualCalls int + f.mu.Lock() + defer f.mu.Unlock() for _, call := range f.Calls { if call.Method == "LogpullReceived" { actualCalls++ @@ -59,7 +63,9 @@ func newFakeCloudflareClient() *fakeCloudflareClient { } func (f *fakeCloudflareClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) { + f.mu.Lock() r := f.Called(ctx, start, end) + f.mu.Unlock() if r.Get(0) != nil { it := r.Get(0).(cloudflare.LogpullReceivedIterator) if it.Err() == ErrorLogpullReceived { diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 97dc10f14829..0ade51902b49 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ 
b/clients/pkg/promtail/targets/file/filetarget.go @@ -4,6 +4,7 @@ import ( "flag" "os" "path/filepath" + "sync" "time" "github.com/bmatcuk/doublestar" @@ -92,12 +93,14 @@ type FileTarget struct { fileEventWatcher chan fsnotify.Event targetEventHandler chan fileTargetEvent watches map[string]struct{} + watchesMutex sync.Mutex path string pathExclude string quit chan struct{} done chan struct{} - readers map[string]Reader + readers map[string]Reader + readersMutex sync.Mutex targetConfig *Config watchConfig WatchConfig @@ -150,7 +153,7 @@ func NewFileTarget( // Ready if at least one file is being tailed func (t *FileTarget) Ready() bool { - return len(t.readers) > 0 + return t.getReadersLen() > 0 } // Stop the target. @@ -178,17 +181,21 @@ func (t *FileTarget) Labels() model.LabelSet { // Details implements a Target func (t *FileTarget) Details() interface{} { files := map[string]int64{} + t.readersMutex.Lock() for fileName := range t.readers { files[fileName], _ = t.positions.Get(fileName) } + t.readersMutex.Unlock() return files } func (t *FileTarget) run() { defer func() { + t.readersMutex.Lock() for _, v := range t.readers { v.Stop() } + t.readersMutex.Unlock() level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path) close(t.done) }() @@ -281,15 +288,22 @@ func (t *FileTarget) sync() error { } // Add any directories which are not already being watched. + t.watchesMutex.Lock() toStartWatching := missing(t.watches, dirs) + t.watchesMutex.Unlock() t.startWatching(toStartWatching) // Remove any directories which no longer need watching. + t.watchesMutex.Lock() toStopWatching := missing(dirs, t.watches) + t.watchesMutex.Unlock() + t.stopWatching(toStopWatching) // fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves. 
+ t.watchesMutex.Lock() t.watches = dirs + t.watchesMutex.Unlock() // Check if any running tailers have stopped because of errors and remove them from the running list // (They will be restarted in startTailing) @@ -299,7 +313,9 @@ func (t *FileTarget) sync() error { t.startTailing(matches) // Stop tailing any files which no longer exist + t.readersMutex.Lock() toStopTailing := toStopTailing(matches, t.readers) + t.readersMutex.Unlock() t.stopTailingAndRemovePosition(toStopTailing) return nil @@ -307,9 +323,10 @@ func (t *FileTarget) sync() error { func (t *FileTarget) startWatching(dirs map[string]struct{}) { for dir := range dirs { - if _, ok := t.watches[dir]; ok { + if _, ok := t.getWatch(dir); ok { continue } + level.Info(t.logger).Log("msg", "watching new directory", "directory", dir) t.targetEventHandler <- fileTargetEvent{ path: dir, @@ -320,9 +337,10 @@ func (t *FileTarget) startWatching(dirs map[string]struct{}) { func (t *FileTarget) stopWatching(dirs map[string]struct{}) { for dir := range dirs { - if _, ok := t.watches[dir]; !ok { + if _, ok := t.getWatch(dir); !ok { continue } + level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir) t.targetEventHandler <- fileTargetEvent{ path: dir, @@ -333,7 +351,7 @@ func (t *FileTarget) stopWatching(dirs map[string]struct{}) { func (t *FileTarget) startTailing(ps []string) { for _, p := range ps { - if _, ok := t.readers[p]; ok { + if _, ok := t.getReader(p); ok { continue } @@ -387,7 +405,7 @@ func (t *FileTarget) startTailing(ps []string) { } reader = tailer } - t.readers[p] = reader + t.setReader(p, reader) } } @@ -395,10 +413,10 @@ func (t *FileTarget) startTailing(ps []string) { // Call this when a file no longer exists and you want to remove all traces of it. func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { for _, p := range ps { - if reader, ok := t.readers[p]; ok { + if reader, ok := t.getReader(p); ok { reader.Stop() t.positions.Remove(reader.Path()) - delete(t.readers, p) + t.removeReader(p) } } } @@ -406,6 +424,7 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { // pruneStoppedTailers removes any tailers which have stopped running from // the list of active tailers. This allows them to be restarted if there were errors. 
func (t *FileTarget) pruneStoppedTailers() { + t.readersMutex.Lock() toRemove := make([]string, 0, len(t.readers)) for k, t := range t.readers { if !t.IsRunning() { @@ -415,6 +434,45 @@ func (t *FileTarget) pruneStoppedTailers() { for _, tr := range toRemove { delete(t.readers, tr) } + t.readersMutex.Unlock() +} + +func (t *FileTarget) getReadersLen() int { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + return len(t.readers) +} + +func (t *FileTarget) getReader(val string) (Reader, bool) { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + reader, ok := t.readers[val] + return reader, ok +} + +func (t *FileTarget) setReader(val string, reader Reader) { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + t.readers[val] = reader +} + +func (t *FileTarget) getWatch(val string) (struct{}, bool) { + t.watchesMutex.Lock() + defer t.watchesMutex.Unlock() + fileTarget, ok := t.watches[val] + return fileTarget, ok +} + +func (t *FileTarget) removeReader(val string) { + t.readersMutex.Lock() + defer t.readersMutex.Unlock() + delete(t.readers, val) +} + +func (t *FileTarget) getWatchesLen() int { + t.watchesMutex.Lock() + defer t.watchesMutex.Unlock() + return len(t.watches) } func toStopTailing(nt []string, et map[string]Reader) []string { @@ -442,7 +500,7 @@ func toStopTailing(nt []string, et map[string]Reader) []string { func (t *FileTarget) reportSize(ms []string) { for _, m := range ms { // Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync - if reader, ok := t.readers[m]; ok { + if reader, ok := t.getReader(m); ok { err := reader.MarkPositionAndSize() if err != nil { level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err) @@ -459,7 +517,6 @@ func (t *FileTarget) reportSize(ms []string) { } t.metrics.totalBytes.WithLabelValues(m).Set(float64(fi.Size())) } - } } diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 57bc31b0802e..579ea19e2e56 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -76,10 +76,10 @@ func TestFileTargetSync(t *testing.T) { assert.NoError(t, err) // Start with nothing watched. 
- if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } @@ -90,10 +90,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } @@ -106,10 +106,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 1, len(target.readers), + assert.Equal(t, 1, target.getReadersLen(), "Expected tails to be 1 at this point in the test...", ) @@ -124,10 +124,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 2, len(target.readers), + assert.Equal(t, 2, target.getReadersLen(), "Expected tails to be 2 at this point in the test...", ) @@ -138,10 +138,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 1, len(target.readers), + assert.Equal(t, 1, target.getReadersLen(), "Expected tails to be 1 at this point in the test...", ) @@ -152,10 +152,10 @@ func TestFileTargetSync(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 0, len(target.watches), + assert.Equal(t, 0, target.getWatchesLen(), "Expected watches to be 0 at this point in the test...", ) - assert.Equal(t, 0, len(target.readers), + assert.Equal(t, 0, target.getReadersLen(), "Expected tails to be 0 at this point in the test...", ) requireEventually(t, func() bool { @@ -198,7 +198,7 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { assert.NoError(t, err) requireEventually(t, func() bool { - return len(target.readers) == 1 + return target.getReadersLen() == 1 }, "expected 1 tailer to be created") require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` @@ -208,12 +208,19 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) { `), "promtail_files_active_total")) // Inject an error to tailer - initailTailer := target.readers[logFile].(*tailer) + + initialReader, _ := target.getReader(logFile) + initailTailer := initialReader.(*tailer) _ = initailTailer.tail.Tomb.Killf("test: network file systems can be unreliable") // Tailer will be replaced by a new one requireEventually(t, func() bool { - return len(target.readers) == 1 && target.readers[logFile].(*tailer) != initailTailer + currentReader, _ := target.getReader(logFile) + var currentTailer *tailer + if currentReader != nil { + currentTailer = currentReader.(*tailer) + } + return target.getReadersLen() == 1 && currentTailer != initailTailer }, "expected dead tailer to be replaced by a new one") // The old tailer should be stopped: @@ -389,10 +396,10 @@ func TestFileTargetPathExclusion(t *testing.T) { assert.NoError(t, err) // Start with nothing watched. 
- if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } @@ -407,10 +414,10 @@ func TestFileTargetPathExclusion(t *testing.T) { err = target.sync() assert.NoError(t, err) - if len(target.watches) != 0 { + if target.getWatchesLen() != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } - if len(target.readers) != 0 { + if target.getReadersLen() != 0 { t.Fatal("Expected tails to be 0 at this point in the test...") } @@ -425,10 +432,10 @@ func TestFileTargetPathExclusion(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 2, len(target.watches), + assert.Equal(t, 2, target.getWatchesLen(), "Expected watches to be 2 at this point in the test...", ) - assert.Equal(t, 3, len(target.readers), + assert.Equal(t, 3, target.getReadersLen(), "Expected tails to be 3 at this point in the test...", ) requireEventually(t, func() bool { @@ -446,10 +453,10 @@ func TestFileTargetPathExclusion(t *testing.T) { err = target.sync() assert.NoError(t, err) - assert.Equal(t, 1, len(target.watches), + assert.Equal(t, 1, target.getWatchesLen(), "Expected watches to be 1 at this point in the test...", ) - assert.Equal(t, 1, len(target.readers), + assert.Equal(t, 1, target.getReadersLen(), "Expected tails to be 1 at this point in the test...", ) requireEventually(t, func() bool { @@ -538,7 +545,7 @@ func TestHandleFileCreationEvent(t *testing.T) { Op: fsnotify.Create, } requireEventually(t, func() bool { - return len(target.readers) == 1 + return target.getReadersLen() == 1 }, "Expected tails to be 1 at this point in the test...") } diff --git a/clients/pkg/promtail/targets/kafka/consumer_test.go b/clients/pkg/promtail/targets/kafka/consumer_test.go index 7420bdf6c1f1..a4d87e7c3c71 100644 --- a/clients/pkg/promtail/targets/kafka/consumer_test.go +++ b/clients/pkg/promtail/targets/kafka/consumer_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "errors" + "sync" "testing" "time" @@ -34,7 +35,7 @@ func (f *fakeTarget) Details() interface{} { return nil } func Test_ComsumerConsume(t *testing.T) { var ( - group = &testConsumerGroupHandler{} + group = &testConsumerGroupHandler{mu: &sync.Mutex{}} session = &testSession{} ctx, cancel = context.WithCancel(context.Background()) c = &consumer{ @@ -86,6 +87,7 @@ func Test_ComsumerConsume(t *testing.T) { func Test_ComsumerRetry(_ *testing.T) { var ( group = &testConsumerGroupHandler{ + mu: &sync.Mutex{}, returnErr: errors.New("foo"), } ctx, cancel = context.WithCancel(context.Background()) diff --git a/clients/pkg/promtail/targets/kafka/target_syncer_test.go b/clients/pkg/promtail/targets/kafka/target_syncer_test.go index 1f0255cedf62..6514afeefcb0 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer_test.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sync" "testing" "time" @@ -24,7 +25,7 @@ import ( func Test_TopicDiscovery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - group := &testConsumerGroupHandler{} + group := &testConsumerGroupHandler{mu: &sync.Mutex{}} TopicPollInterval = time.Microsecond var closed bool client := &mockKafkaClient{ @@ -52,21 +53,28 @@ func Test_TopicDiscovery(t *testing.T) { } ts.loop() + tmpTopics := []string{} require.Eventually(t, func() bool { if !group.consuming.Load() { return false } + 
group.mu.Lock() + defer group.mu.Unlock() + tmpTopics = group.topics return reflect.DeepEqual([]string{"topic1"}, group.topics) - }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1"}, group.topics) + }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1"}, tmpTopics) + client.mu.Lock() client.topics = []string{"topic1", "topic2"} // introduce new topics + client.mu.Unlock() require.Eventually(t, func() bool { if !group.consuming.Load() { return false } + tmpTopics = group.topics return reflect.DeepEqual([]string{"topic1", "topic2"}, group.topics) - }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1", "topic2"}, group.topics) + }, 200*time.Millisecond, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1", "topic2"}, tmpTopics) require.NoError(t, ts.Stop()) require.True(t, closed) diff --git a/clients/pkg/promtail/targets/kafka/target_test.go b/clients/pkg/promtail/targets/kafka/target_test.go index 0f8061027de3..3ffe4ac69f16 100644 --- a/clients/pkg/promtail/targets/kafka/target_test.go +++ b/clients/pkg/promtail/targets/kafka/target_test.go @@ -21,6 +21,7 @@ type testConsumerGroupHandler struct { handler sarama.ConsumerGroupHandler ctx context.Context topics []string + mu *sync.Mutex returnErr error @@ -32,7 +33,9 @@ func (c *testConsumerGroupHandler) Consume(ctx context.Context, topics []string, return c.returnErr } c.ctx = ctx + c.mu.Lock() c.topics = topics + c.mu.Unlock() c.handler = handler c.consuming.Store(true) <-ctx.Done() diff --git a/clients/pkg/promtail/targets/kafka/topics_test.go b/clients/pkg/promtail/targets/kafka/topics_test.go index e24d8fd1eb60..447a8a0a65af 100644 --- a/clients/pkg/promtail/targets/kafka/topics_test.go +++ b/clients/pkg/promtail/targets/kafka/topics_test.go @@ -3,12 +3,14 @@ package kafka import ( "errors" "strings" + "sync" "testing" "github.com/stretchr/testify/require" ) type mockKafkaClient struct { + mu sync.Mutex topics []string err error } @@ -18,6 +20,8 @@ func (m *mockKafkaClient) RefreshMetadata(_ ...string) error { } func (m *mockKafkaClient) Topics() ([]string, error) { + m.mu.Lock() + defer m.mu.Unlock() return m.topics, m.err } diff --git a/clients/pkg/promtail/utils/entries_test.go b/clients/pkg/promtail/utils/entries_test.go index c9b098d9ee4a..0164794a89d2 100644 --- a/clients/pkg/promtail/utils/entries_test.go +++ b/clients/pkg/promtail/utils/entries_test.go @@ -43,7 +43,14 @@ func TestFanoutEntryHandler_SuccessfulFanout(t *testing.T) { } require.Eventually(t, func() bool { - return len(eh1.Received) == len(expectedLines) && len(eh2.Received) == len(expectedLines) + eh1.mu.Lock() + len1 := len(eh1.Received) + eh1.mu.Unlock() + eh2.mu.Lock() + len2 := len(eh2.Received) + eh2.mu.Unlock() + + return len1 == len(expectedLines) && len2 == len(expectedLines) }, time.Second*10, time.Second, "expected entries to be received by fanned out channels") } @@ -77,6 +84,8 @@ func TestFanoutEntryHandler_TimeoutWaitingForEntriesToBeSent(t *testing.T) { }() require.Eventually(t, func() bool { + controlEH.mu.Lock() + defer controlEH.mu.Unlock() return len(controlEH.Received) == 1 }, time.Second*5, time.Second, "expected control entry handler to receive an entry") @@ -89,6 +98,7 @@ type savingEntryHandler struct { entries chan api.Entry Received []api.Entry wg sync.WaitGroup + mu sync.Mutex } func newSavingEntryHandler() *savingEntryHandler { @@ -99,7 +109,9 @@ func newSavingEntryHandler() *savingEntryHandler { 
eh.wg.Add(1) go func() { for e := range eh.entries { + eh.mu.Lock() eh.Received = append(eh.Received, e) + eh.mu.Unlock() } eh.wg.Done() }() diff --git a/clients/pkg/promtail/wal/watcher_test.go b/clients/pkg/promtail/wal/watcher_test.go index b41880f5d20f..adf6dbef32de 100644 --- a/clients/pkg/promtail/wal/watcher_test.go +++ b/clients/pkg/promtail/wal/watcher_test.go @@ -3,6 +3,7 @@ package wal import ( "fmt" "os" + "sync" "testing" "time" @@ -25,6 +26,7 @@ type testWriteTo struct { series map[uint64]model.LabelSet logger log.Logger ReceivedSeriesReset []int + mu sync.Mutex } func (t *testWriteTo) StoreSeries(series []record.RefSeries, _ int) { @@ -42,10 +44,12 @@ func (t *testWriteTo) AppendEntries(entries wal.RefEntries) error { var entry api.Entry if l, ok := t.series[uint64(entries.Ref)]; ok { entry.Labels = l + t.mu.Lock() for _, e := range entries.Entries { entry.Entry = e t.ReadEntries = append(t.ReadEntries, entry) } + t.mu.Unlock() } else { level.Debug(t.logger).Log("series for entry not found") } @@ -94,11 +98,15 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up with written entries") + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries { require.Contains(t, lines, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "read entries from WAL, just using backup timer to trigger reads": func(t *testing.T, res *watcherTestResources) { @@ -127,11 +135,15 @@ var cases = map[string]watcherTest{ // do not notify, let the backup timer trigger the watcher reads require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up with written entries") + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries { require.Contains(t, lines, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "continue reading entries in next segment after initial segment is closed": func(t *testing.T, res *watcherTestResources) { @@ -164,11 +176,15 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up with written entries") + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries { require.Contains(t, lines, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() err := res.nextWALSegment() require.NoError(t, err, "expected no error when moving to next wal segment") @@ -186,12 +202,16 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 6 }, time.Second*10, time.Second, "expected watcher to catch up after new wal segment is cut") // assert over second half of entries + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries[3:] { require.Contains(t, linesAfter, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "start reading from last segment": func(t *testing.T, res *watcherTestResources) { @@ -234,12 +254,16 @@ var cases = map[string]watcherTest{ res.notifyWrite() require.Eventually(t, func() bool { + 
res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == 3 }, time.Second*10, time.Second, "expected watcher to catch up after new wal segment is cut") // assert over second half of entries + res.writeTo.mu.Lock() for _, readEntry := range res.writeTo.ReadEntries[3:] { require.Contains(t, linesAfter, readEntry.Line, "not expected log line") } + res.writeTo.mu.Unlock() }, "watcher receives segments reclaimed notifications correctly": func(t *testing.T, res *watcherTestResources) { @@ -259,6 +283,8 @@ var cases = map[string]watcherTest{ require.NoError(t, res.syncWAL()) res.notifyWrite() require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReadEntries) == expectedReadEntries }, time.Second*10, time.Second, "expected watcher to catch up with written entries") } @@ -275,6 +301,8 @@ var cases = map[string]watcherTest{ // collecting segment 0 res.notifySegmentReclaimed(0) require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() return len(res.writeTo.ReceivedSeriesReset) == 1 && res.writeTo.ReceivedSeriesReset[0] == 0 }, time.Second*10, time.Second, "timed out waiting to receive series reset") @@ -290,6 +318,8 @@ var cases = map[string]watcherTest{ res.notifySegmentReclaimed(2) // Expect second SeriesReset call to have the highest numbered deleted segment, 2 require.Eventually(t, func() bool { + res.writeTo.mu.Lock() + defer res.writeTo.mu.Unlock() t.Logf("received series reset: %v", res.writeTo.ReceivedSeriesReset) return len(res.writeTo.ReceivedSeriesReset) == 2 && res.writeTo.ReceivedSeriesReset[1] == 2 }, time.Second*10, time.Second, "timed out waiting to receive series reset") diff --git a/clients/pkg/promtail/wal/writer_test.go b/clients/pkg/promtail/wal/writer_test.go index a9c637f98b1c..4dae54604493 100644 --- a/clients/pkg/promtail/wal/writer_test.go +++ b/clients/pkg/promtail/wal/writer_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "time" @@ -77,6 +78,8 @@ func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) { maxSegmentAge := time.Second * 2 + var mu1 sync.Mutex + var mu2 sync.Mutex subscriber1 := []int{} subscriber2 := []int{} @@ -92,10 +95,14 @@ func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) { // add writer events subscriber. Add multiple to test fanout writer.SubscribeCleanup(notifySegmentsCleanedFunc(func(num int) { + mu1.Lock() subscriber1 = append(subscriber1, num) + mu1.Unlock() })) writer.SubscribeCleanup(notifySegmentsCleanedFunc(func(num int) { + mu2.Lock() subscriber2 = append(subscriber2, num) + mu2.Unlock() })) // write entries to wal and sync @@ -148,11 +155,15 @@ func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) { require.ErrorIs(t, err, os.ErrNotExist, "expected file not exists error") // assert all subscribers were notified + mu1.Lock() require.Len(t, subscriber1, 1, "expected one segment reclaimed notification in subscriber1") require.Equal(t, 0, subscriber1[0]) + mu1.Unlock() + mu2.Lock() require.Len(t, subscriber2, 1, "expected one segment reclaimed notification in subscriber2") require.Equal(t, 0, subscriber2[0]) + mu2.Unlock() // Expect last, or "head" segment to still be alive _, err = os.Stat(filepath.Join(dir, "00000001"))
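
Editor's note on the recurring pattern: the test-side changes in this patch all apply one fix. Any slice or map that a collector goroutine appends to while the test body polls it (via require.Eventually) or asserts over it is now guarded by a sync.Mutex, held both where the goroutine writes and where the test reads. The following is a minimal, self-contained sketch of that pattern, not code from the patch; the names (received, ch, lines) are illustrative only.

    package example

    import (
    	"sync"
    	"testing"
    	"time"

    	"github.com/stretchr/testify/require"
    )

    func TestCollectorIsRaceFree(t *testing.T) {
    	ch := make(chan string)
    	defer close(ch)

    	// Shared state written by the collector goroutine and read by the
    	// test goroutine must be protected on both sides; unguarded access
    	// here is what the race detector flagged in the original tests.
    	var mu sync.Mutex
    	var received []string

    	go func() {
    		for line := range ch {
    			mu.Lock()
    			received = append(received, line)
    			mu.Unlock()
    		}
    	}()

    	lines := []string{"line 1", "line 2", "line 3"}
    	for _, l := range lines {
    		ch <- l
    	}

    	// The polling condition also reads the shared slice, so it takes the
    	// same lock before checking the length.
    	require.Eventually(t, func() bool {
    		mu.Lock()
    		defer mu.Unlock()
    		return len(received) == len(lines)
    	}, 5*time.Second, 10*time.Millisecond, "timed out waiting for lines")

    	// Final assertions over the collected entries are likewise made under the lock.
    	mu.Lock()
    	defer mu.Unlock()
    	for _, l := range received {
    		require.Contains(t, lines, l)
    	}
    }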
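The one non-test change, filetarget.go, uses a different shape of the same idea: the readers and watches maps get their own mutexes, and every access goes through a small helper (getReader, setReader, getReadersLen, getWatch, getWatchesLen) that holds the lock only for the map operation, so sync(), the run() goroutine, Details(), and the tests never touch the maps concurrently. A compact sketch of that accessor style is below; the types are simplified stand-ins under assumption, not promtail's real Reader interface.

    package example

    import "sync"

    // reader is a simplified stand-in for the tailer interface guarded in filetarget.go.
    type reader interface{ Stop() }

    type fileTarget struct {
    	readersMutex sync.Mutex
    	readers      map[string]reader
    }

    // getReader returns the tailer for a path, if any, without exposing the map.
    func (t *fileTarget) getReader(path string) (reader, bool) {
    	t.readersMutex.Lock()
    	defer t.readersMutex.Unlock()
    	r, ok := t.readers[path]
    	return r, ok
    }

    // setReader records a new tailer under the lock.
    func (t *fileTarget) setReader(path string, r reader) {
    	t.readersMutex.Lock()
    	defer t.readersMutex.Unlock()
    	t.readers[path] = r
    }

    // readersLen lets callers (for example, Ready() or tests) check the count
    // without reading the map directly.
    func (t *fileTarget) readersLen() int {
    	t.readersMutex.Lock()
    	defer t.readersMutex.Unlock()
    	return len(t.readers)
    }

Keeping each lock scope to a single map operation avoids holding the mutex across slow work such as stopping tailers or sending on targetEventHandler, which is why the patch copies data out (for example, toStartWatching, toStopTailing) and then releases the lock before acting on it.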