Skip to content

Commit

Permalink
fix(agent): Fix buffer not flushing if all metrics are written (influ…
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 authored and asaharn committed Oct 16, 2024
1 parent 632b858 commit 1fc5a5f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 9 deletions.
39 changes: 30 additions & 9 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"sync"

Expand All @@ -27,6 +26,11 @@ type DiskBuffer struct {
// Ending point of metrics read from disk on telegraf launch.
// Used to know whether to discard tracking metrics.
originalEnd uint64

// The WAL library currently has no way to "fully empty" the walfile. In this case,
// we have to do our best and track that the walfile "should" be empty, so that next
// write, we can remove the invalid entry (also skipping this entry if it is being read).
isEmpty bool
}

func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) {
Expand All @@ -53,6 +57,9 @@ func (b *DiskBuffer) Len() int {
}

func (b *DiskBuffer) length() int {
if b.isEmpty {
return 0
}
// Special case for when the read index is zero, it must be empty (otherwise it would be >= 1)
if b.readIndex() == 0 {
return 0
Expand Down Expand Up @@ -87,6 +94,8 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
if !b.addSingleMetric(m) {
dropped++
}
// as soon as a new metric is added, if this was empty, try to flush the "empty" metric out
b.handleEmptyFile()
}
b.BufferSize.Set(int64(b.length()))
return dropped
Expand Down Expand Up @@ -169,7 +178,7 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
b.metricWritten(m)
}
if b.length() == len(batch) {
b.resetWalFile()
b.emptyFile()
} else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil {
Expand Down Expand Up @@ -205,15 +214,27 @@ func (b *DiskBuffer) resetBatch() {
}

// This is very messy and not ideal, but serves as the only way I can find currently
// to actually clear the walfile completely if needed, since Truncate() calls require
// to actually treat the walfile as empty if needed, since Truncate() calls require
// that at least one entry remains in them otherwise they return an error.
// Related issue: https://github.com/tidwall/wal/issues/20
func (b *DiskBuffer) resetWalFile() {
b.file.Close()
os.Remove(b.path)
walFile, err := wal.Open(b.path, nil)
if err != nil {
func (b *DiskBuffer) handleEmptyFile() {
if !b.isEmpty {
return
}
if err := b.file.TruncateFront(b.readIndex() + 1); err != nil {
log.Printf("E! readIndex: %d, buffer len: %d", b.readIndex(), b.length())
panic(err)
}
b.isEmpty = false
}

func (b *DiskBuffer) emptyFile() {
if b.isEmpty || b.length() == 0 {
return
}
if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil {
log.Printf("E! writeIndex: %d, buffer len: %d", b.writeIndex(), b.length())
panic(err)
}
b.file = walFile
b.isEmpty = true
}
23 changes: 23 additions & 0 deletions models/buffer_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,3 +809,26 @@ func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() {
s.NotNil(m)
}
}

func (s *BufferSuiteTest) TestBuffer_FlushedPartial() {
b := s.newTestBuffer(5)
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
s.Len(batch, 2)

b.Accept(batch)
s.Equal(1, b.Len())
}

func (s *BufferSuiteTest) TestBuffer_FlushedFull() {
b := s.newTestBuffer(5)
b.Add(MetricTime(1))
b.Add(MetricTime(2))
batch := b.Batch(2)
s.Len(batch, 2)

b.Accept(batch)
s.Equal(0, b.Len())
}

0 comments on commit 1fc5a5f

Please sign in to comment.