diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go
index 196d02296b5f..d5d8fff893f0 100644
--- a/pkg/storage/wal/manager.go
+++ b/pkg/storage/wal/manager.go
@@ -13,25 +13,19 @@ import (
 )
 
 const (
-	// DefaultMaxAge is the default value for the maximum amount of time a
-	// segment can can be buffered in memory before it should be flushed.
-	DefaultMaxAge = 500 * time.Millisecond
-	// DefaultMaxSegments is the default value for the maximum number of
-	// segments that can be buffered in memory, including segments waiting to
-	// be flushed.
-	DefaultMaxSegments = 10
-	// DefaultMaxSegmentSize is the default value for the maximum segment size
-	// (uncompressed).
+	DefaultMaxAge         = 500 * time.Millisecond
+	DefaultMaxSegments    = 10
 	DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
 )
 
 var (
-	// ErrClosed is returned when the WAL is closed.
+	// ErrClosed is returned when the WAL is closed. It is a permanent error
+	// as once closed, a WAL cannot be re-opened.
 	ErrClosed = errors.New("WAL is closed")
-	// ErrFull is returned when an append fails because the WAL is full. This
+	// ErrFull is returned when the WAL is full. It is a transient error that
 	// happens when all segments are either in the pending list waiting to be
-	// flushed, or in the process of being flushed.
+	// flushed or in the process of being flushed.
 	ErrFull = errors.New("WAL is full")
 )
 
@@ -42,19 +36,20 @@ type AppendRequest struct {
 	Entries []*logproto.Entry
 }
 
+// AppendResult contains the result of an AppendRequest.
 type AppendResult struct {
 	done chan struct{}
 	err  error
 }
 
-// Done returns a channel that is closed when the result of an append is
-// available. Err() should be called to check if the operation was successful.
+// Done returns a channel that is closed when the result for the AppendRequest
+// is available. Use Err() to check if the request succeeded or failed.
 func (p *AppendResult) Done() <-chan struct{} {
 	return p.done
 }
 
-// Err returns a non-nil error if the operation failed, and nil if it was
-// successful. It should not be called until Done() is closed to avoid data
+// Err returns a non-nil error if the append request failed, and nil if it
+// succeeded. It should not be called until Done() is closed to avoid data
 // races.
 func (p *AppendResult) Err() error {
 	return p.err
@@ -69,55 +64,58 @@ func (p *AppendResult) SetDone(err error) {
 type Config struct {
 	// MaxAge is the maximum amount of time a segment can be buffered in memory
 	// before it is moved to the pending list to be flushed. Increasing MaxAge
-	// allows more time for a segment to grow to MaxSegmentSize, but may increase
-	// latency if the write volume is too small.
+	// allows more time for a segment to grow to MaxSegmentSize, but may
+	// increase latency if appends cannot fill segments quickly enough.
 	MaxAge time.Duration
 
 	// MaxSegments is the maximum number of segments that can be buffered in
-	// memory. Increasing MaxSegments allows for large bursts of writes to be
-	// buffered in memory, but may increase latency if the write volume exceeds
-	// the rate at which segments can be flushed.
+	// memory. Increasing MaxSegments allows more data to be buffered, but may
+	// increase latency if the incoming volume of data exceeds the rate at
+	// which segments can be flushed.
 	MaxSegments int64
 
-	// MaxSegmentSize is the maximum size (uncompressed) of a segment. It is
-	// not a strict limit, and segments can exceed the maximum size when
+	// MaxSegmentSize is the maximum size of an uncompressed segment in bytes.
+	// It is not a strict limit, and segments can exceed the maximum size when
	// individual appends are larger than the remaining capacity.
 	MaxSegmentSize int64
 }
 
-// Manager buffers segments in memory, and keeps track of which segments are
-// available and which are waiting to be flushed. The maximum number of
-// segments that can be buffered in memory, and their maximum age and maximum
-// size before being flushed are configured when creating the Manager.
+// Manager is a pool of in-memory segments. It keeps track of which segments
+// are accepting writes and which are waiting to be flushed using two doubly
+// linked lists called the available and pending lists.
 //
-// By buffering segments in memory, the WAL can tolerate bursts of append
-// requests that arrive faster than can be flushed. The amount of data that can
-// be buffered is configured using MaxSegments and MaxSegmentSize. You must use
-// caution when configuring these to avoid excessive latency.
+// By buffering segments in memory, the WAL can tolerate bursts of writes that
+// arrive faster than can be flushed. The amount of data that can be buffered
+// is configured using MaxSegments and MaxSegmentSize. You must use caution
+// when configuring these to avoid excessive latency.
 //
-// The WAL is full when all segments are waiting to be flushed or in the process
-// of being flushed. When the WAL is full, subsequent appends fail with ErrFull.
-// It is not permitted to append more data until another segment has been flushed
-// and returned to the available list. This allows the manager to apply back-pressure
-// and avoid congestion collapse due to excessive timeouts and retries.
+// The WAL is full when all segments are waiting to be flushed or in the
+// process of being flushed. When the WAL is full, subsequent appends fail with
+// the transient error ErrFull, and will not succeed until one or more other
+// segments have been flushed and returned to the available list. Callers
+// should back off and retry at a later time.
+//
+// On shutdown, the WAL must be closed to avoid losing data. This prevents
+// additional appends to the WAL and allows all remaining segments to be
+// flushed.
 type Manager struct {
-	cfg     Config
-	metrics *ManagerMetrics
-
-	// available is a list of segments that are available and accepting data.
-	// All segments other than the segment at the front of the list are empty,
-	// and only the segment at the front of the list is written to. When this
-	// segment has exceeded its maximum age or maximum size it is moved to the
-	// pending list to be flushed, and the next segment in the available list
-	// takes its place.
+	cfg       Config
+	metrics   *ManagerMetrics
 	available *list.List
 
 	// pending is a list of segments that are waiting to be flushed. Once
 	// flushed, the segment is reset and moved to the back of the available
 	// list to accept writes again.
 	pending *list.List
-	closed  bool
-	mu      sync.Mutex
+
+	// firstAppend is the time of the first append to the segment at the
+	// front of the available list. It is used to know when the segment has
+	// exceeded the maximum age and should be moved to the pending list.
+	// It is reset each time this happens.
+	firstAppend time.Time
+
+	closed bool
+	mu     sync.Mutex
 
 	// Used in tests.
 	clock quartz.Clock
@@ -125,15 +123,11 @@ type Manager struct {
 
 // item is similar to PendingItem, but it is an internal struct used in the
 // available and pending lists. It contains a single-use result that is returned
-// to callers of Manager.Append() and a re-usable segment that is reset after
+// to callers appending to the WAL and a re-usable segment that is reset after
 // each flush.
 type item struct {
 	r *AppendResult
 	w *SegmentWriter
-	// firstAppendedAt is the time of the first append to the segment, and is
-	// used to know when the segment has exceeded the maximum age and should
-	// be moved to the pending list. It is reset after each flush.
-	firstAppendedAt time.Time
 }
 
 // PendingItem contains a result and the segment to be flushed.
@@ -150,6 +144,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 		pending: list.New(),
 		clock:   quartz.NewReal(),
 	}
+	m.metrics.NumAvailable.Set(0)
 	m.metrics.NumPending.Set(0)
 	m.metrics.NumFlushing.Set(0)
 	for i := int64(0); i < cfg.MaxSegments; i++ {
@@ -172,25 +167,22 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	if m.closed {
 		return nil, ErrClosed
 	}
-	if m.available.Len() == 0 {
+	el := m.available.Front()
+	if el == nil {
 		return nil, ErrFull
 	}
-	el := m.available.Front()
 	it := el.Value.(*item)
-	if it.firstAppendedAt.IsZero() {
+	if m.firstAppend.IsZero() {
 		// This is the first append to the segment. This time will be used in
 		// know when the segment has exceeded its maximum age and should be
 		// moved to the pending list.
-		it.firstAppendedAt = m.clock.Now()
+		m.firstAppend = m.clock.Now()
 	}
 	it.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
 	// If the segment exceeded the maximum age or the maximum size, move it to
 	// the closed list to be flushed.
-	if m.clock.Since(it.firstAppendedAt) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
-		m.pending.PushBack(it)
-		m.metrics.NumPending.Inc()
-		m.available.Remove(el)
-		m.metrics.NumAvailable.Dec()
+	if m.clock.Since(m.firstAppend) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
+		m.move(el, it)
 	}
 	return it.r, nil
 }
@@ -198,44 +190,27 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 func (m *Manager) Close() {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	m.closed = true
-	if m.available.Len() > 0 {
-		el := m.available.Front()
+	if el := m.available.Front(); el != nil {
 		it := el.Value.(*item)
 		if it.w.InputSize() > 0 {
-			m.pending.PushBack(it)
-			m.metrics.NumPending.Inc()
-			m.available.Remove(el)
-			m.metrics.NumAvailable.Dec()
+			m.move(el, it)
 		}
 	}
+	m.closed = true
 }
 
-// NextPending returns the next segment to be flushed. It returns nil if the
-// pending list is empty, and ErrClosed if the WAL is closed.
+// NextPending returns the next segment to be flushed from the pending list.
+// It returns nil if there are no segments waiting to be flushed. If the WAL
+// is closed it returns all remaining segments from the pending list and then
+// ErrClosed.
 func (m *Manager) NextPending() (*PendingItem, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	if m.pending.Len() == 0 {
-		if m.available.Len() > 0 {
-			// Check if the current segment has exceeded its maximum age and
-			// should be moved to the pending list.
-			el := m.available.Front()
-			it := el.Value.(*item)
-			if !it.firstAppendedAt.IsZero() && m.clock.Since(it.firstAppendedAt) >= m.cfg.MaxAge {
-				m.pending.PushBack(it)
-				m.metrics.NumPending.Inc()
-				m.available.Remove(el)
-				m.metrics.NumAvailable.Dec()
-			}
-		}
-		// If the pending list is still empty return nil.
-		if m.pending.Len() == 0 {
-			if m.closed {
-				return nil, ErrClosed
-			}
-			return nil, nil
+	if m.pending.Len() == 0 && !m.moveFrontIfExpired() {
+		if m.closed {
+			return nil, ErrClosed
 		}
+		return nil, nil
 	}
 	el := m.pending.Front()
 	it := el.Value.(*item)
@@ -248,9 +223,9 @@ func (m *Manager) NextPending() (*PendingItem, error) {
 // Put resets the segment and puts it back in the available list to accept
 // writes. A PendingItem should not be put back until it has been flushed.
 func (m *Manager) Put(it *PendingItem) {
+	it.Writer.Reset()
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	it.Writer.Reset()
 	m.metrics.NumFlushing.Dec()
 	m.metrics.NumAvailable.Inc()
 	m.available.PushBack(&item{
@@ -258,3 +233,27 @@ func (m *Manager) Put(it *PendingItem) {
 		w: it.Writer,
 	})
 }
+
+// move the element from the available list to the pending list and sets the
+// relevant metrics.
+func (m *Manager) move(el *list.Element, it *item) {
+	m.pending.PushBack(it)
+	m.metrics.NumPending.Inc()
+	m.available.Remove(el)
+	m.metrics.NumAvailable.Dec()
+	m.firstAppend = time.Time{}
+}
+
+// moveFrontIfExpired moves the element from the front of the available list to
+// the pending list if the segment has exceeded its maximum age and sets the
+// relevant metrics.
+func (m *Manager) moveFrontIfExpired() bool {
+	if el := m.available.Front(); el != nil {
+		it := el.Value.(*item)
+		if !m.firstAppend.IsZero() && m.clock.Since(m.firstAppend) >= m.cfg.MaxAge {
+			m.move(el, it)
+			return true
+		}
+	}
+	return false
+}
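
For reviewers, here is a minimal sketch of the writer-side contract described by the new doc comments: retry on the transient ErrFull, stop on the permanent ErrClosed, then wait on the AppendResult to learn whether the segment was flushed. It is written as if it sat in the wal package itself (so no module import path needs to be guessed), needs only context, errors, and time from the standard library, and the appendWithRetry name and 100ms backoff are illustrative, not part of this change.

// appendWithRetry is a sketch of how a caller is expected to append under
// back-pressure. Only Append, AppendResult.Done/Err, ErrFull, and ErrClosed
// are taken from this diff; everything else is illustrative.
func appendWithRetry(ctx context.Context, m *Manager, req AppendRequest) (err error) {
	var res *AppendResult
	for {
		res, err = m.Append(req)
		if err == nil {
			break
		}
		if errors.Is(err, ErrClosed) {
			// Permanent: a closed WAL cannot be re-opened.
			return err
		}
		// ErrFull is transient: all segments are pending or being
		// flushed. Back off and retry once a segment has been freed.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
	// The result becomes available once the segment holding this append
	// has been flushed. Err must not be read before Done is closed.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-res.Done():
		return res.Err()
	}
}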
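And the flusher side, continuing the same sketch: a loop that drains the pending list, persists each segment, resolves the shared AppendResult via SetDone, and returns the writer with Put. The flush callback and polling interval are stand-ins for the real persistence step, and the PendingItem field name Result is an assumption (only Writer is visible in these hunks).

// flushLoop is a sketch of the consumer that pairs with appendWithRetry.
func flushLoop(m *Manager, flush func(*SegmentWriter) error) {
	for {
		it, err := m.NextPending()
		if errors.Is(err, ErrClosed) {
			// The WAL is closed and the pending list has been fully
			// drained; no more segments will arrive.
			return
		}
		if it == nil {
			// Nothing pending yet: the front segment has not reached
			// MaxAge or MaxSegmentSize. Poll again shortly.
			time.Sleep(50 * time.Millisecond)
			continue
		}
		// Resolve the result so writers blocked in Done() observe the
		// outcome. A production flusher would retry a failed flush
		// before calling Put, since a PendingItem should not be put
		// back until it has been flushed.
		ferr := flush(it.Writer)
		it.Result.SetDone(ferr)
		m.Put(it)
	}
}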