Skip to content

Commit

Permalink
chore: refactor WAL Manager (#13551)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jul 17, 2024
1 parent e2cbde8 commit 8ef86f8
Showing 1 changed file with 89 additions and 90 deletions.
179 changes: 89 additions & 90 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,19 @@ import (
)

const (
// DefaultMaxAge is the default value for the maximum amount of time a
// segment can be buffered in memory before it should be flushed.
DefaultMaxAge = 500 * time.Millisecond
// DefaultMaxSegments is the default value for the maximum number of
// segments that can be buffered in memory, including segments waiting to
// be flushed.
DefaultMaxSegments = 10
// DefaultMaxSegmentSize is the default value for the maximum segment size
// (uncompressed).
DefaultMaxAge = 500 * time.Millisecond
DefaultMaxSegments = 10
DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
)

var (
// ErrClosed is returned when the WAL is closed.
// ErrClosed is returned when the WAL is closed. It is a permanent error
// as once closed, a WAL cannot be re-opened.
ErrClosed = errors.New("WAL is closed")

// ErrFull is returned when an append fails because the WAL is full. This
// ErrFull is returned when the WAL is full. It is a transient error that
// happens when all segments are either in the pending list waiting to be
// flushed, or in the process of being flushed.
// flushed or in the process of being flushed.
ErrFull = errors.New("WAL is full")
)

Expand All @@ -42,19 +36,20 @@ type AppendRequest struct {
Entries []*logproto.Entry
}

// AppendResult contains the result of an AppendRequest.
type AppendResult struct {
done chan struct{}
err error
}

// Done returns a channel that is closed when the result of an append is
// available. Err() should be called to check if the operation was successful.
// Done returns a channel that is closed when the result for the AppendRequest
// is available. Use Err() to check if the request succeeded or failed.
func (p *AppendResult) Done() <-chan struct{} {
return p.done
}

// Err returns a non-nil error if the operation failed, and nil if it was
// successful. It should not be called until Done() is closed to avoid data
// Err returns a non-nil error if the append request failed, and nil if it
// succeeded. It should not be called until Done() is closed to avoid data
// races.
func (p *AppendResult) Err() error {
return p.err
Expand All @@ -69,71 +64,70 @@ func (p *AppendResult) SetDone(err error) {
type Config struct {
// MaxAge is the maximum amount of time a segment can be buffered in memory
// before it is moved to the pending list to be flushed. Increasing MaxAge
// allows more time for a segment to grow to MaxSegmentSize, but may increase
// latency if the write volume is too small.
// allows more time for a segment to grow to MaxSegmentSize, but may
// increase latency if appends cannot fill segments quickly enough.
MaxAge time.Duration

// MaxSegments is the maximum number of segments that can be buffered in
// memory. Increasing MaxSegments allows for large bursts of writes to be
// buffered in memory, but may increase latency if the write volume exceeds
// the rate at which segments can be flushed.
// memory. Increasing MaxSegments allows more data to be buffered, but may
// increase latency if the incoming volume of data exceeds the rate at
// which segments can be flushed.
MaxSegments int64

// MaxSegmentSize is the maximum size (uncompressed) of a segment. It is
// not a strict limit, and segments can exceed the maximum size when
// MaxSegmentSize is the maximum size of an uncompressed segment in bytes.
// It is not a strict limit, and segments can exceed the maximum size when
// individual appends are larger than the remaining capacity.
MaxSegmentSize int64
}

// Manager buffers segments in memory, and keeps track of which segments are
// available and which are waiting to be flushed. The maximum number of
// segments that can be buffered in memory, and their maximum age and maximum
// size before being flushed are configured when creating the Manager.
// Manager is a pool of in-memory segments. It keeps track of which segments
// are accepting writes and which are waiting to be flushed using two doubly
// linked lists called the available and pending lists.
//
// By buffering segments in memory, the WAL can tolerate bursts of append
// requests that arrive faster than can be flushed. The amount of data that can
// be buffered is configured using MaxSegments and MaxSegmentSize. You must use
// caution when configuring these to avoid excessive latency.
// By buffering segments in memory, the WAL can tolerate bursts of writes that
// arrive faster than can be flushed. The amount of data that can be buffered
// is configured using MaxSegments and MaxSegmentSize. You must use caution
// when configuring these to avoid excessive latency.
//
// The WAL is full when all segments are waiting to be flushed or in the process
// of being flushed. When the WAL is full, subsequent appends fail with ErrFull.
// It is not permitted to append more data until another segment has been flushed
// and returned to the available list. This allows the manager to apply back-pressure
// and avoid congestion collapse due to excessive timeouts and retries.
// The WAL is full when all segments are waiting to be flushed or in the
// process of being flushed. When the WAL is full, subsequent appends fail with
// the transient error ErrFull, and will not succeed until one or more other
// segments have been flushed and returned to the available list. Callers
// should back off and retry at a later time.
//
// On shutdown, the WAL must be closed to avoid losing data. This prevents
// additional appends to the WAL and allows all remaining segments to be
// flushed.
type Manager struct {
cfg Config
metrics *ManagerMetrics

// available is a list of segments that are available and accepting data.
// All segments other than the segment at the front of the list are empty,
// and only the segment at the front of the list is written to. When this
// segment has exceeded its maximum age or maximum size it is moved to the
// pending list to be flushed, and the next segment in the available list
// takes its place.
cfg Config
metrics *ManagerMetrics
available *list.List

// pending is a list of segments that are waiting to be flushed. Once
// flushed, the segment is reset and moved to the back of the available
// list to accept writes again.
pending *list.List
closed bool
mu sync.Mutex

// firstAppend is the time of the first append to the segment at the
// front of the available list. It is used to know when the segment has
// exceeded the maximum age and should be moved to the pending list.
// It is reset each time this happens.
firstAppend time.Time

closed bool
mu sync.Mutex

// Used in tests.
clock quartz.Clock
}

// item is similar to PendingItem, but it is an internal struct used in the
// available and pending lists. It contains a single-use result that is returned
// to callers of Manager.Append() and a re-usable segment that is reset after
// to callers appending to the WAL and a re-usable segment that is reset after
// each flush.
type item struct {
r *AppendResult
w *SegmentWriter
// firstAppendedAt is the time of the first append to the segment, and is
// used to know when the segment has exceeded the maximum age and should
// be moved to the pending list. It is reset after each flush.
firstAppendedAt time.Time
}

// PendingItem contains a result and the segment to be flushed.
Expand All @@ -150,6 +144,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
pending: list.New(),
clock: quartz.NewReal(),
}
m.metrics.NumAvailable.Set(0)
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
for i := int64(0); i < cfg.MaxSegments; i++ {
Expand All @@ -172,70 +167,50 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
if m.closed {
return nil, ErrClosed
}
if m.available.Len() == 0 {
el := m.available.Front()
if el == nil {
return nil, ErrFull
}
el := m.available.Front()
it := el.Value.(*item)
if it.firstAppendedAt.IsZero() {
if m.firstAppend.IsZero() {
// This is the first append to the segment. This time will be used to
// know when the segment has exceeded its maximum age and should be
// moved to the pending list.
it.firstAppendedAt = m.clock.Now()
m.firstAppend = m.clock.Now()
}
it.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
// If the segment exceeded the maximum age or the maximum size, move it to
// the pending list to be flushed.
if m.clock.Since(it.firstAppendedAt) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
m.pending.PushBack(it)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
if m.clock.Since(m.firstAppend) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
m.move(el, it)
}
return it.r, nil
}

func (m *Manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()
m.closed = true
if m.available.Len() > 0 {
el := m.available.Front()
if el := m.available.Front(); el != nil {
it := el.Value.(*item)
if it.w.InputSize() > 0 {
m.pending.PushBack(it)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
m.move(el, it)
}
}
m.closed = true
}

// NextPending returns the next segment to be flushed. It returns nil if the
// pending list is empty, and ErrClosed if the WAL is closed.
// NextPending returns the next segment to be flushed from the pending list.
// It returns nil if there are no segments waiting to be flushed. If the WAL
// is closed it returns all remaining segments from the pending list and then
// ErrClosed.
func (m *Manager) NextPending() (*PendingItem, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.pending.Len() == 0 {
if m.available.Len() > 0 {
// Check if the current segment has exceeded its maximum age and
// should be moved to the pending list.
el := m.available.Front()
it := el.Value.(*item)
if !it.firstAppendedAt.IsZero() && m.clock.Since(it.firstAppendedAt) >= m.cfg.MaxAge {
m.pending.PushBack(it)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
}
}
// If the pending list is still empty return nil.
if m.pending.Len() == 0 {
if m.closed {
return nil, ErrClosed
}
return nil, nil
if m.pending.Len() == 0 && !m.moveFrontIfExpired() {
if m.closed {
return nil, ErrClosed
}
return nil, nil
}
el := m.pending.Front()
it := el.Value.(*item)
Expand All @@ -248,13 +223,37 @@ func (m *Manager) NextPending() (*PendingItem, error) {
// Put resets the segment and puts it back in the available list to accept
// writes. A PendingItem should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingItem) {
it.Writer.Reset()
m.mu.Lock()
defer m.mu.Unlock()
it.Writer.Reset()
m.metrics.NumFlushing.Dec()
m.metrics.NumAvailable.Inc()
m.available.PushBack(&item{
r: &AppendResult{done: make(chan struct{})},
w: it.Writer,
})
}

// move moves the element from the available list to the pending list and
// updates the relevant metrics. It also zeroes firstAppend, since the
// segment whose first-append time it tracked is no longer at the front of
// the available list; the age timer restarts on the next append.
func (m *Manager) move(el *list.Element, it *item) {
m.pending.PushBack(it)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
m.firstAppend = time.Time{}
}

// moveFrontIfExpired moves the segment at the front of the available list to
// the pending list if it has exceeded its maximum age, updating the relevant
// metrics. It reports whether a segment was moved. A zero firstAppend means
// the front segment has never been appended to, so it is never expired.
// NOTE(review): callers are expected to hold m.mu — confirm at call sites.
func (m *Manager) moveFrontIfExpired() bool {
if el := m.available.Front(); el != nil {
it := el.Value.(*item)
if !m.firstAppend.IsZero() && m.clock.Since(m.firstAppend) >= m.cfg.MaxAge {
m.move(el, it)
return true
}
}
return false
}

0 comments on commit 8ef86f8

Please sign in to comment.