Skip to content

Commit

Permalink
Merge branch 'main' into fix-udp-syslogtarget
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Nov 2, 2023
2 parents 5351b81 + 3683a95 commit a87eb92
Show file tree
Hide file tree
Showing 32 changed files with 1,537 additions and 200 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ Main (unreleased)

- Include Faro Measurement `type` field in `faro.receiver` Flow component and legacy `app_agent_receiver` integration. (@rlankfo)

- Mark `password` argument of `loki.source.kafka` as a `secret` rather than a `string`. (@harsiddhdave44)

- Fixed a bug where UDP syslog messages were never processed (@joshuapare)

### Enhancements
Expand Down Expand Up @@ -122,6 +124,9 @@ Main (unreleased)

- Support clustering in `loki.source.podlogs` (@rfratto).

- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`,
and `mssql_available_commit_memory_bytes`) for `mssql` integration.

v0.37.3 (2023-10-26)
-----------------

Expand Down
4 changes: 2 additions & 2 deletions cmd/grafana-agent/example-config.river
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ otelcol.exporter.otlp "tempo" {
}
}

prometheus.exporter.unix { /* use defaults */ }
prometheus.exporter.unix "default" { /* use defaults */ }

prometheus.scrape "default" {
targets = prometheus.exporter.unix.targets
targets = prometheus.exporter.unix.default.targets
forward_to = [prometheus.remote_write.default.receiver]
}

Expand Down
42 changes: 36 additions & 6 deletions component/common/loki/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ const (
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
)

// SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that
// all data in the batch has been delivered or a client failed to do so.
type SentDataMarkerHandler interface {
UpdateSentData(segmentId, dataCount int)
}

// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request. In case of multi-tenant Promtail, log
Expand All @@ -30,14 +36,18 @@ type batch struct {
createdAt time.Time

maxStreams int

// segmentCounter tracks the amount of entries for each segment present in this batch.
segmentCounter map[int]int
}

func newBatch(maxStreams int, entries ...loki.Entry) *batch {
b := &batch{
streams: map[string]*logproto.Stream{},
totalBytes: 0,
createdAt: time.Now(),
maxStreams: maxStreams,
streams: map[string]*logproto.Stream{},
totalBytes: 0,
createdAt: time.Now(),
maxStreams: maxStreams,
segmentCounter: map[int]int{},
}

// Add entries to the batch
Expand Down Expand Up @@ -72,14 +82,16 @@ func (b *batch) add(entry loki.Entry) error {
return nil
}

// add an entry to the batch
func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
// addFromWAL adds an entry to the batch, tracking that the data being added comes from segment segmentNum read from the
// WAL.
func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum int) error {
b.totalBytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
labels := labelsMapToString(lbs, ReservedLabelTenantID)
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry)
b.countForSegment(segmentNum)
return nil
}

Expand All @@ -93,6 +105,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
Labels: labels,
Entries: []logproto.Entry{entry},
}
b.countForSegment(segmentNum)

return nil
}
Expand Down Expand Up @@ -171,3 +184,20 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
}
return &req, entriesCount
}

// countForSegment tracks that one data item has been read from a certain WAL segment.
func (b *batch) countForSegment(segmentNum int) {
if curr, ok := b.segmentCounter[segmentNum]; ok {
b.segmentCounter[segmentNum] = curr + 1
return
}
b.segmentCounter[segmentNum] = 1
}

// reportAsSentData will report for all segments whose data is part of this batch, the amount of that data as sent to
// the provided SentDataMarkerHandler
func (b *batch) reportAsSentData(h SentDataMarkerHandler) {
for seg, data := range b.segmentCounter {
h.UpdateSentData(seg, data)
}
}
55 changes: 55 additions & 0 deletions component/common/loki/client/internal/marker_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package internal

import (
"encoding/binary"
"fmt"
"hash/crc32"
)

var (
markerHeaderV1 = []byte{'0', '1'}
)

// EncodeMarkerV1 encodes the segment number, from whom we need to create a marker, in the marker file format,
// which in v1 includes the segment number and a trailing CRC code of the first 10 bytes.
func EncodeMarkerV1(segment uint64) ([]byte, error) {
// marker format v1
// marker [ 0 , 1 ] - HEADER, which is used to track version
// marker [ 2 , 9 ] - encoded unit 64 which is the content of the marker, the last "consumed" segment
// marker [ 10, 13 ] - CRC32 of the first 10 bytes of the marker, using IEEE polynomial
bs := make([]byte, 14)
// write header with marker format version
bs[0] = markerHeaderV1[0]
bs[1] = markerHeaderV1[1]
// write actual marked segment number
binary.BigEndian.PutUint64(bs[2:10], segment)
// checksum is the IEEE CRC32 checksum of the first 10 bytes of the marker record
checksum := crc32.ChecksumIEEE(bs[0:10])
binary.BigEndian.PutUint32(bs[10:], checksum)

return bs, nil
}

// DecodeMarkerV1 decodes the segment number from a segment marker, encoded with EncodeMarkerV1.
func DecodeMarkerV1(bs []byte) (uint64, error) {
// first check that read byte stream has expected length
if len(bs) != 14 {
return 0, fmt.Errorf("bad length %d", len(bs))
}

// check CRC first
expectedCrc := crc32.ChecksumIEEE(bs[0:10])
gotCrc := binary.BigEndian.Uint32(bs[len(bs)-4:])
if expectedCrc != gotCrc {
return 0, fmt.Errorf("corrupted WAL marker")
}

// check expected version header
header := bs[:2]
if !(header[0] == markerHeaderV1[0] && header[1] == markerHeaderV1[1]) {
return 0, fmt.Errorf("wrong WAL marker header")
}

// lastly, decode marked segment number
return binary.BigEndian.Uint64(bs[2:10]), nil
}
50 changes: 50 additions & 0 deletions component/common/loki/client/internal/marker_encoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package internal

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMarkerEncodingV1(t *testing.T) {
t.Run("encode and decode", func(t *testing.T) {
segment := uint64(123)
bs, err := EncodeMarkerV1(segment)
require.NoError(t, err)

gotSegment, err := DecodeMarkerV1(bs)
require.NoError(t, err)
require.Equal(t, segment, gotSegment)
})

t.Run("decoding errors", func(t *testing.T) {
t.Run("bad checksum", func(t *testing.T) {
segment := uint64(123)
bs, err := EncodeMarkerV1(segment)
require.NoError(t, err)

// change last byte
bs[13] = '5'

_, err = DecodeMarkerV1(bs)
require.Error(t, err)
})

t.Run("bad length", func(t *testing.T) {
_, err := DecodeMarkerV1(make([]byte, 15))
require.Error(t, err)
})

t.Run("bad header", func(t *testing.T) {
segment := uint64(123)
bs, err := EncodeMarkerV1(segment)
require.NoError(t, err)

// change first header byte
bs[0] = '5'

_, err = DecodeMarkerV1(bs)
require.Error(t, err)
})
})
}
101 changes: 101 additions & 0 deletions component/common/loki/client/internal/marker_file_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package internal

import (
"bytes"
"fmt"
"os"
"path/filepath"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component/common/loki/wal"
"github.com/natefinch/atomic"
)

const (
MarkerFolderName = "remote"
MarkerFileName = "segment_marker"

MarkerFolderMode os.FileMode = 0o700
MarkerFileMode os.FileMode = 0o600
)

// MarkerFileHandler is a file-backed wal.Marker, that also allows one to write to the backing store as particular
// segment number as the last one marked.
type MarkerFileHandler interface {
wal.Marker

// MarkSegment writes in the backing file-store that a particular segment is the last one marked.
MarkSegment(segment int)
}

type markerFileHandler struct {
logger log.Logger
lastMarkedSegmentDir string
lastMarkedSegmentFilePath string
}

var (
_ MarkerFileHandler = (*markerFileHandler)(nil)
)

// NewMarkerFileHandler creates a new markerFileHandler.
func NewMarkerFileHandler(logger log.Logger, walDir string) (MarkerFileHandler, error) {
markerDir := filepath.Join(walDir, MarkerFolderName)
// attempt to create dir if doesn't exist
if err := os.MkdirAll(markerDir, MarkerFolderMode); err != nil {
return nil, fmt.Errorf("error creating segment marker folder %q: %w", markerDir, err)
}

mfh := &markerFileHandler{
logger: logger,
lastMarkedSegmentDir: filepath.Join(markerDir),
lastMarkedSegmentFilePath: filepath.Join(markerDir, MarkerFileName),
}

return mfh, nil
}

// LastMarkedSegment implements wlog.Marker.
func (mfh *markerFileHandler) LastMarkedSegment() int {
bs, err := os.ReadFile(mfh.lastMarkedSegmentFilePath)
if os.IsNotExist(err) {
level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath)
return -1
} else if err != nil {
level.Error(mfh.logger).Log("msg", "could not access segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return -1
}

savedSegment, err := DecodeMarkerV1(bs)
if err != nil {
level.Error(mfh.logger).Log("msg", "could not decode segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return -1
}

return int(savedSegment)
}

// MarkSegment implements MarkerHandler.
func (mfh *markerFileHandler) MarkSegment(segment int) {
encodedMarker, err := EncodeMarkerV1(uint64(segment))
if err != nil {
level.Error(mfh.logger).Log("msg", "failed to encode marker when marking segment", "err", err)
return
}

if err := mfh.atomicallyWriteMarker(encodedMarker); err != nil {
level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return
}

level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segment)
}

// atomicallyWriteMarker attempts to perform an atomic write of the marker contents. This is delegated to
// https://github.com/natefinch/atomic/blob/master/atomic.go, that first handles atomic file renaming for UNIX and
// Windows systems. Also, atomic.WriteFile will first write the contents to a temporal file, and then perform the atomic
// rename, swapping the marker, or not at all.
func (mfh *markerFileHandler) atomicallyWriteMarker(bs []byte) error {
return atomic.WriteFile(mfh.lastMarkedSegmentFilePath, bytes.NewReader(bs))
}
66 changes: 66 additions & 0 deletions component/common/loki/client/internal/marker_file_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package internal

import (
"os"
"path/filepath"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)

func TestMarkerFileHandler(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
getTempDir := func(t *testing.T) string {
dir := t.TempDir()
return dir
}

t.Run("invalid last marked segment when there's no marker file", func(t *testing.T) {
dir := getTempDir(t)
fh, err := NewMarkerFileHandler(logger, dir)
require.NoError(t, err)

require.Equal(t, -1, fh.LastMarkedSegment())
})

t.Run("reads the last segment from existing marker file", func(t *testing.T) {
dir := getTempDir(t)
fh, err := NewMarkerFileHandler(logger, dir)
require.NoError(t, err)

// write first something to marker
markerFile := filepath.Join(dir, MarkerFolderName, MarkerFileName)
bs, err := EncodeMarkerV1(10)
require.NoError(t, err)
err = os.WriteFile(markerFile, bs, MarkerFileMode)
require.NoError(t, err)

require.Equal(t, 10, fh.LastMarkedSegment())
})

t.Run("marks segment, and then reads value from it", func(t *testing.T) {
dir := getTempDir(t)
fh, err := NewMarkerFileHandler(logger, dir)
require.NoError(t, err)

fh.MarkSegment(12)
require.Equal(t, 12, fh.LastMarkedSegment())
})

t.Run("marker file and directory is created with correct permissions", func(t *testing.T) {
dir := getTempDir(t)
fh, err := NewMarkerFileHandler(logger, dir)
require.NoError(t, err)

fh.MarkSegment(12)
// check folder first
stats, err := os.Stat(filepath.Join(dir, MarkerFolderName))
require.NoError(t, err)
require.Equal(t, MarkerFolderMode, stats.Mode().Perm())
// then file
stats, err = os.Stat(filepath.Join(dir, MarkerFolderName, MarkerFileName))
require.NoError(t, err)
require.Equal(t, MarkerFileMode, stats.Mode().Perm())
})
}
Loading

0 comments on commit a87eb92

Please sign in to comment.