From d3d9923410c3a724d3dea7472407b977f50d9f69 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 17 Jul 2024 13:57:07 +0100 Subject: [PATCH] chore: Rename PendingItem to PendingSegment and clean up flush.go (#13554) --- pkg/ingester-rf1/flush.go | 20 ++++++++------------ pkg/storage/wal/manager.go | 14 +++++++------- pkg/storage/wal/segment.go | 6 +++--- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index efd8f72df726..631fc9728738 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -68,7 +68,7 @@ func (i *Ingester) flushWorker(j int) { continue } - err = i.flushItem(l, j, it) + err = i.flush(l, j, it) if err != nil { level.Error(l).Log("msg", "failed to flush", "err", err) } @@ -78,7 +78,7 @@ func (i *Ingester) flushWorker(j int) { } } -func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error { +func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() @@ -94,30 +94,26 @@ func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error { return b.Err() } -// flushChunk flushes the given chunk to the store. -// -// If the flush is successful, metrics for this flush are to be reported. -// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed -// segments to have another opportunity to be flushed. func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error { - id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader) - start := time.Now() defer func() { i.metrics.flushDuration.Observe(time.Since(start).Seconds()) - w.Observe() + w.ReportMetrics() }() + i.metrics.flushesTotal.Add(1) + buf := i.flushBuffers[j] defer buf.Reset() if _, err := w.WriteTo(buf); err != nil { + i.metrics.flushFailuresTotal.Inc() return err } - i.metrics.flushesTotal.Add(1) + id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader) if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), buf); err != nil { i.metrics.flushFailuresTotal.Inc() - return fmt.Errorf("store put chunk: %w", err) + return fmt.Errorf("failed to put object: %w", err) } return nil diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go index 57ac8b3207fa..6a4ef7f052ee 100644 --- a/pkg/storage/wal/manager.go +++ b/pkg/storage/wal/manager.go @@ -114,7 +114,7 @@ type Manager struct { clock quartz.Clock } -// item is similar to PendingItem, but it is an internal struct used in the +// item is similar to PendingSegment, but it is an internal struct used in the // available and pending lists. It contains a single-use result that is returned // to callers appending to the WAL and a re-usable segment that is reset after // each flush. @@ -123,8 +123,8 @@ type item struct { w *SegmentWriter } -// PendingItem contains a result and the segment to be flushed. -type PendingItem struct { +// PendingSegment contains a result and the segment to be flushed. +type PendingSegment struct { Result *AppendResult Writer *SegmentWriter } @@ -196,7 +196,7 @@ func (m *Manager) Close() { // It returns nil if there are no segments waiting to be flushed. If the WAL // is closed it returns all remaining segments from the pending list and then // ErrClosed. -func (m *Manager) NextPending() (*PendingItem, error) { +func (m *Manager) NextPending() (*PendingSegment, error) { m.mu.Lock() defer m.mu.Unlock() if m.pending.Len() == 0 && !m.moveFrontIfExpired() { @@ -210,12 +210,12 @@ func (m *Manager) NextPending() (*PendingItem, error) { m.pending.Remove(el) m.metrics.NumPending.Dec() m.metrics.NumFlushing.Inc() - return &PendingItem{Result: it.r, Writer: it.w}, nil + return &PendingSegment{Result: it.r, Writer: it.w}, nil } // Put resets the segment and puts it back in the available list to accept -// writes. A PendingItem should not be put back until it has been flushed. -func (m *Manager) Put(it *PendingItem) { +// writes. A PendingSegment should not be put back until it has been flushed. +func (m *Manager) Put(it *PendingSegment) { it.Writer.Reset() m.mu.Lock() defer m.mu.Unlock() diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index 6a792988bdb4..b21e477c0d69 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -147,9 +147,9 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels } } -// Observe updates metrics for the writer. If called before WriteTo then the -// output size histogram will observe 0. -func (b *SegmentWriter) Observe() { +// ReportMetrics for the writer. If called before WriteTo then the output size +// histogram will observe 0. +func (b *SegmentWriter) ReportMetrics() { b.consistencyMtx.Lock() defer b.consistencyMtx.Unlock()