Skip to content

Commit

Permalink
chore: Rename PendingItem to PendingSegment and clean up flush.go
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Jul 17, 2024
1 parent 8ef86f8 commit f0d44bb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
11 changes: 3 additions & 8 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (i *Ingester) flushWorker(j int) {
continue
}

err = i.flushItem(l, j, it)
err = i.flush(l, j, it)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "err", err)
}
Expand All @@ -78,7 +78,7 @@ func (i *Ingester) flushWorker(j int) {
}
}

func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error {
func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

Expand All @@ -94,11 +94,6 @@ func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error {
return b.Err()
}

// flushChunk flushes the given chunk to the store.
//
// If the flush is successful, metrics for this flush are to be reported.
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)

Expand All @@ -117,7 +112,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
i.metrics.flushesTotal.Add(1)
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("store put chunk: %w", err)
return fmt.Errorf("failed to put object: %w", err)
}

return nil
Expand Down
14 changes: 7 additions & 7 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type Manager struct {
clock quartz.Clock
}

// item is similar to PendingItem, but it is an internal struct used in the
// item is similar to PendingSegment, but it is an internal struct used in the
// available and pending lists. It contains a single-use result that is returned
// to callers appending to the WAL and a re-usable segment that is reset after
// each flush.
Expand All @@ -130,8 +130,8 @@ type item struct {
w *SegmentWriter
}

// PendingItem contains a result and the segment to be flushed.
type PendingItem struct {
// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func (m *Manager) Close() {
// It returns nil if there are no segments waiting to be flushed. If the WAL
// is closed it returns all remaining segments from the pending list and then
// ErrClosed.
func (m *Manager) NextPending() (*PendingItem, error) {
func (m *Manager) NextPending() (*PendingSegment, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.pending.Len() == 0 && !m.moveFrontIfExpired() {
Expand All @@ -217,12 +217,12 @@ func (m *Manager) NextPending() (*PendingItem, error) {
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingItem{Result: it.r, Writer: it.w}, nil
return &PendingSegment{Result: it.r, Writer: it.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
// writes. A PendingItem should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingItem) {
// writes. A PendingSegment should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingSegment) {
it.Writer.Reset()
m.mu.Lock()
defer m.mu.Unlock()
Expand Down

0 comments on commit f0d44bb

Please sign in to comment.