From f0d44bb2f4fb6741f35d534f6b043ad1d8ea8782 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 17 Jul 2024 11:30:57 +0100 Subject: [PATCH] chore: Rename PendingItem to PendingSegment and clean up flush.go --- pkg/ingester-rf1/flush.go | 11 +++-------- pkg/storage/wal/manager.go | 14 +++++++------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index efd8f72df726f..b1446c08b3130 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -68,7 +68,7 @@ func (i *Ingester) flushWorker(j int) { continue } - err = i.flushItem(l, j, it) + err = i.flush(l, j, it) if err != nil { level.Error(l).Log("msg", "failed to flush", "err", err) } @@ -78,7 +78,7 @@ func (i *Ingester) flushWorker(j int) { } } -func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error { +func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() @@ -94,11 +94,6 @@ func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error { return b.Err() } -// flushChunk flushes the given chunk to the store. -// -// If the flush is successful, metrics for this flush are to be reported. -// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed -// segments to have another opportunity to be flushed. func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error { id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader) @@ -117,7 +112,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter i.metrics.flushesTotal.Add(1) if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), buf); err != nil { i.metrics.flushFailuresTotal.Inc() - return fmt.Errorf("store put chunk: %w", err) + return fmt.Errorf("failed to put object: %w", err) } return nil diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go index d5d8fff893f03..36f5ad7de0b17 100644 --- a/pkg/storage/wal/manager.go +++ b/pkg/storage/wal/manager.go @@ -121,7 +121,7 @@ type Manager struct { clock quartz.Clock } -// item is similar to PendingItem, but it is an internal struct used in the +// item is similar to PendingSegment, but it is an internal struct used in the // available and pending lists. It contains a single-use result that is returned // to callers appending to the WAL and a re-usable segment that is reset after // each flush. @@ -130,8 +130,8 @@ type item struct { w *SegmentWriter } -// PendingItem contains a result and the segment to be flushed. -type PendingItem struct { +// PendingSegment contains a result and the segment to be flushed. +type PendingSegment struct { Result *AppendResult Writer *SegmentWriter } @@ -203,7 +203,7 @@ func (m *Manager) Close() { // It returns nil if there are no segments waiting to be flushed. If the WAL // is closed it returns all remaining segments from the pending list and then // ErrClosed. -func (m *Manager) NextPending() (*PendingItem, error) { +func (m *Manager) NextPending() (*PendingSegment, error) { m.mu.Lock() defer m.mu.Unlock() if m.pending.Len() == 0 && !m.moveFrontIfExpired() { @@ -217,12 +217,12 @@ func (m *Manager) NextPending() (*PendingItem, error) { m.pending.Remove(el) m.metrics.NumPending.Dec() m.metrics.NumFlushing.Inc() - return &PendingItem{Result: it.r, Writer: it.w}, nil + return &PendingSegment{Result: it.r, Writer: it.w}, nil } // Put resets the segment and puts it back in the available list to accept -// writes. A PendingItem should not be put back until it has been flushed. -func (m *Manager) Put(it *PendingItem) { +// writes. A PendingSegment should not be put back until it has been flushed. +func (m *Manager) Put(it *PendingSegment) { it.Writer.Reset() m.mu.Lock() defer m.mu.Unlock()