From e2cbde884f65dc2193bbac7dc21af0639c6604b1 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 17 Jul 2024 09:00:52 +0100 Subject: [PATCH] chore: Clean up manager_test.go (#13549) --- pkg/storage/wal/manager_test.go | 209 ++++++++++++++++---------------- 1 file changed, 105 insertions(+), 104 deletions(-) diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go index ef14b2420108..9dc49dce4759 100644 --- a/pkg/storage/wal/manager_test.go +++ b/pkg/storage/wal/manager_test.go @@ -87,19 +87,15 @@ func TestManager_AppendFailed(t *testing.T) { require.EqualError(t, res.Err(), "failed to flush") } -func TestManager_AppendMaxAge(t *testing.T) { +func TestManager_AppendFailedWALClosed(t *testing.T) { m, err := NewManager(Config{ - MaxAge: 100 * time.Millisecond, - MaxSegments: 1, - MaxSegmentSize: 8 * 1024 * 1024, // 8MB + MaxAge: 30 * time.Second, + MaxSegments: 10, + MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // Create a mock clock. - clock := quartz.NewMock(t) - m.clock = clock - - // Append 1B of data. + // Append some data. lbs := labels.Labels{{Name: "a", Value: "b"}} entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} res, err := m.Append(AppendRequest{ @@ -111,40 +107,70 @@ func TestManager_AppendMaxAge(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) - // The segment that was just appended to has neither reached the maximum - // age nor maximum size to be flushed. - require.Equal(t, 1, m.available.Len()) - require.Equal(t, 0, m.pending.Len()) + // Close the WAL. + m.Close() - // Wait 100ms and append some more data. - clock.Advance(100 * time.Millisecond) - entries = []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} + // Should not be able to append more data as the WAL is closed. res, err = m.Append(AppendRequest{ TenantID: "1", Labels: lbs, LabelsStr: lbs.String(), Entries: entries, }) + require.Nil(t, res) + require.ErrorIs(t, err, ErrClosed) +} + +func TestManager_AppendFailedWALFull(t *testing.T) { + m, err := NewManager(Config{ + MaxAge: 30 * time.Second, + MaxSegments: 10, + MaxSegmentSize: 1024, // 1KB + }, NewMetrics(nil)) require.NoError(t, err) - require.NotNil(t, res) - // The segment has reached the maximum age and should have been moved to - // pending list to be flushed. - require.Equal(t, 0, m.available.Len()) - require.Equal(t, 1, m.pending.Len()) + // Should be able to write 100KB of data, 10KB per segment. + lbs := labels.Labels{{Name: "a", Value: "b"}} + for i := 0; i < 10; i++ { + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} + res, err := m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + } + + // However, appending more data should fail as all segments are full and + // waiting to be flushed. + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} + res, err := m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.ErrorIs(t, err, ErrFull) + require.Nil(t, res) } -func TestManager_AppendMaxSize(t *testing.T) { +func TestManager_AppendMaxAgeExceeded(t *testing.T) { m, err := NewManager(Config{ - MaxAge: 30 * time.Second, + MaxAge: 100 * time.Millisecond, MaxSegments: 1, - MaxSegmentSize: 1024, // 1KB + MaxSegmentSize: 8 * 1024 * 1024, // 8MB }, NewMetrics(nil)) require.NoError(t, err) - // Append 512B of data. + // Create a mock clock. + clock := quartz.NewMock(t) + m.clock = clock + + // Append 1B of data. lbs := labels.Labels{{Name: "a", Value: "b"}} - entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -159,8 +185,9 @@ func TestManager_AppendMaxSize(t *testing.T) { require.Equal(t, 1, m.available.Len()) require.Equal(t, 0, m.pending.Len()) - // Append another 512B of data. - entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} + // Wait 100ms and append some more data. + clock.Advance(100 * time.Millisecond) + entries = []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} res, err = m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -170,23 +197,23 @@ func TestManager_AppendMaxSize(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) - // The segment has reached the maximum size and should have been moved to + // The segment has reached the maximum age and should have been moved to // pending list to be flushed. require.Equal(t, 0, m.available.Len()) require.Equal(t, 1, m.pending.Len()) } -func TestManager_AppendWALClosed(t *testing.T) { +func TestManager_AppendMaxSizeExceeded(t *testing.T) { m, err := NewManager(Config{ MaxAge: 30 * time.Second, - MaxSegments: 10, + MaxSegments: 1, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // Append some data. + // Append 512B of data. lbs := labels.Labels{{Name: "a", Value: "b"}} - entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -196,53 +223,26 @@ func TestManager_AppendWALClosed(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) - // Close the WAL. - m.Close() + // The segment that was just appended to has neither reached the maximum + // age nor maximum size to be flushed. + require.Equal(t, 1, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) - // Should not be able to append more data as the WAL is closed. + // Append another 512B of data. + entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} res, err = m.Append(AppendRequest{ TenantID: "1", Labels: lbs, LabelsStr: lbs.String(), Entries: entries, }) - require.Nil(t, res) - require.ErrorIs(t, err, ErrClosed) -} - -func TestManager_AppendWALFull(t *testing.T) { - m, err := NewManager(Config{ - MaxAge: 30 * time.Second, - MaxSegments: 10, - MaxSegmentSize: 1024, // 1KB - }, NewMetrics(nil)) require.NoError(t, err) + require.NotNil(t, res) - // Should be able to write 100KB of data, 10KB per segment. - lbs := labels.Labels{{Name: "a", Value: "b"}} - for i := 0; i < 10; i++ { - entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} - res, err := m.Append(AppendRequest{ - TenantID: "1", - Labels: lbs, - LabelsStr: lbs.String(), - Entries: entries, - }) - require.NoError(t, err) - require.NotNil(t, res) - } - - // However, appending more data should fail as all segments are full and - // waiting to be flushed. - entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} - res, err := m.Append(AppendRequest{ - TenantID: "1", - Labels: lbs, - LabelsStr: lbs.String(), - Entries: entries, - }) - require.ErrorIs(t, err, ErrFull) - require.Nil(t, res) + // The segment has reached the maximum size and should have been moved to + // pending list to be flushed. + require.Equal(t, 0, m.available.Len()) + require.Equal(t, 1, m.pending.Len()) } func TestManager_NextPending(t *testing.T) { @@ -281,15 +281,19 @@ func TestManager_NextPending(t *testing.T) { require.Nil(t, it) } -func TestManager_NextPendingClosed(t *testing.T) { +func TestManager_NextPendingMaxAgeExceeded(t *testing.T) { m, err := NewManager(Config{ - MaxAge: 30 * time.Second, - MaxSegments: 10, + MaxAge: 100 * time.Millisecond, + MaxSegments: 1, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // Append some data. + // Create a mock clock. + clock := quartz.NewMock(t) + m.clock = clock + + // Append 1B of data. lbs := labels.Labels{{Name: "a", Value: "b"}} entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} res, err := m.Append(AppendRequest{ @@ -301,40 +305,33 @@ func TestManager_NextPendingClosed(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) - // There should be no segments waiting to be flushed as neither the maximum - // age nor maximum size has been exceeded. + // The segment that was just appended to has neither reached the maximum + // age nor maximum size to be flushed. it, err := m.NextPending() require.NoError(t, err) require.Nil(t, it) + require.Equal(t, 1, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) - // Close the WAL. - m.Close() - - // There should be one segment waiting to be flushed. + // Wait 100ms. The segment that was just appended to should have reached + // the maximum age. + clock.Advance(100 * time.Millisecond) it, err = m.NextPending() require.NoError(t, err) require.NotNil(t, it) - - // There are no more segments waiting to be flushed, and since the WAL is - // closed, successive calls should return ErrClosed. - it, err = m.NextPending() - require.ErrorIs(t, err, ErrClosed) - require.Nil(t, it) + require.Equal(t, 0, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) } -func TestManager_NexPendingMaxAge(t *testing.T) { +func TestManager_NextPendingWALClosed(t *testing.T) { m, err := NewManager(Config{ - MaxAge: 100 * time.Millisecond, + MaxAge: 30 * time.Second, MaxSegments: 1, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // Create a mock clock. - clock := quartz.NewMock(t) - m.clock = clock - - // Append 1B of data. + // Append some data. lbs := labels.Labels{{Name: "a", Value: "b"}} entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} res, err := m.Append(AppendRequest{ @@ -346,22 +343,27 @@ func TestManager_NexPendingMaxAge(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) - // The segment that was just appended to has neither reached the maximum - // age nor maximum size to be flushed. + // There should be no segments waiting to be flushed as neither the maximum + // age nor maximum size has been exceeded. it, err := m.NextPending() require.NoError(t, err) require.Nil(t, it) - require.Equal(t, 1, m.available.Len()) - require.Equal(t, 0, m.pending.Len()) - // Wait 100ms. The segment that was just appended to should have reached - // the maximum age. - clock.Advance(100 * time.Millisecond) + // Close the WAL. + m.Close() + + // There should be one segment waiting to be flushed. it, err = m.NextPending() require.NoError(t, err) require.NotNil(t, it) - require.Equal(t, 0, m.available.Len()) - require.Equal(t, 0, m.pending.Len()) + + // There are no more segments waiting to be flushed, and since the WAL is + // closed, successive calls should return ErrClosed. + for i := 0; i < 10; i++ { + it, err = m.NextPending() + require.ErrorIs(t, err, ErrClosed) + require.Nil(t, it) + } } func TestManager_Put(t *testing.T) { @@ -493,5 +495,4 @@ wal_segments_flushing 0 wal_segments_pending 0 ` require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) - }