diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 80cceba78fca..441c688612d9 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "hash/fnv" "sync" + "sync/atomic" "time" "github.com/go-kit/log/level" @@ -46,6 +47,7 @@ type tailer struct { // and the loop and senders should stop closeChan chan struct{} closeOnce sync.Once + closed atomic.Bool blockedAt *time.Time blockedMtx sync.RWMutex @@ -74,6 +76,7 @@ func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDr maxDroppedStreams: maxDroppedStreams, id: generateUniqueID(orgID, expr.String()), closeChan: make(chan struct{}), + closed: atomic.Bool{}, pipeline: pipeline, }, nil } @@ -227,17 +230,13 @@ func isMatching(lbs labels.Labels, matchers []*labels.Matcher) bool { } func (t *tailer) isClosed() bool { - select { - case <-t.closeChan: - return true - default: - return false - } + return t.closed.Load() } func (t *tailer) close() { t.closeOnce.Do(func() { - // Signal the close channel + // Signal the close channel & flip the atomic bool so tailers will exit + t.closed.Store(true) close(t.closeChan) // We intentionally do not close sendChan in order to avoid a panic on diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 1f49ec009508..f52e87040ce3 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -17,6 +17,7 @@ import ( ) func TestTailer_RoundTrip(t *testing.T) { + t.Parallel() server := &fakeTailServer{} lbs := makeRandomLabels() @@ -66,6 +67,7 @@ func TestTailer_RoundTrip(t *testing.T) { } func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { + t.Parallel() runs := 100 stream := logproto.Stream{ @@ -103,6 +105,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { } func Test_dropstream(t *testing.T) { + t.Parallel() maxDroppedStreams := 10 entry := logproto.Entry{Timestamp: time.Now(), Line: "foo"} @@ -224,6 +227,7 @@ func Test_TailerSendRace(t *testing.T) { } func Test_IsMatching(t *testing.T) { + t.Parallel() for _, tt := range []struct { name string lbs labels.Labels @@ -241,6 +245,7 @@ func Test_IsMatching(t *testing.T) { } func Test_StructuredMetadata(t *testing.T) { + t.Parallel() lbs := makeRandomLabels() for _, tc := range []struct { @@ -364,3 +369,21 @@ func Test_StructuredMetadata(t *testing.T) { }) } } + +func Benchmark_isClosed(t *testing.B) { + var server fakeTailServer + expr, err := syntax.ParseLogSelector(`{app="foo"}`, true) + require.NoError(t, err) + tail, err := newTailer("foo", expr, &server, 0) + require.NoError(t, err) + + require.Equal(t, false, tail.isClosed()) + + t.ResetTimer() + for i := 0; i < t.N; i++ { + tail.isClosed() + } + + tail.close() + require.Equal(t, true, tail.isClosed()) +}