diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3cf38a42107e..6406022292bb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -94,6 +94,8 @@ Main (unreleased)
 - Include Faro Measurement `type` field in `faro.receiver` Flow component and legacy `app_agent_receiver` integration. (@rlankfo)
 
+- Mark `password` argument of `loki.source.kafka` as a `secret` rather than a `string`. (@harsiddhdave44)
+
 - Fixed a bug where UDP syslog messages were never processed (@joshuapare)
 
 ### Enhancements
@@ -122,6 +124,9 @@ Main (unreleased)
 
 - Support clustering in `loki.source.podlogs` (@rfratto).
 
+- Adds new metrics (`mssql_server_total_memory_bytes`, `mssql_server_target_memory_bytes`,
+  and `mssql_available_commit_memory_bytes`) for `mssql` integration.
+
 v0.37.3 (2023-10-26)
 -----------------
diff --git a/cmd/grafana-agent/example-config.river b/cmd/grafana-agent/example-config.river
index 4a552f9b9457..9eb477b790a5 100644
--- a/cmd/grafana-agent/example-config.river
+++ b/cmd/grafana-agent/example-config.river
@@ -23,10 +23,10 @@ otelcol.exporter.otlp "tempo" {
 	}
 }
 
-prometheus.exporter.unix { /* use defaults */ }
+prometheus.exporter.unix "default" { /* use defaults */ }
 
 prometheus.scrape "default" {
-	targets    = prometheus.exporter.unix.targets
+	targets    = prometheus.exporter.unix.default.targets
 	forward_to = [prometheus.remote_write.default.receiver]
 }
diff --git a/component/common/loki/client/batch.go b/component/common/loki/client/batch.go
index 8417ae019f53..66a2ac7859de 100644
--- a/component/common/loki/client/batch.go
+++ b/component/common/loki/client/batch.go
@@ -19,6 +19,12 @@ const (
 	errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
 )
 
+// SentDataMarkerHandler is a subset of the MarkerHandler interface that the batch interacts with, to report that all
+// data in the batch has been delivered, or that the client has given up on it.
+type SentDataMarkerHandler interface {
+	UpdateSentData(segmentId, dataCount int)
+}
+
 // batch holds pending log streams waiting to be sent to Loki, and it's used
 // to reduce the number of push requests to Loki aggregating multiple log streams
 // and entries in a single batch request. In case of multi-tenant Promtail, log
@@ -30,14 +36,18 @@ type batch struct {
 	createdAt time.Time
 
 	maxStreams int
+
+	// segmentCounter tracks the number of entries for each segment present in this batch.
+	segmentCounter map[int]int
 }
 
 func newBatch(maxStreams int, entries ...loki.Entry) *batch {
 	b := &batch{
-		streams:    map[string]*logproto.Stream{},
-		totalBytes: 0,
-		createdAt:  time.Now(),
-		maxStreams: maxStreams,
+		streams:        map[string]*logproto.Stream{},
+		totalBytes:     0,
+		createdAt:      time.Now(),
+		maxStreams:     maxStreams,
+		segmentCounter: map[int]int{},
 	}
 
 	// Add entries to the batch
@@ -72,14 +82,16 @@ func (b *batch) add(entry loki.Entry) error {
 	return nil
 }
 
-// add an entry to the batch
-func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
+// addFromWAL adds an entry to the batch, tracking that the data being added comes from segment segmentNum read from the
+// WAL.
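+// Unlike add, entries appended from the WAL carry the number of the segment they were read from, so that the batch
+// can later report per-segment delivery through reportAsSentData.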
+func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum int) error {
 	b.totalBytes += len(entry.Line)
 
 	// Append the entry to an already existing stream (if any)
 	labels := labelsMapToString(lbs, ReservedLabelTenantID)
 	if stream, ok := b.streams[labels]; ok {
 		stream.Entries = append(stream.Entries, entry)
+		b.countForSegment(segmentNum)
 		return nil
 	}
 
@@ -93,6 +105,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
 		Labels:  labels,
 		Entries: []logproto.Entry{entry},
 	}
+	b.countForSegment(segmentNum)
 
 	return nil
 }
@@ -171,3 +184,20 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
 	}
 	return &req, entriesCount
 }
+
+// countForSegment tracks that one data item has been read from a certain WAL segment.
+func (b *batch) countForSegment(segmentNum int) {
+	if curr, ok := b.segmentCounter[segmentNum]; ok {
+		b.segmentCounter[segmentNum] = curr + 1
+		return
+	}
+	b.segmentCounter[segmentNum] = 1
+}
+
+// reportAsSentData reports, for each segment whose data is part of this batch, the amount of that data as sent to
+// the provided SentDataMarkerHandler.
+func (b *batch) reportAsSentData(h SentDataMarkerHandler) {
+	for seg, data := range b.segmentCounter {
+		h.UpdateSentData(seg, data)
+	}
+}
diff --git a/component/common/loki/client/internal/marker_encoding.go b/component/common/loki/client/internal/marker_encoding.go
new file mode 100644
index 000000000000..48415f421c7f
--- /dev/null
+++ b/component/common/loki/client/internal/marker_encoding.go
@@ -0,0 +1,55 @@
+package internal
+
+import (
+	"encoding/binary"
+	"fmt"
+	"hash/crc32"
+)
+
+var (
+	markerHeaderV1 = []byte{'0', '1'}
+)
+
+// EncodeMarkerV1 encodes the segment number for which we need to create a marker, in the marker file format,
+// which in v1 includes the segment number and a trailing CRC code of the first 10 bytes.
+func EncodeMarkerV1(segment uint64) ([]byte, error) {
+	// marker format v1
+	// marker [ 0 , 1  ] - HEADER, which is used to track version
+	// marker [ 2 , 9  ] - encoded uint64 which is the content of the marker, the last "consumed" segment
+	// marker [ 10, 13 ] - CRC32 of the first 10 bytes of the marker, using IEEE polynomial
+	bs := make([]byte, 14)
+	// write header with marker format version
+	bs[0] = markerHeaderV1[0]
+	bs[1] = markerHeaderV1[1]
+	// write actual marked segment number
+	binary.BigEndian.PutUint64(bs[2:10], segment)
+	// checksum is the IEEE CRC32 checksum of the first 10 bytes of the marker record
+	checksum := crc32.ChecksumIEEE(bs[0:10])
+	binary.BigEndian.PutUint32(bs[10:], checksum)
+
+	return bs, nil
+}
+
+// DecodeMarkerV1 decodes the segment number from a segment marker, encoded with EncodeMarkerV1.
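+//
+// As an illustration, a hypothetical round trip through both functions:
+//
+//	bs, _ := EncodeMarkerV1(42)    // 14-byte record: 2-byte header, 8-byte segment, 4-byte CRC32
+//	seg, err := DecodeMarkerV1(bs) // seg == 42, err == nil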
+func DecodeMarkerV1(bs []byte) (uint64, error) {
+	// first check that the read byte stream has the expected length
+	if len(bs) != 14 {
+		return 0, fmt.Errorf("bad length %d", len(bs))
+	}
+
+	// check CRC first
+	expectedCrc := crc32.ChecksumIEEE(bs[0:10])
+	gotCrc := binary.BigEndian.Uint32(bs[len(bs)-4:])
+	if expectedCrc != gotCrc {
+		return 0, fmt.Errorf("corrupted WAL marker")
+	}
+
+	// check expected version header
+	header := bs[:2]
+	if !(header[0] == markerHeaderV1[0] && header[1] == markerHeaderV1[1]) {
+		return 0, fmt.Errorf("wrong WAL marker header")
+	}
+
+	// lastly, decode marked segment number
+	return binary.BigEndian.Uint64(bs[2:10]), nil
+}
diff --git a/component/common/loki/client/internal/marker_encoding_test.go b/component/common/loki/client/internal/marker_encoding_test.go
new file mode 100644
index 000000000000..8a5d419f8d01
--- /dev/null
+++ b/component/common/loki/client/internal/marker_encoding_test.go
@@ -0,0 +1,50 @@
+package internal
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestMarkerEncodingV1(t *testing.T) {
+	t.Run("encode and decode", func(t *testing.T) {
+		segment := uint64(123)
+		bs, err := EncodeMarkerV1(segment)
+		require.NoError(t, err)
+
+		gotSegment, err := DecodeMarkerV1(bs)
+		require.NoError(t, err)
+		require.Equal(t, segment, gotSegment)
+	})
+
+	t.Run("decoding errors", func(t *testing.T) {
+		t.Run("bad checksum", func(t *testing.T) {
+			segment := uint64(123)
+			bs, err := EncodeMarkerV1(segment)
+			require.NoError(t, err)
+
+			// change last byte
+			bs[13] = '5'
+
+			_, err = DecodeMarkerV1(bs)
+			require.Error(t, err)
+		})
+
+		t.Run("bad length", func(t *testing.T) {
+			_, err := DecodeMarkerV1(make([]byte, 15))
+			require.Error(t, err)
+		})
+
+		t.Run("bad header", func(t *testing.T) {
+			segment := uint64(123)
+			bs, err := EncodeMarkerV1(segment)
+			require.NoError(t, err)
+
+			// change first header byte
+			bs[0] = '5'
+
+			_, err = DecodeMarkerV1(bs)
+			require.Error(t, err)
+		})
+	})
+}
diff --git a/component/common/loki/client/internal/marker_file_handler.go b/component/common/loki/client/internal/marker_file_handler.go
new file mode 100644
index 000000000000..21b4afcd12d6
--- /dev/null
+++ b/component/common/loki/client/internal/marker_file_handler.go
@@ -0,0 +1,101 @@
+package internal
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/agent/component/common/loki/wal"
+	"github.com/natefinch/atomic"
+)
+
+const (
+	MarkerFolderName = "remote"
+	MarkerFileName   = "segment_marker"
+
+	MarkerFolderMode os.FileMode = 0o700
+	MarkerFileMode   os.FileMode = 0o600
+)
+
+// MarkerFileHandler is a file-backed wal.Marker, that also allows one to write to the backing store a particular
+// segment number as the last one marked.
+type MarkerFileHandler interface {
+	wal.Marker
+
+	// MarkSegment writes in the backing file-store that a particular segment is the last one marked.
+	MarkSegment(segment int)
+}
+
+type markerFileHandler struct {
+	logger                    log.Logger
+	lastMarkedSegmentDir      string
+	lastMarkedSegmentFilePath string
+}
+
+var (
+	_ MarkerFileHandler = (*markerFileHandler)(nil)
+)
+
+// NewMarkerFileHandler creates a new markerFileHandler.
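+//
+// For a WAL directory of, say, "/data/wal", the marker ends up stored at "/data/wal/remote/segment_marker".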
+func NewMarkerFileHandler(logger log.Logger, walDir string) (MarkerFileHandler, error) {
+	markerDir := filepath.Join(walDir, MarkerFolderName)
+	// attempt to create dir if it doesn't exist
+	if err := os.MkdirAll(markerDir, MarkerFolderMode); err != nil {
+		return nil, fmt.Errorf("error creating segment marker folder %q: %w", markerDir, err)
+	}
+
+	mfh := &markerFileHandler{
+		logger:                    logger,
+		lastMarkedSegmentDir:      filepath.Join(markerDir),
+		lastMarkedSegmentFilePath: filepath.Join(markerDir, MarkerFileName),
+	}
+
+	return mfh, nil
+}
+
+// LastMarkedSegment implements wal.Marker.
+func (mfh *markerFileHandler) LastMarkedSegment() int {
+	bs, err := os.ReadFile(mfh.lastMarkedSegmentFilePath)
+	if os.IsNotExist(err) {
+		level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath)
+		return -1
+	} else if err != nil {
+		level.Error(mfh.logger).Log("msg", "could not access segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
+		return -1
+	}
+
+	savedSegment, err := DecodeMarkerV1(bs)
+	if err != nil {
+		level.Error(mfh.logger).Log("msg", "could not decode segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
+		return -1
+	}
+
+	return int(savedSegment)
+}
+
+// MarkSegment implements MarkerFileHandler.
+func (mfh *markerFileHandler) MarkSegment(segment int) {
+	encodedMarker, err := EncodeMarkerV1(uint64(segment))
+	if err != nil {
+		level.Error(mfh.logger).Log("msg", "failed to encode marker when marking segment", "err", err)
+		return
+	}
+
+	if err := mfh.atomicallyWriteMarker(encodedMarker); err != nil {
+		level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
+		return
+	}
+
+	level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segment)
+}
+
+// atomicallyWriteMarker attempts to perform an atomic write of the marker contents. This is delegated to
+// https://github.com/natefinch/atomic/blob/master/atomic.go, which handles atomic file renaming for both UNIX and
+// Windows systems. atomic.WriteFile first writes the contents to a temporary file, and then performs the atomic
+// rename, swapping the marker either completely or not at all.
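+//
+// Roughly, the library applies the classic write-then-rename pattern (a sketch with illustrative names, not the
+// library's actual code):
+//
+//	tmp, _ := os.CreateTemp(dir, "marker-*")
+//	tmp.Write(bs)
+//	tmp.Close()
+//	os.Rename(tmp.Name(), markerPath) // atomic replacement of the old marker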
+func (mfh *markerFileHandler) atomicallyWriteMarker(bs []byte) error {
+	return atomic.WriteFile(mfh.lastMarkedSegmentFilePath, bytes.NewReader(bs))
+}
diff --git a/component/common/loki/client/internal/marker_file_handler_test.go b/component/common/loki/client/internal/marker_file_handler_test.go
new file mode 100644
index 000000000000..0b74f82ef973
--- /dev/null
+++ b/component/common/loki/client/internal/marker_file_handler_test.go
@@ -0,0 +1,66 @@
+package internal
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/go-kit/log"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMarkerFileHandler(t *testing.T) {
+	logger := log.NewLogfmtLogger(os.Stdout)
+	getTempDir := func(t *testing.T) string {
+		dir := t.TempDir()
+		return dir
+	}
+
+	t.Run("invalid last marked segment when there's no marker file", func(t *testing.T) {
+		dir := getTempDir(t)
+		fh, err := NewMarkerFileHandler(logger, dir)
+		require.NoError(t, err)
+
+		require.Equal(t, -1, fh.LastMarkedSegment())
+	})
+
+	t.Run("reads the last segment from existing marker file", func(t *testing.T) {
+		dir := getTempDir(t)
+		fh, err := NewMarkerFileHandler(logger, dir)
+		require.NoError(t, err)
+
+		// first write something to the marker file
+		markerFile := filepath.Join(dir, MarkerFolderName, MarkerFileName)
+		bs, err := EncodeMarkerV1(10)
+		require.NoError(t, err)
+		err = os.WriteFile(markerFile, bs, MarkerFileMode)
+		require.NoError(t, err)
+
+		require.Equal(t, 10, fh.LastMarkedSegment())
+	})
+
+	t.Run("marks segment, and then reads value from it", func(t *testing.T) {
+		dir := getTempDir(t)
+		fh, err := NewMarkerFileHandler(logger, dir)
+		require.NoError(t, err)
+
+		fh.MarkSegment(12)
+		require.Equal(t, 12, fh.LastMarkedSegment())
+	})
+
+	t.Run("marker file and directory are created with correct permissions", func(t *testing.T) {
+		dir := getTempDir(t)
+		fh, err := NewMarkerFileHandler(logger, dir)
+		require.NoError(t, err)
+
+		fh.MarkSegment(12)
+		// check folder first
+		stats, err := os.Stat(filepath.Join(dir, MarkerFolderName))
+		require.NoError(t, err)
+		require.Equal(t, MarkerFolderMode, stats.Mode().Perm())
+		// then file
+		stats, err = os.Stat(filepath.Join(dir, MarkerFolderName, MarkerFileName))
+		require.NoError(t, err)
+		require.Equal(t, MarkerFileMode, stats.Mode().Perm())
+	})
+}
diff --git a/component/common/loki/client/internal/marker_handler.go b/component/common/loki/client/internal/marker_handler.go
new file mode 100644
index 000000000000..0aaf5ff19364
--- /dev/null
+++ b/component/common/loki/client/internal/marker_handler.go
@@ -0,0 +1,214 @@
+package internal
+
+import (
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/agent/component/common/loki/wal"
+	"github.com/grafana/agent/pkg/flow/logging/level"
+)
+
+type MarkerHandler interface {
+	wal.Marker
+
+	// UpdateReceivedData sends an update event to the handler, informing that some data, coming from a particular WAL
+	// segment, has been read out of the WAL and enqueued for sending.
+	UpdateReceivedData(segmentId, dataCount int)
+
+	// UpdateSentData sends an update event to the handler, informing that some data, coming from a particular WAL
+	// segment, has been delivered, or the sender has given up on it.
+	UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending
+
+	// Stop stops the handler, and its async processing of receive/send data updates.
+	Stop()
+}
+
+// markerHandler implements MarkerHandler, processing data update events in an asynchronous manner, and tracking the last
+// consumed segment in a file.
+type markerHandler struct {
+	dataIOUpdate      chan dataUpdate
+	lastMarkedSegment int
+	logger            log.Logger
+	markerFileHandler MarkerFileHandler
+	maxSegmentAge     time.Duration
+	metrics           *MarkerMetrics
+	quit              chan struct{}
+	runFindTicker     *time.Ticker
+	wg                sync.WaitGroup
+}
+
+// dataUpdate is an update event that some amount of data has been read out of the WAL and enqueued, delivered or dropped.
+type dataUpdate struct {
+	segmentId int
+	dataCount int
+}
+
+var (
+	_ MarkerHandler = (*markerHandler)(nil)
+)
+
+// NewMarkerHandler creates a new markerHandler.
+func NewMarkerHandler(mfh MarkerFileHandler, maxSegmentAge time.Duration, logger log.Logger, metrics *MarkerMetrics) MarkerHandler {
+	mh := &markerHandler{
+		lastMarkedSegment: -1, // Segment ID last marked on disk.
+		markerFileHandler: mfh,
+		//TODO: What is a good size for the channel?
+		dataIOUpdate: make(chan dataUpdate, 100),
+		quit:         make(chan struct{}),
+		logger:       logger,
+		metrics:      metrics,
+
+		maxSegmentAge: maxSegmentAge,
+		// runFindTicker forces the execution of the find-markable-segment routine every second
+		runFindTicker: time.NewTicker(time.Second),
+	}
+
+	// Load the last marked segment from disk (if it exists).
+	if lastSegment := mh.markerFileHandler.LastMarkedSegment(); lastSegment >= 0 {
+		mh.lastMarkedSegment = lastSegment
+	}
+
+	mh.wg.Add(1)
+	go mh.runUpdatePendingData()
+
+	return mh
+}
+
+func (mh *markerHandler) LastMarkedSegment() int {
+	return mh.markerFileHandler.LastMarkedSegment()
+}
+
+func (mh *markerHandler) UpdateReceivedData(segmentId, dataCount int) {
+	mh.dataIOUpdate <- dataUpdate{
+		segmentId: segmentId,
+		dataCount: dataCount,
+	}
+}
+
+func (mh *markerHandler) UpdateSentData(segmentId, dataCount int) {
+	mh.dataIOUpdate <- dataUpdate{
+		segmentId: segmentId,
+		dataCount: -1 * dataCount,
+	}
+}
+
+// countDataItem tracks, inside a map, the count of in-flight log entries and the last update received for a given segment.
+type countDataItem struct {
+	count      int
+	lastUpdate time.Time
+}
+
+// processDataItem is a version of countDataItem that includes the segment number the information corresponds to.
+type processDataItem struct {
+	segment    int
+	count      int
+	lastUpdate time.Time
+}
+
+// runUpdatePendingData is assumed to run in a separate routine, asynchronously keeping track of how much data each WAL
+// segment the Watcher reads from has left to send. When a segment's count reaches zero, it means that it has been
+// consumed, and a procedure is triggered to find the "last consumed segment", implemented by FindMarkableSegment. Since
+// this last procedure could be expensive, its execution is run only when a segment has reached a count of zero, or when
+// a timer is fired (once per second).
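+//
+// For example: if segment 3 gets 10 entries enqueued (UpdateReceivedData(3, 10)) and the sender later reports all 10
+// as sent (UpdateSentData(3, 10)), its pending count drops to zero, and the find routine may mark it, provided all
+// earlier segments are consumed as well.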
+func (mh *markerHandler) runUpdatePendingData() {
+	defer mh.wg.Done()
+
+	segmentDataCount := make(map[int]*countDataItem)
+
+	for {
+		// shouldRunFind will be true if a markable segment should be found after the update, that is, if one reached a
+		// count of zero, or the ticker fired
+		shouldRunFind := false
+		select {
+		case <-mh.quit:
+			return
+		case update := <-mh.dataIOUpdate:
+			if di, ok := segmentDataCount[update.segmentId]; ok {
+				di.lastUpdate = time.Now()
+				resultingCount := di.count + update.dataCount
+				di.count = resultingCount
+				// if a segment reached zero, run the find routine because a segment might be ready to be marked
+				shouldRunFind = resultingCount == 0
+			} else {
+				segmentDataCount[update.segmentId] = &countDataItem{
+					count:      update.dataCount,
+					lastUpdate: time.Now(),
+				}
+			}
+		}
+
+		// if the ticker fired, force a find run
+		select {
+		case <-mh.runFindTicker.C:
+			shouldRunFind = true
+		default:
+		}
+
+		if !shouldRunFind {
+			continue
+		}
+
+		markableSegment := FindMarkableSegment(segmentDataCount, mh.maxSegmentAge)
+		level.Debug(mh.logger).Log("msg", fmt.Sprintf("found as markable segment %d", markableSegment))
+		if markableSegment > mh.lastMarkedSegment {
+			mh.markerFileHandler.MarkSegment(markableSegment)
+			mh.lastMarkedSegment = markableSegment
+			mh.metrics.lastMarkedSegment.WithLabelValues().Set(float64(markableSegment))
+		}
+	}
+}
+
+func (mh *markerHandler) Stop() {
+	mh.runFindTicker.Stop()
+	mh.quit <- struct{}{}
+	mh.wg.Wait()
+}
+
+// FindMarkableSegment finds the segment that should be marked as last consumed, given the summary of data updates
+// received, and a threshold on how much time can pass without updates before a segment is no longer considered "live".
+// The algorithm finds the highest-numbered segment that is considered "consumed", with all its predecessors
+// "consumed" as well.
+//
+// A consumed segment is one with a data count of zero, meaning that there's no data left in flight for it, or one that
+// hasn't received any updates for tooOldThreshold time.
+//
+// Also, while reviewing the data items in segmentDataCount, those that are consumed will be deleted to clean up space.
+//
+// This algorithm runs in O(N log N), where N is the size of segmentDataCount, and allocates O(N) memory.
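+//
+// For example (mirroring the unit tests below): given counts {1: 0, 2: 0, 3: 10, 4: 0} and no segment past the age
+// threshold, it returns 2, since segment 3 still has data in flight and therefore blocks segment 4 from being marked.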
+func FindMarkableSegment(segmentDataCount map[int]*countDataItem, tooOldThreshold time.Duration) int {
+	// N = len(segmentDataCount)
+	// alloc slice, N
+	orderedSegmentCounts := make([]processDataItem, 0, len(segmentDataCount))
+
+	// convert map into slice, which already has expected capacity, N
+	for seg, item := range segmentDataCount {
+		orderedSegmentCounts = append(orderedSegmentCounts, processDataItem{
+			segment:    seg,
+			count:      item.count,
+			lastUpdate: item.lastUpdate,
+		})
+	}
+
+	// sort orderedSegmentCounts, N log N
+	sort.Slice(orderedSegmentCounts, func(i, j int) bool {
+		return orderedSegmentCounts[i].segment < orderedSegmentCounts[j].segment
+	})
+
+	var lastZero = -1
+	for _, item := range orderedSegmentCounts {
+		// we consider a segment as "consumed" if its data count is zero, or the lastUpdate is too old
+		if item.count == 0 || time.Since(item.lastUpdate) > tooOldThreshold {
+			lastZero = item.segment
+			// since the segment has been consumed, clear it from the map
+			delete(segmentDataCount, item.segment)
+		} else {
+			// if we find a "non consumed" segment, we exit
+			break
+		}
+	}
+
+	return lastZero
+}
diff --git a/component/common/loki/client/internal/marker_handler_test.go b/component/common/loki/client/internal/marker_handler_test.go
new file mode 100644
index 000000000000..71610610a5d4
--- /dev/null
+++ b/component/common/loki/client/internal/marker_handler_test.go
@@ -0,0 +1,187 @@
+package internal
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+)
+
+type mockMarkerFileHandler struct {
+	lastMarkedSegment atomic.Int64
+}
+
+func newMockMarkerFileHandler(seg int) *mockMarkerFileHandler {
+	mh := &mockMarkerFileHandler{}
+	mh.MarkSegment(seg)
+	return mh
+}
+
+func (m *mockMarkerFileHandler) LastMarkedSegment() int {
+	return int(m.lastMarkedSegment.Load())
+}
+
+func (m *mockMarkerFileHandler) MarkSegment(segment int) {
+	m.lastMarkedSegment.Store(int64(segment))
+}
+
+func TestMarkerHandler(t *testing.T) {
+	logger := log.NewLogfmtLogger(os.Stdout)
+	// drive-by test: if metrics don't have the id curried, it panics when emitting them
+	metrics := NewMarkerMetrics(nil).WithCurriedId("test")
+	t.Run("returns last marked segment from file handler on start", func(t *testing.T) {
+		mockMFH := newMockMarkerFileHandler(10)
+		mh := NewMarkerHandler(mockMFH, time.Minute, logger, metrics)
+		defer mh.Stop()
+
+		require.Equal(t, 10, mh.LastMarkedSegment())
+	})
+
+	t.Run("last marked segment is updated when sends complete", func(t *testing.T) {
+		mockMFH := newMockMarkerFileHandler(10)
+		mh := NewMarkerHandler(mockMFH, time.Minute, logger, metrics)
+		defer mh.Stop()
+
+		mh.UpdateReceivedData(11, 10)
+		mh.UpdateSentData(11, 5)
+		mh.UpdateSentData(11, 5)
+
+		require.Eventually(t, func() bool {
+			return mh.LastMarkedSegment() == 11
+		}, time.Second, time.Millisecond*100, "expected last marked segment to catch up")
+		require.Equal(t, 11, mockMFH.LastMarkedSegment())
+	})
+
+	t.Run("last marked segment is updated when segment becomes old", func(t *testing.T) {
+		mockMFH := newMockMarkerFileHandler(10)
+		mh := NewMarkerHandler(mockMFH, 2*time.Second, logger, metrics)
+		defer mh.Stop()
+
+		// segment 11 has 5 pending data items, and will become old after 2 secs
+		mh.UpdateReceivedData(11, 10)
+		mh.UpdateSentData(11, 5)
+
+		// wait until segment becomes old
+		time.Sleep(2*time.Second + time.Millisecond*100)
+
+		// send dummy data item to trigger find
+		mh.UpdateReceivedData(12, 1)
+
+		require.Eventually(t,
+			func() bool {
+				return mh.LastMarkedSegment() == 11
+			}, 3*time.Second, time.Millisecond*100, "expected last marked segment to catch up")
+		require.Equal(t, 11, mockMFH.LastMarkedSegment())
+	})
+}
+
+func TestFindLastMarkableSegment(t *testing.T) {
+	t.Run("all segments with count zero, highest numbered should be marked", func(t *testing.T) {
+		now := time.Now()
+		data := map[int]*countDataItem{
+			1: {
+				count:      0,
+				lastUpdate: now,
+			},
+			2: {
+				count:      0,
+				lastUpdate: now,
+			},
+			3: {
+				count:      0,
+				lastUpdate: now,
+			},
+			4: {
+				count:      0,
+				lastUpdate: now,
+			},
+		}
+		require.Equal(t, 4, FindMarkableSegment(data, time.Minute))
+	})
+
+	t.Run("all segments with count zero, and one too old, highest numbered should be marked", func(t *testing.T) {
+		now := time.Now()
+		data := map[int]*countDataItem{
+			1: {
+				count:      0,
+				lastUpdate: now,
+			},
+			2: {
+				count:      0,
+				lastUpdate: now,
+			},
+			3: {
+				count:      10,
+				lastUpdate: now.Add(-2 * time.Minute),
+			},
+			4: {
+				count:      0,
+				lastUpdate: now,
+			},
+		}
+		require.Equal(t, 4, FindMarkableSegment(data, time.Minute))
+		// items should have been cleaned up
+		require.Len(t, data, 0)
+	})
+	t.Run("should find the zeroed segment before the last non-zero", func(t *testing.T) {
+		now := time.Now()
+		data := map[int]*countDataItem{
+			1: {
+				count:      0,
+				lastUpdate: now,
+			},
+			2: {
+				count:      0,
+				lastUpdate: now,
+			},
+			3: {
+				count:      10,
+				lastUpdate: now,
+			},
+			4: {
+				count:      0,
+				lastUpdate: now,
+			},
+		}
+		require.Equal(t, 2, FindMarkableSegment(data, time.Minute))
+		require.NotContains(t, data, 1)
+		require.NotContains(t, data, 2)
+	})
+	t.Run("should return -1 when no segment is markable", func(t *testing.T) {
+		now := time.Now()
+		data := map[int]*countDataItem{
+			1: {
+				count:      11,
+				lastUpdate: now,
+			},
+			2: {
+				count:      5,
+				lastUpdate: now,
+			},
+			3: {
+				count:      10,
+				lastUpdate: now,
+			},
+			4: {
+				count:      2,
+				lastUpdate: now,
+			},
+		}
+		lenBefore := len(data)
+		require.Equal(t, -1, FindMarkableSegment(data, time.Minute))
+		require.Len(t, data, lenBefore, "no key should have been deleted")
+	})
+	t.Run("should find only item with zero, and clean it up", func(t *testing.T) {
+		now := time.Now()
+		data := map[int]*countDataItem{
+			11: {
+				count:      0,
+				lastUpdate: now,
+			},
+		}
+		require.Equal(t, 11, FindMarkableSegment(data, time.Minute))
+		require.Len(t, data, 0)
+	})
+}
diff --git a/component/common/loki/client/internal/metrics.go b/component/common/loki/client/internal/metrics.go
new file mode 100644
index 000000000000..3abd7572eac2
--- /dev/null
+++ b/component/common/loki/client/internal/metrics.go
@@ -0,0 +1,35 @@
+package internal
+
+import "github.com/prometheus/client_golang/prometheus"
+
+type MarkerMetrics struct {
+	lastMarkedSegment *prometheus.GaugeVec
+}
+
+func NewMarkerMetrics(reg prometheus.Registerer) *MarkerMetrics {
+	m := &MarkerMetrics{
+		lastMarkedSegment: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: "loki_write",
+				Subsystem: "wal_marker",
+				Name:      "last_marked_segment",
+				Help:      "Last marked WAL segment.",
+			},
+			[]string{"id"},
+		),
+	}
+	if reg != nil {
+		reg.MustRegister(m.lastMarkedSegment)
+	}
+	return m
+}
+
+// WithCurriedId returns a curried version of MarkerMetrics, with the id label pre-filled. This is a helper that avoids
+// having to pass the id around where it's unnecessary, since it won't change inside the consumer of the metrics.
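+//
+// For illustration, the intended usage looks roughly like:
+//
+//	metrics := NewMarkerMetrics(reg).WithCurriedId("client-1")
+//	metrics.lastMarkedSegment.WithLabelValues().Set(42) // "id" is already filled in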
+func (m *MarkerMetrics) WithCurriedId(id string) *MarkerMetrics {
+	return &MarkerMetrics{
+		lastMarkedSegment: m.lastMarkedSegment.MustCurryWith(map[string]string{
+			"id": id,
+		}),
+	}
+}
diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go
index e91792a31228..290ddcc0f951 100644
--- a/component/common/loki/client/manager.go
+++ b/component/common/loki/client/manager.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 
 	"github.com/go-kit/log"
+	"github.com/grafana/agent/component/common/loki/client/internal"
 	"github.com/grafana/agent/pkg/flow/logging/level"
 	"github.com/prometheus/client_golang/prometheus"
 
@@ -67,7 +68,8 @@ type Manager struct {
 func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg prometheus.Registerer, walCfg wal.Config, notifier WriterEventsNotifier, clientCfgs ...Config) (*Manager, error) {
 	var fake struct{}
 
-	watcherMetrics := wal.NewWatcherMetrics(reg)
+	walWatcherMetrics := wal.NewWatcherMetrics(reg)
+	walMarkerMetrics := internal.NewMarkerMetrics(reg)
 
 	if len(clientCfgs) == 0 {
 		return nil, fmt.Errorf("at least one client config must be provided")
@@ -90,7 +92,13 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
 			// add some context information for the logger the watcher uses
 			wlog := log.With(logger, "client", clientName)
 
-			queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
+			markerFileHandler, err := internal.NewMarkerFileHandler(logger, walCfg.Dir)
+			if err != nil {
+				return nil, err
+			}
+			markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName))
+
+			queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
 			if err != nil {
 				return nil, fmt.Errorf("error starting queue client: %w", err)
 			}
@@ -100,7 +108,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
 			// series cache whenever a segment is deleted.
 			notifier.SubscribeCleanup(queue)
 
-			watcher := wal.NewWatcher(walCfg.Dir, clientName, watcherMetrics, queue, wlog, walCfg.WatchConfig)
+			watcher := wal.NewWatcher(walCfg.Dir, clientName, walWatcherMetrics, queue, wlog, walCfg.WatchConfig, markerHandler)
 
 			// subscribe watcher to wal write events
 			notifier.SubscribeWrite(watcher)
diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go
index 3d869dda3ce6..458839db60dd 100644
--- a/component/common/loki/client/queue_client.go
+++ b/component/common/loki/client/queue_client.go
@@ -33,6 +33,14 @@ type StoppableWriteTo interface {
 	StopNow()
 }
 
+// MarkerHandler redefines the parts of internal.MarkerHandler that the queue client interacts with, contributing to the
+// feedback loop that tracks when data from a segment is read from the WAL and when it's delivered.
+type MarkerHandler interface {
+	UpdateReceivedData(segmentId, dataCount int) // Data queued for sending
+	UpdateSentData(segmentId, dataCount int)     // Data which was sent or given up on sending
+	Stoppable
+}
+
 // queuedBatch is a batch specific to a tenant, that is considered ready to be sent.
 type queuedBatch struct {
 	TenantID string
@@ -62,7 +70,8 @@ func newQueue(client *queueClient, size int, logger log.Logger) *queue {
 	return &q
 }
 
-// enqueue is a blocking operation to add to the send queue a batch ready to be sent.
+// enqueue adds to the send queue a batch ready to be sent. Note that if the backing queue has no
+// remaining capacity to enqueue the batch, calling enqueue might block.
 func (q *queue) enqueue(qb queuedBatch) {
 	q.q <- qb
 }
@@ -87,8 +96,8 @@ func (q *queue) run() {
 			return
 		case qb := <-q.q:
 			// Since inside the actual send operation a context with time out is used, we should exceed that timeout
-			// instead of cancelling this send operations, since that batch has been taken out of the queue.
-			q.client.sendBatch(context.Background(), qb.TenantID, qb.Batch)
+			// instead of cancelling this send operation, since that batch has been taken out of the queue.
+			q.sendAndReport(context.Background(), qb.TenantID, qb.Batch)
 		}
 	}
 }
@@ -110,7 +119,7 @@ func (q *queue) closeAndDrain(ctx context.Context) {
 		case qb := <-q.q:
 			// drain uses the same timeout, so if a timeout was applied to the parent context, it can cancel the underlying
 			// send operation preemptively.
-			q.client.sendBatch(ctx, qb.TenantID, qb.Batch)
+			q.sendAndReport(ctx, qb.TenantID, qb.Batch)
 		case <-ctx.Done():
 			level.Warn(q.logger).Log("msg", "timeout exceeded while draining send queue")
 			return
@@ -122,6 +131,14 @@ func (q *queue) closeAndDrain(ctx context.Context) {
 	}
 }
 
+// sendAndReport attempts to send the batch for the given tenant and, whether that operation succeeds or fails, reports
+// the data as sent.
+func (q *queue) sendAndReport(ctx context.Context, tenantId string, b *batch) {
+	q.client.sendBatch(ctx, tenantId, b)
+	// mark segment data for that batch as sent, even if the send operation failed
+	b.reportAsSentData(q.client.markerHandler)
+}
+
 // closeNow closes the queue, without draining batches that might be buffered to be sent.
 func (q *queue) closeNow() {
 	close(q.quit)
@@ -159,17 +176,18 @@ type queueClient struct {
 	maxLineSize         int
 	maxLineSizeTruncate bool
 	quit                chan struct{}
+	markerHandler       MarkerHandler
 }
 
 // NewQueue creates a new queueClient.
-func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (StoppableWriteTo, error) { +func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) { if cfg.StreamLagLabels.String() != "" { return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String()) } - return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger) + return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler) } -func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (*queueClient, error) { +func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) { if cfg.URL.URL == nil { return nil, errors.New("client needs target URL") } @@ -183,7 +201,8 @@ func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, m drainTimeout: cfg.Queue.DrainTimeout, quit: make(chan struct{}), - batches: make(map[string]*batch), + batches: make(map[string]*batch), + markerHandler: markerHandler, series: make(map[chunks.HeadSeriesRef]model.LabelSet), seriesSegment: make(map[chunks.HeadSeriesRef]int), @@ -260,14 +279,16 @@ func (c *queueClient) StoreSeries(series []record.RefSeries, segment int) { } } -func (c *queueClient) AppendEntries(entries wal.RefEntries, _ int) error { +func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error { c.seriesLock.RLock() l, ok := c.series[entries.Ref] c.seriesLock.RUnlock() if ok { for _, e := range entries.Entries { - c.appendSingleEntry(l, e) + c.appendSingleEntry(segment, l, e) } + // count all enqueued appended entries as received from WAL + c.markerHandler.UpdateReceivedData(segment, len(entries.Entries)) } else { // TODO(thepalbi): Add metric here level.Debug(c.logger).Log("msg", "series for entry not found") @@ -275,7 +296,7 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, _ int) error { return nil } -func (c *queueClient) appendSingleEntry(lbs model.LabelSet, e logproto.Entry) { +func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e logproto.Entry) { lbs, tenantID := c.processLabels(lbs) // Either drop or mutate the log entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled. @@ -301,7 +322,7 @@ func (c *queueClient) appendSingleEntry(lbs model.LabelSet, e logproto.Entry) { nb := newBatch(c.maxStreams) // since the batch is new, adding a new entry, and hence a new stream, won't fail since there aren't any stream // registered in the batch. 
- _ = nb.addFromWAL(lbs, e) + _ = nb.addFromWAL(lbs, e, segmentNum) c.batches[tenantID] = nb c.batchesMtx.Unlock() @@ -319,7 +340,7 @@ func (c *queueClient) appendSingleEntry(lbs model.LabelSet, e logproto.Entry) { }) nb := newBatch(c.maxStreams) - _ = nb.addFromWAL(lbs, e) + _ = nb.addFromWAL(lbs, e, segmentNum) c.batches[tenantID] = nb c.batchesMtx.Unlock() @@ -327,7 +348,7 @@ func (c *queueClient) appendSingleEntry(lbs model.LabelSet, e logproto.Entry) { } // The max size of the batch isn't reached, so we can add the entry - err := batch.addFromWAL(lbs, e) + err := batch.addFromWAL(lbs, e, segmentNum) c.batchesMtx.Unlock() if err != nil { @@ -559,6 +580,8 @@ func (c *queueClient) Stop() { // stop request after drain times out or exits c.cancel() + + c.markerHandler.Stop() } // StopNow stops the client without retries or draining the send queue @@ -568,6 +591,7 @@ func (c *queueClient) StopNow() { close(c.quit) c.sendQueue.closeNow() c.wg.Wait() + c.markerHandler.Stop() } func (c *queueClient) processLabels(lbs model.LabelSet) (model.LabelSet, string) { diff --git a/component/common/loki/client/queue_client_test.go b/component/common/loki/client/queue_client_test.go index e64f9afab45a..a23804d44634 100644 --- a/component/common/loki/client/queue_client_test.go +++ b/component/common/loki/client/queue_client_test.go @@ -9,6 +9,7 @@ import ( "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/common/loki/client/internal" "github.com/grafana/agent/component/common/loki/utils" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" @@ -43,6 +44,17 @@ type testCase struct { expectedRWReqsCount int64 } +type nilMarkerHandler struct{} + +func (n nilMarkerHandler) UpdateReceivedData(segmentId, dataCount int) { +} + +func (n nilMarkerHandler) UpdateSentData(segmentId, dataCount int) { +} + +func (n nilMarkerHandler) Stop() { +} + func TestQueueClient(t *testing.T) { for name, tc := range map[string]testCase{ "small test": { @@ -124,7 +136,7 @@ func TestQueueClient(t *testing.T) { logger := log.NewLogfmtLogger(os.Stdout) m := NewMetrics(reg) - qc, err := NewQueue(m, cfg, 0, 0, false, logger) + qc, err := NewQueue(m, cfg, 0, 0, false, logger, nilMarkerHandler{}) require.NoError(t, err) //labels := model.LabelSet{"app": "test"} @@ -194,8 +206,24 @@ func BenchmarkClientImplementations(b *testing.B) { }, } { b.Run(name, func(b *testing.B) { - b.Run("implementation=queue", func(b *testing.B) { - runQueueClientBenchCase(b, bc) + b.Run("implementation=queue_nil_marker_handler", func(b *testing.B) { + runQueueClientBenchCase(b, bc, func(t *testing.B) MarkerHandler { + return &nilMarkerHandler{} + }) + }) + + b.Run("implementation=queue_marker_handler", func(b *testing.B) { + runQueueClientBenchCase(b, bc, func(t *testing.B) MarkerHandler { + dir := b.TempDir() + nopLogger := log.NewNopLogger() + + markerFileHandler, err := internal.NewMarkerFileHandler(nopLogger, dir) + require.NoError(b, err) + + markerHandler := internal.NewMarkerHandler(markerFileHandler, time.Minute, nopLogger, internal.NewMarkerMetrics(nil).WithCurriedId("test")) + + return markerHandler + }) }) b.Run("implementation=regular", func(b *testing.B) { @@ -205,7 +233,7 @@ func BenchmarkClientImplementations(b *testing.B) { } } -func runQueueClientBenchCase(b *testing.B, bc testCase) { +func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testing.B) MarkerHandler) { reg := prometheus.NewRegistry() // Create a buffer 
channel where we do enqueue received requests @@ -254,7 +282,7 @@ func runQueueClientBenchCase(b *testing.B, bc testCase) { logger := log.NewLogfmtLogger(os.Stdout) m := NewMetrics(reg) - qc, err := NewQueue(m, cfg, 0, 0, false, logger) + qc, err := NewQueue(m, cfg, 0, 0, false, logger, mhFactory(b)) require.NoError(b, err) //labels := model.LabelSet{"app": "test"} diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index ac2416b4ec67..0972f32f8f8a 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -61,6 +61,17 @@ type WriteTo interface { AppendEntries(entries wal.RefEntries, segmentNum int) error } +// Marker allows the Watcher to start from a specific segment in the WAL. +// Implementers can use this interface to save and restore save points. +type Marker interface { + // LastMarkedSegment should return the last segment stored in the marker. + // Must return -1 if there is no mark. + // + // The Watcher will start reading the first segment whose value is greater + // than the return value. + LastMarkedSegment() int +} + type Watcher struct { // id identifies the Watcher. Used when one Watcher is instantiated per remote write client, to be able to track to whom // the metric/log line corresponds. @@ -74,25 +85,29 @@ type Watcher struct { logger log.Logger MaxSegment int - metrics *WatcherMetrics - minReadFreq time.Duration - maxReadFreq time.Duration + metrics *WatcherMetrics + minReadFreq time.Duration + maxReadFreq time.Duration + marker Marker + savedSegment int } // NewWatcher creates a new Watcher. -func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, logger log.Logger, config WatchConfig) *Watcher { +func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, logger log.Logger, config WatchConfig, marker Marker) *Watcher { return &Watcher{ - walDir: walDir, - id: id, - actions: writeTo, - readNotify: make(chan struct{}), - quit: make(chan struct{}), - done: make(chan struct{}), - MaxSegment: -1, - logger: logger, - metrics: metrics, - minReadFreq: config.MinReadFrequency, - maxReadFreq: config.MaxReadFrequency, + walDir: walDir, + id: id, + actions: writeTo, + readNotify: make(chan struct{}), + quit: make(chan struct{}), + done: make(chan struct{}), + MaxSegment: -1, + marker: marker, + savedSegment: -1, + logger: logger, + metrics: metrics, + minReadFreq: config.MinReadFrequency, + maxReadFreq: config.MaxReadFrequency, } } @@ -107,6 +122,11 @@ func (w *Watcher) Start() { func (w *Watcher) mainLoop() { defer close(w.done) for !isClosed(w.quit) { + if w.marker != nil { + w.savedSegment = w.marker.LastMarkedSegment() + level.Debug(w.logger).Log("msg", "last saved segment", "segment", w.savedSegment) + } + if err := w.run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } @@ -119,8 +139,7 @@ func (w *Watcher) mainLoop() { } } -// Run the watcher, which will tail the WAL until the quit channel is closed -// or an error case is hit. +// Run the watcher, which will tail the WAL until the quit channel is closed or an error case is hit. func (w *Watcher) run() error { _, lastSegment, err := w.firstAndLast() if err != nil { @@ -128,6 +147,18 @@ func (w *Watcher) run() error { } currentSegment := lastSegment + + // if the marker contains a valid segment number stored, and we correctly find the segment that follows that one, + // start tailing from there. 
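+	// For example: with segments 1 through 5 on disk and a marked segment of 3, the Watcher resumes reading from
+	// segment 4, instead of starting directly at the current head segment 5.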
+	if nextToMarkedSegment, err := w.findNextSegmentFor(w.savedSegment); w.savedSegment != -1 && err == nil {
+		currentSegment = nextToMarkedSegment
+		// keep a separate metric that helps us track when the segment in the marker is used. This should be considered
+		// a replay event.
+		w.metrics.replaySegment.WithLabelValues(w.id).Set(float64(currentSegment))
+	} else {
+		level.Debug(w.logger).Log("msg", fmt.Sprintf("failed to find segment for marked index %d", w.savedSegment), "err", err)
+	}
+
 	level.Debug(w.logger).Log("msg", "Tailing WAL", "currentSegment", currentSegment, "lastSegment", lastSegment)
 	for !isClosed(w.quit) {
 		w.metrics.currentSegment.WithLabelValues(w.id).Set(float64(currentSegment))
 
 		// On start, we have a pointer to what is the latest segment. On subsequent calls to this function,
 		// currentSegment will have been incremented, and we should open that segment.
-		if err := w.watch(currentSegment); err != nil {
+		if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
 			return err
 		}
 
@@ -150,10 +181,12 @@
 	return nil
 }
 
-// watch will start reading from the segment identified by segmentNum. If an EOF is reached, it will keep
-// reading for more WAL records with a wlog.LiveReader. Periodically, it will check if there's a new segment, and if positive
-// read the remaining from the current one and return.
-func (w *Watcher) watch(segmentNum int) error {
+// watch will start reading from the segment identified by segmentNum.
+// If an EOF is reached and tail is true, it will keep reading for more WAL records with a wlog.LiveReader. Periodically,
+// it will check if there's a new segment, and if so, read what remains of the current one and return.
+// If tail is false, we know the segment we are "watching" is closed (no further writes will occur to it). In that case,
+// the segment is read fully, any errors are logged as warnings, and no error is returned.
+func (w *Watcher) watch(segmentNum int, tail bool) error {
 	segment, err := wlog.OpenReadSegment(wlog.SegmentName(w.walDir, segmentNum))
 	if err != nil {
 		return err
@@ -167,6 +200,19 @@
 	segmentTicker := time.NewTicker(segmentCheckPeriod)
 	defer segmentTicker.Stop()
 
+	// If we're replaying the segment, we need to know the size of the file to know when to return from watch and move
+	// on to the next segment.
+	size := int64(math.MaxInt64)
+	if !tail {
+		// stop the segment ticker since we know we'll read the segment fully, and then exit to the next segment loop
+		segmentTicker.Stop()
+		var err error
+		size, err = getSegmentSize(w.walDir, segmentNum)
+		if err != nil {
+			return fmt.Errorf("error getting segment size: %w", err)
+		}
+	}
+
 	for {
 		select {
 		case <-w.quit:
@@ -212,7 +258,20 @@
 			// read from open segment routine
 			ok, err := w.readSegment(reader, segmentNum)
 			if debug {
-				level.Warn(w.logger).Log("msg", "Error reading segment inside readTicker", "segment", segmentNum, "read", reader.Offset(), "err", err)
+				level.Warn(w.logger).Log("msg", "Error reading segment inside read ticker or notification", "segment", segmentNum, "read", reader.Offset(), "err", err)
 			}
 
+			// Ignore all errors reading to the end of the segment whilst replaying the WAL. This is because, when
+			// replaying a segment that is not the last one, we assume the segment is closed (no longer being written
+			// to), and the call to readSegment will read to the end of it.
+			// On error, a warning is logged accordingly; either way, nil is returned so that the caller can continue
+			// to the following segment.
+			if !tail {
+				if err != nil && errors.Unwrap(err) != io.EOF {
+					level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
+				} else if reader.Offset() != size {
+					level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
+				}
+				return nil
+			}
 
 			// io.EOF error are non-fatal since we are tailing the wal
@@ -321,6 +380,23 @@ func (w *Watcher) NotifyWrite() {
 	}
 }
 
+// findNextSegmentFor finds the first segment whose number is strictly greater than the given index.
+func (w *Watcher) findNextSegmentFor(index int) (int, error) {
+	// TODO(thepalbi): is segs in order?
+	segs, err := readSegmentNumbers(w.walDir)
+	if err != nil {
+		return -1, err
+	}
+
+	for _, r := range segs {
+		if r > index {
+			return r, nil
+		}
+	}
+
+	return -1, errors.New("failed to find segment for index")
+}
+
 // isClosed checks in a non-blocking manner if a channel is closed or not.
 func isClosed(c chan struct{}) bool {
 	select {
@@ -349,3 +425,13 @@ func readSegmentNumbers(dir string) ([]int, error) {
 	}
 	return refs, nil
 }
+
+// getSegmentSize returns the size, in bytes, of the segment identified by index in dir.
+func getSegmentSize(dir string, index int) (int64, error) {
+	i := int64(-1)
+	fi, err := os.Stat(wlog.SegmentName(dir, index))
+	if err == nil {
+		i = fi.Size()
+	}
+	return i, err
+}
diff --git a/component/common/loki/wal/watcher_metrics.go b/component/common/loki/wal/watcher_metrics.go
index 715ae59aaf24..697b111c69af 100644
--- a/component/common/loki/wal/watcher_metrics.go
+++ b/component/common/loki/wal/watcher_metrics.go
@@ -8,6 +8,7 @@ type WatcherMetrics struct {
 	droppedWriteNotifications *prometheus.CounterVec
 	segmentRead               *prometheus.CounterVec
 	currentSegment            *prometheus.GaugeVec
+	replaySegment             *prometheus.GaugeVec
 	watchersRunning           *prometheus.GaugeVec
 }
 
@@ -58,6 +59,15 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
 			},
 			[]string{"id"},
 		),
+		replaySegment: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: "loki_write",
+				Subsystem: "wal_watcher",
+				Name:      "replay_segment",
+				Help:      "Segment the WAL watcher will start replaying the WAL from on startup.",
+			},
+			[]string{"id"},
+		),
 		watchersRunning: prometheus.NewGaugeVec(
 			prometheus.GaugeOpts{
 				Namespace: "loki_write",
diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go
index 1e59966a1375..a24b7ff63049 100644
--- a/component/common/loki/wal/watcher_test.go
+++ b/component/common/loki/wal/watcher_test.go
@@ -52,6 +52,26 @@ func (t *testWriteTo) AppendEntries(entries wal.RefEntries, _ int) error {
 	return nil
 }
 
+func (t *testWriteTo) AssertContainsLines(tst *testing.T, lines ...string) {
+	seen := map[string]bool{}
+	for _, l := range lines {
+		seen[l] = false
+	}
+	for _, e := range t.ReadEntries.StartIterate() {
+		if _, ok := seen[e.Line]; ok {
+			seen[e.Line] = true
+		}
+	}
+	t.ReadEntries.DoneIterate()
+
+	allSeen := true
+	for _, wasSeen := range seen {
+		allSeen = allSeen && wasSeen
+	}
+
+	require.True(tst, allSeen, "expected all entries to have been received")
+}
+
 // watcherTestResources contains all resources necessary to test an individual Watcher functionality
 type watcherTestResources struct {
 	writeEntry func(entry loki.Entry)
@@ -301,6 +321,12 @@ var cases = map[string]watcherTest{
 	},
 }
 
+type noMarker struct{}
+
+func (n noMarker) LastMarkedSegment() int {
+	return -1
+}
+
 // TestWatcher is the main test function, that works as framework to test different scenarios of the Watcher. It bootstraps
 // necessary test components.
 func TestWatcher(t *testing.T) {
@@ -317,7 +343,7 @@ func TestWatcher(t *testing.T) {
 				ReadEntries: utils.NewSyncSlice[loki.Entry](),
 			}
 			// create new watcher, and defer stop
-			watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig)
+			watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, noMarker{})
 			defer watcher.Stop()
 			wl, err := New(Config{
 				Enabled: true,
@@ -355,3 +381,193 @@ func TestWatcher(t *testing.T) {
 		})
 	}
 }
+
+type mockMarker struct {
+	LastMarkedSegmentFunc func() int
+}
+
+func (m mockMarker) LastMarkedSegment() int {
+	return m.LastMarkedSegmentFunc()
+}
+
+func TestWatcher_Replay(t *testing.T) {
+	labels := model.LabelSet{
+		"app": "test",
+	}
+	segment1Lines := []string{
+		"before 1",
+		"before 2",
+		"before 3",
+	}
+	segment2Lines := []string{
+		"after 1",
+		"after 2",
+		"after 3",
+	}
+
+	t.Run("replay from marked segment if marker is valid", func(t *testing.T) {
+		reg := prometheus.NewRegistry()
+		logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug())
+		dir := t.TempDir()
+		metrics := NewWatcherMetrics(reg)
+		writeTo := &testWriteTo{
+			series:      map[uint64]model.LabelSet{},
+			logger:      logger,
+			ReadEntries: utils.NewSyncSlice[loki.Entry](),
+		}
+		// create new watcher, and defer stop
+		watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, mockMarker{
+			LastMarkedSegmentFunc: func() int {
+				// segment 0 is marked, so the watcher should start reading from segment 1
+				return 0
+			},
+		})
+		defer watcher.Stop()
+		wl, err := New(Config{
+			Enabled: true,
+			Dir:     dir,
+		}, logger, reg)
+		require.NoError(t, err)
+		defer wl.Close()
+
+		ew := newEntryWriter()
+
+		// First, write to segment 0. This will be the last "marked" segment
+		err = ew.WriteEntry(loki.Entry{
+			Labels: labels,
+			Entry: logproto.Entry{
+				Timestamp: time.Now(),
+				Line:      "this line should not appear in received entries",
+			},
+		}, wl, logger)
+		require.NoError(t, err)
+
+		// cut segment and sync
+		_, err = wl.NextSegment()
+		require.NoError(t, err)
+
+		// Now, write to segment 1. This segment is not marked, hence it will be replayed
+		for _, line := range segment1Lines {
+			err = ew.WriteEntry(loki.Entry{
+				Labels: labels,
+				Entry: logproto.Entry{
+					Timestamp: time.Now(),
+					Line:      line,
+				},
+			}, wl, logger)
+			require.NoError(t, err)
+		}
+
+		// cut segment and sync
+		_, err = wl.NextSegment()
+		require.NoError(t, err)
+
+		// Finally, write some data to the last segment, this will be the write head
+		for _, line := range segment2Lines {
+			err = ew.WriteEntry(loki.Entry{
+				Labels: labels,
+				Entry: logproto.Entry{
+					Timestamp: time.Now(),
+					Line:      line,
+				},
+			}, wl, logger)
+			require.NoError(t, err)
+		}
+
+		// sync wal
+		require.NoError(t, wl.Sync())
+
+		// start watcher
+		watcher.Start()
+
+		require.Eventually(t, func() bool {
+			return writeTo.ReadEntries.Length() == 6 // wait for watcher to catch up with both segments
+		}, time.Second*10, time.Second, "timed out waiting for watcher to catch up")
+		writeTo.AssertContainsLines(t, segment1Lines...)
+		writeTo.AssertContainsLines(t, segment2Lines...)
+	})
+
+	t.Run("do not replay at all if invalid marker", func(t *testing.T) {
+		reg := prometheus.NewRegistry()
+		logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug())
+		dir := t.TempDir()
+		metrics := NewWatcherMetrics(reg)
+		writeTo := &testWriteTo{
+			series:      map[uint64]model.LabelSet{},
+			logger:      logger,
+			ReadEntries: utils.NewSyncSlice[loki.Entry](),
+		}
+		// create new watcher, and defer stop
+		watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, mockMarker{
+			LastMarkedSegmentFunc: func() int {
+				// the marker is invalid, so the watcher should tail from the current head instead of replaying
+				return -1
+			},
+		})
+		defer watcher.Stop()
+		wl, err := New(Config{
+			Enabled: true,
+			Dir:     dir,
+		}, logger, reg)
+		require.NoError(t, err)
+		defer wl.Close()
+
+		ew := newEntryWriter()
+
+		// First, write to segment 0. Since this is written before the watcher starts, it should not be replayed
+		err = ew.WriteEntry(loki.Entry{
+			Labels: labels,
+			Entry: logproto.Entry{
+				Timestamp: time.Now(),
+				Line:      "this line should not appear in received entries",
+			},
+		}, wl, logger)
+		require.NoError(t, err)
+
+		// cut segment and sync
+		_, err = wl.NextSegment()
+		require.NoError(t, err)
+
+		// Now, write to segment 1. Since the marker is invalid, this segment should not be replayed either
+		for _, line := range segment1Lines {
+			err = ew.WriteEntry(loki.Entry{
+				Labels: labels,
+				Entry: logproto.Entry{
+					Timestamp: time.Now(),
+					Line:      line,
+				},
+			}, wl, logger)
+			require.NoError(t, err)
+		}
+
+		// cut segment and sync
+		_, err = wl.NextSegment()
+		require.NoError(t, err)
+
+		// sync wal
+		require.NoError(t, wl.Sync())
+
+		// start watcher
+		watcher.Start()
+
+		// Write something after watcher started
+		for _, line := range segment2Lines {
+			err = ew.WriteEntry(loki.Entry{
+				Labels: labels,
+				Entry: logproto.Entry{
+					Timestamp: time.Now(),
+					Line:      line,
+				},
+			}, wl, logger)
+			require.NoError(t, err)
+		}
+
+		// sync wal again
+		require.NoError(t, wl.Sync())
+
+		require.Eventually(t, func() bool {
+			return writeTo.ReadEntries.Length() == 3 // wait for watcher to receive only the entries written after start
+		}, time.Second*10, time.Second, "timed out waiting for watcher to catch up")
+		writeTo.AssertContainsLines(t, segment2Lines...)
+ }) +} diff --git a/component/loki/source/kafka/kafka.go b/component/loki/source/kafka/kafka.go index b8c14583fc96..91308423ae49 100644 --- a/component/loki/source/kafka/kafka.go +++ b/component/loki/source/kafka/kafka.go @@ -12,6 +12,7 @@ import ( kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/dskit/flagext" + "github.com/grafana/river/rivertypes" "github.com/prometheus/common/model" ) @@ -53,7 +54,7 @@ type KafkaAuthentication struct { type KafkaSASLConfig struct { Mechanism string `river:"mechanism,attr,optional"` User string `river:"user,attr,optional"` - Password string `river:"password,attr,optional"` + Password rivertypes.Secret `river:"password,attr,optional"` UseTLS bool `river:"use_tls,attr,optional"` TLSConfig config.TLSConfig `river:"tls_config,block,optional"` OAuthConfig OAuthConfigConfig `river:"oauth_config,block,optional"` @@ -192,21 +193,13 @@ func (args *Arguments) Convert() kt.Config { } func (auth KafkaAuthentication) Convert() kt.Authentication { - var secret flagext.Secret - if auth.SASLConfig.Password != "" { - err := secret.Set(auth.SASLConfig.Password) - if err != nil { - panic("Unable to set kafka SASLConfig password") - } - } - return kt.Authentication{ Type: kt.AuthenticationType(auth.Type), TLSConfig: *auth.TLSConfig.Convert(), SASLConfig: kt.SASLConfig{ Mechanism: sarama.SASLMechanism(auth.SASLConfig.Mechanism), User: auth.SASLConfig.User, - Password: secret, + Password: flagext.SecretWithValue(string(auth.SASLConfig.Password)), UseTLS: auth.SASLConfig.UseTLS, TLSConfig: *auth.SASLConfig.TLSConfig.Convert(), OAuthConfig: kt.OAuthConfig{ diff --git a/converter/internal/promtailconvert/internal/build/kafka.go b/converter/internal/promtailconvert/internal/build/kafka.go index c92740bdbf95..7f954c6298bd 100644 --- a/converter/internal/promtailconvert/internal/build/kafka.go +++ b/converter/internal/promtailconvert/internal/build/kafka.go @@ -51,7 +51,7 @@ func convertKafkaAuthConfig(kafkaCfg *scrapeconfig.KafkaTargetConfig) kafka.Kafk SASLConfig: kafka.KafkaSASLConfig{ Mechanism: string(kafkaCfg.Authentication.SASLConfig.Mechanism), User: kafkaCfg.Authentication.SASLConfig.User, - Password: kafkaCfg.Authentication.SASLConfig.Password.String(), + Password: rivertypes.Secret(kafkaCfg.Authentication.SASLConfig.Password.String()), UseTLS: kafkaCfg.Authentication.SASLConfig.UseTLS, TLSConfig: *common.ToTLSConfig(&kafkaCfg.Authentication.SASLConfig.TLSConfig), }, diff --git a/converter/internal/staticconvert/internal/build/agent_exporter.go b/converter/internal/staticconvert/internal/build/agent_exporter.go index 62f34b82909d..4d9a56cc3abd 100644 --- a/converter/internal/staticconvert/internal/build/agent_exporter.go +++ b/converter/internal/staticconvert/internal/build/agent_exporter.go @@ -9,7 +9,7 @@ import ( func (b *IntegrationsConfigBuilder) appendAgentExporter(config *agent_exporter.Config) discovery.Exports { args := toAgentExporter(config) - return b.appendExporterBlock(args, config.Name(), "agent") + return b.appendExporterBlock(args, config.Name(), nil, "agent") } func toAgentExporter(config *agent_exporter.Config) *agent.Arguments { @@ -18,7 +18,7 @@ func toAgentExporter(config *agent_exporter.Config) *agent.Arguments { func (b *IntegrationsConfigBuilder) appendAgentExporterV2(config *agent_exporter_v2.Config) discovery.Exports { args := toAgentExporterV2(config) - return b.appendExporterBlock(args, config.Name(), "agent") + return b.appendExporterBlock(args, 
config.Name(), config.Common.InstanceKey, "agent") } func toAgentExporterV2(config *agent_exporter_v2.Config) *agent.Arguments { diff --git a/converter/internal/staticconvert/internal/build/apache_exporter.go b/converter/internal/staticconvert/internal/build/apache_exporter.go index 0d3b1865b04f..d6a6a78ff540 100644 --- a/converter/internal/staticconvert/internal/build/apache_exporter.go +++ b/converter/internal/staticconvert/internal/build/apache_exporter.go @@ -1,24 +1,15 @@ package build import ( - "fmt" - "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/prometheus/exporter/apache" - "github.com/grafana/agent/converter/internal/common" "github.com/grafana/agent/pkg/integrations/apache_http" + apache_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/apache_http" ) func (b *IntegrationsConfigBuilder) appendApacheExporter(config *apache_http.Config) discovery.Exports { args := toApacheExporter(config) - compLabel := common.LabelForParts(b.globalCtx.LabelPrefix, config.Name()) - b.f.Body().AppendBlock(common.NewBlockWithOverride( - []string{"prometheus", "exporter", "apache"}, - compLabel, - args, - )) - - return common.NewDiscoveryExports(fmt.Sprintf("prometheus.exporter.apache.%s.targets", compLabel)) + return b.appendExporterBlock(args, config.Name(), nil, "apache") } func toApacheExporter(config *apache_http.Config) *apache.Arguments { @@ -28,3 +19,16 @@ func toApacheExporter(config *apache_http.Config) *apache.Arguments { ApacheInsecure: config.ApacheInsecure, } } + +func (b *IntegrationsConfigBuilder) appendApacheExporterV2(config *apache_exporter_v2.Config) discovery.Exports { + args := toApacheExporterV2(config) + return b.appendExporterBlock(args, config.Name(), config.Common.InstanceKey, "apache") +} + +func toApacheExporterV2(config *apache_exporter_v2.Config) *apache.Arguments { + return &apache.Arguments{ + ApacheAddr: config.ApacheAddr, + ApacheHostOverride: config.ApacheHostOverride, + ApacheInsecure: config.ApacheInsecure, + } +} diff --git a/converter/internal/staticconvert/internal/build/azure_exporter.go b/converter/internal/staticconvert/internal/build/azure_exporter.go index 0a096901cef8..1a6a39f7c842 100644 --- a/converter/internal/staticconvert/internal/build/azure_exporter.go +++ b/converter/internal/staticconvert/internal/build/azure_exporter.go @@ -1,24 +1,14 @@ package build import ( - "fmt" - "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/prometheus/exporter/azure" - "github.com/grafana/agent/converter/internal/common" "github.com/grafana/agent/pkg/integrations/azure_exporter" ) -func (b *IntegrationsConfigBuilder) appendAzureExporter(config *azure_exporter.Config) discovery.Exports { +func (b *IntegrationsConfigBuilder) appendAzureExporter(config *azure_exporter.Config, instanceKey *string) discovery.Exports { args := toAzureExporter(config) - compLabel := common.LabelForParts(b.globalCtx.LabelPrefix, config.Name()) - b.f.Body().AppendBlock(common.NewBlockWithOverride( - []string{"prometheus", "exporter", "azure"}, - compLabel, - args, - )) - - return common.NewDiscoveryExports(fmt.Sprintf("prometheus.exporter.azure.%s.targets", compLabel)) + return b.appendExporterBlock(args, config.Name(), instanceKey, "azure") } func toAzureExporter(config *azure_exporter.Config) *azure.Arguments { diff --git a/converter/internal/staticconvert/internal/build/builder.go b/converter/internal/staticconvert/internal/build/builder.go index c68531abd566..5a0c19a90f3a 100644 --- 
a/converter/internal/staticconvert/internal/build/builder.go +++ b/converter/internal/staticconvert/internal/build/builder.go @@ -38,7 +38,9 @@ import ( "github.com/grafana/agent/pkg/integrations/squid_exporter" "github.com/grafana/agent/pkg/integrations/statsd_exporter" agent_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/agent" + apache_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/apache_http" common_v2 "github.com/grafana/agent/pkg/integrations/v2/common" + "github.com/grafana/agent/pkg/integrations/v2/metricsutils" "github.com/grafana/agent/pkg/integrations/windows_exporter" "github.com/grafana/river/scanner" "github.com/grafana/river/token/builder" @@ -147,7 +149,7 @@ func (b *IntegrationsConfigBuilder) appendV1Integrations() { case *windows_exporter.Config: exports = b.appendWindowsExporter(itg) case *azure_exporter.Config: - exports = b.appendAzureExporter(itg) + exports = b.appendAzureExporter(itg, nil) case *cadvisor.Config: exports = b.appendCadvisorExporter(itg) } @@ -160,7 +162,7 @@ func (b *IntegrationsConfigBuilder) appendV1Integrations() { func (b *IntegrationsConfigBuilder) appendExporter(commonConfig *int_config.Common, name string, extraTargets []discovery.Target) { scrapeConfig := prom_config.DefaultScrapeConfig - scrapeConfig.JobName = fmt.Sprintf("integrations/%s", name) + scrapeConfig.JobName = b.formatJobName(name, nil) scrapeConfig.RelabelConfigs = commonConfig.RelabelConfigs scrapeConfig.MetricRelabelConfigs = commonConfig.MetricRelabelConfigs scrapeConfig.HTTPClientConfig.TLSConfig = b.cfg.Integrations.ConfigV1.TLSConfig @@ -184,12 +186,7 @@ func (b *IntegrationsConfigBuilder) appendExporter(commonConfig *int_config.Comm } jobNameToCompLabelsFunc := func(jobName string) string { - labelSuffix := strings.TrimPrefix(jobName, "integrations/") - if labelSuffix == "" { - return b.globalCtx.LabelPrefix - } - - return fmt.Sprintf("%s_%s", b.globalCtx.LabelPrefix, labelSuffix) + return b.jobNameToCompLabel(jobName) } b.diags.AddAll(prometheusconvert.AppendAllNested(b.f, promConfig, jobNameToCompLabelsFunc, extraTargets, b.globalCtx.RemoteWriteExports)) @@ -205,6 +202,15 @@ func (b *IntegrationsConfigBuilder) appendV2Integrations() { case *agent_exporter_v2.Config: exports = b.appendAgentExporterV2(itg) commonConfig = itg.Common + case *apache_exporter_v2.Config: + exports = b.appendApacheExporterV2(itg) + commonConfig = itg.Common + case *metricsutils.ConfigShim: + commonConfig = itg.Common + switch v1_itg := itg.Orig.(type) { + case *azure_exporter.Config: + exports = b.appendAzureExporter(v1_itg, itg.Common.InstanceKey) + } } if len(exports.Targets) > 0 { @@ -228,7 +234,7 @@ func (b *IntegrationsConfigBuilder) appendExporterV2(commonConfig *common_v2.Met commonConfig.ApplyDefaults(b.cfg.Integrations.ConfigV2.Metrics.Autoscrape) scrapeConfig := prom_config.DefaultScrapeConfig - scrapeConfig.JobName = fmt.Sprintf("integrations/%s", name) + scrapeConfig.JobName = b.formatJobName(name, commonConfig.InstanceKey) scrapeConfig.RelabelConfigs = commonConfig.Autoscrape.RelabelConfigs scrapeConfig.MetricRelabelConfigs = commonConfig.Autoscrape.MetricRelabelConfigs scrapeConfig.ScrapeInterval = commonConfig.Autoscrape.ScrapeInterval @@ -263,12 +269,7 @@ func (b *IntegrationsConfigBuilder) appendExporterV2(commonConfig *common_v2.Met } jobNameToCompLabelsFunc := func(jobName string) string { - labelSuffix := strings.TrimPrefix(jobName, "integrations/") - if labelSuffix == "" { - return b.globalCtx.LabelPrefix - } - - return fmt.Sprintf("%s_%s", 
b.globalCtx.LabelPrefix, labelSuffix) + return b.jobNameToCompLabel(jobName) } // Need to pass in the remote write reference from the metrics config here: @@ -283,8 +284,32 @@ func splitByCommaNullOnEmpty(s string) []string { return strings.Split(s, ",") } -func (b *IntegrationsConfigBuilder) appendExporterBlock(args component.Arguments, name string, exporterName string) discovery.Exports { - compLabel := common.LabelForParts(b.globalCtx.LabelPrefix, name) +func (b *IntegrationsConfigBuilder) jobNameToCompLabel(jobName string) string { + labelSuffix := strings.TrimPrefix(jobName, "integrations/") + if labelSuffix == "" { + return b.globalCtx.LabelPrefix + } + + return fmt.Sprintf("%s_%s", b.globalCtx.LabelPrefix, labelSuffix) +} + +func (b *IntegrationsConfigBuilder) formatJobName(name string, instanceKey *string) string { + jobName := b.globalCtx.LabelPrefix + if instanceKey != nil { + jobName = fmt.Sprintf("%s/%s", jobName, *instanceKey) + } else { + jobName = fmt.Sprintf("%s/%s", jobName, name) + } + + return jobName +} + +func (b *IntegrationsConfigBuilder) appendExporterBlock(args component.Arguments, configName string, instanceKey *string, exporterName string) discovery.Exports { + compLabel, err := scanner.SanitizeIdentifier(b.formatJobName(configName, instanceKey)) + if err != nil { + b.diags.Add(diag.SeverityLevelCritical, fmt.Sprintf("failed to sanitize job name: %s", err)) + } + b.f.Body().AppendBlock(common.NewBlockWithOverride( []string{"prometheus", "exporter", exporterName}, compLabel, diff --git a/converter/internal/staticconvert/testdata-v2/integrations_v2.river b/converter/internal/staticconvert/testdata-v2/integrations_v2.river index 5d63ba9ab89f..c0e2acba450b 100644 --- a/converter/internal/staticconvert/testdata-v2/integrations_v2.river +++ b/converter/internal/staticconvert/testdata-v2/integrations_v2.river @@ -9,6 +9,30 @@ prometheus.remote_write "metrics_default" { } } +prometheus.exporter.azure "integrations_azure1" { + subscriptions = ["subId"] + resource_type = "Microsoft.Dashboard/grafana" + metrics = ["HttpRequestCount"] +} + +prometheus.scrape "integrations_azure1" { + targets = prometheus.exporter.azure.integrations_azure1.targets + forward_to = [prometheus.remote_write.metrics_default.receiver] + job_name = "integrations/azure1" +} + +prometheus.exporter.azure "integrations_azure2" { + subscriptions = ["subId"] + resource_type = "Microsoft.Dashboard/grafana" + metrics = ["HttpRequestCount"] +} + +prometheus.scrape "integrations_azure2" { + targets = prometheus.exporter.azure.integrations_azure2.targets + forward_to = [prometheus.remote_write.metrics_default.receiver] + job_name = "integrations/azure2" +} + prometheus.exporter.agent "integrations_agent" { } discovery.relabel "integrations_agent" { @@ -32,3 +56,37 @@ prometheus.scrape "integrations_agent" { forward_to = [prometheus.remote_write.metrics_default.receiver] job_name = "integrations/agent" } + +prometheus.exporter.apache "integrations_apache1" { + insecure = true +} + +prometheus.scrape "integrations_apache1" { + targets = prometheus.exporter.apache.integrations_apache1.targets + forward_to = [prometheus.remote_write.metrics_default.receiver] + job_name = "integrations/apache1" +} + +prometheus.exporter.apache "integrations_apache2" { } + +discovery.relabel "integrations_apache2" { + targets = prometheus.exporter.apache.integrations_apache2.targets + + rule { + source_labels = ["__address__"] + target_label = "test_label" + replacement = "test_label_value" + } + + rule { + source_labels = 
["__address__"] + target_label = "test_label_2" + replacement = "test_label_value_2" + } +} + +prometheus.scrape "integrations_apache2" { + targets = discovery.relabel.integrations_apache2.output + forward_to = [prometheus.remote_write.metrics_default.receiver] + job_name = "integrations/apache2" +} diff --git a/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml b/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml index 8808295a91f0..057609597a56 100644 --- a/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml +++ b/converter/internal/staticconvert/testdata-v2/integrations_v2.yaml @@ -7,9 +7,28 @@ metrics: integrations: agent: - instance: "default" autoscrape: metrics_instance: "default" extra_labels: test_label: test_label_value - test_label_2: test_label_value_2 \ No newline at end of file + test_label_2: test_label_value_2 + apache_http_configs: + - instance: "apache1" + insecure: true + - instance: "apache2" + extra_labels: + test_label: test_label_value + test_label_2: test_label_value_2 + azure_configs: + - instance: "azure1" + subscriptions: + - "subId" + resource_type: "Microsoft.Dashboard/grafana" + metrics: + - "HttpRequestCount" + - instance: "azure2" + subscriptions: + - "subId" + resource_type: "Microsoft.Dashboard/grafana" + metrics: + - "HttpRequestCount" \ No newline at end of file diff --git a/converter/internal/staticconvert/validate.go b/converter/internal/staticconvert/validate.go index b35384d8f8bf..b79760a69b49 100644 --- a/converter/internal/staticconvert/validate.go +++ b/converter/internal/staticconvert/validate.go @@ -34,6 +34,8 @@ import ( "github.com/grafana/agent/pkg/integrations/statsd_exporter" v2 "github.com/grafana/agent/pkg/integrations/v2" agent_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/agent" + apache_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/apache_http" + "github.com/grafana/agent/pkg/integrations/v2/metricsutils" "github.com/grafana/agent/pkg/integrations/windows_exporter" "github.com/grafana/agent/pkg/logs" "github.com/grafana/agent/pkg/metrics" @@ -161,6 +163,13 @@ func validateIntegrationsV2(integrationsConfig *v2.SubsystemOptions) diag.Diagno for _, integration := range integrationsConfig.Configs { switch itg := integration.(type) { case *agent_exporter_v2.Config: + case *apache_exporter_v2.Config: + case *metricsutils.ConfigShim: + switch v1_itg := itg.Orig.(type) { + case *azure_exporter.Config: + default: + diags.Add(diag.SeverityLevelError, fmt.Sprintf("The converter does not support converting the provided %s integration.", v1_itg.Name())) + } default: diags.Add(diag.SeverityLevelError, fmt.Sprintf("The converter does not support converting the provided %s integration.", itg.Name())) } diff --git a/docs/sources/flow/reference/components/loki.source.kafka.md b/docs/sources/flow/reference/components/loki.source.kafka.md index 039cc976e8a9..a93e870cdabd 100644 --- a/docs/sources/flow/reference/components/loki.source.kafka.md +++ b/docs/sources/flow/reference/components/loki.source.kafka.md @@ -38,17 +38,17 @@ loki.source.kafka "LABEL" { `loki.source.kafka` supports the following arguments: - Name | Type | Description | Default | Required + Name | Type | Description | Default | Required --------------------------|----------------------|----------------------------------------------------------|-----------------------|---------- - `brokers` | `list(string)` | The list of brokers to connect to Kafka. | | yes - `topics` | `list(string)` | The list of Kafka topics to consume. 
| | yes - `group_id` | `string` | The Kafka consumer group id. | `"loki.source.kafka"` | no - `assignor` | `string` | The consumer group rebalancing strategy to use. | `"range"` | no - `version` | `string` | Kafka version to connect to. | `"2.2.1"` | no - `use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from Kafka. | `false` | no - `labels` | `map(string)` | The labels to associate with each received Kafka event. | `{}` | no - `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes - `relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no + `brokers` | `list(string)` | The list of brokers to connect to Kafka. | | yes + `topics` | `list(string)` | The list of Kafka topics to consume. | | yes + `group_id` | `string` | The Kafka consumer group id. | `"loki.source.kafka"` | no + `assignor` | `string` | The consumer group rebalancing strategy to use. | `"range"` | no + `version` | `string` | Kafka version to connect to. | `"2.2.1"` | no + `use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from Kafka. | `false` | no + `labels` | `map(string)` | The labels to associate with each received Kafka event. | `{}` | no + `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes + `relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no `assignor` values can be either `"range"`, `"roundrobin"`, or `"sticky"`. @@ -77,13 +77,13 @@ keep these labels, relabel them using a [loki.relabel][] component and pass its The following blocks are supported inside the definition of `loki.source.kafka`: - Hierarchy | Name | Description | Required + Hierarchy | Name | Description | Required ---------------------------------------------|------------------|-----------------------------------------------------------|---------- - authentication | [authentication] | Optional authentication configuration with Kafka brokers. | no - authentication > tls_config | [tls_config] | Optional authentication configuration with Kafka brokers. | no - authentication > sasl_config | [sasl_config] | Optional authentication configuration with Kafka brokers. | no - authentication > sasl_config > tls_config | [tls_config] | Optional authentication configuration with Kafka brokers. | no - authentication > sasl_config > oauth_config | [oauth_config] | Optional authentication configuration with Kafka brokers. | no + authentication | [authentication] | Optional authentication configuration with Kafka brokers. | no + authentication > tls_config | [tls_config] | Optional authentication configuration with Kafka brokers. | no + authentication > sasl_config | [sasl_config] | Optional authentication configuration with Kafka brokers. | no + authentication > sasl_config > tls_config | [tls_config] | Optional authentication configuration with Kafka brokers. | no + authentication > sasl_config > oauth_config | [oauth_config] | Optional authentication configuration with Kafka brokers. | no [authentication]: #authentication-block @@ -97,9 +97,9 @@ The following blocks are supported inside the definition of `loki.source.kafka`: The `authentication` block defines the authentication method when communicating with the Kafka event brokers. - Name | Type | Description | Default | Required + Name | Type | Description | Default | Required --------|----------|-------------------------|----------|---------- - `type` | `string` | Type of authentication. 
| `"none"` | no + `type` | `string` | Type of authentication. | `"none"` | no `type` supports the values `"none"`, `"ssl"`, and `"sasl"`. If `"ssl"` is used, you must set the `tls_config` block. If `"sasl"` is used, you must set the `sasl_config` block. @@ -113,21 +113,21 @@ you must set the `tls_config` block. If `"sasl"` is used, you must set the `sasl The `sasl_config` block defines the listen address and port where the listener expects Kafka messages to be sent to. - Name | Type | Description | Default | Required --------------|----------|-------------------------------------------------------------------------------|------------|---------- - `mechanism` | `string` | Specifies the SASL mechanism the client uses to authenticate with the broker. | `"PLAIN""` | no - `user` | `string` | The user name to use for SASL authentication. | `""` | no - `password` | `string` | The password to use for SASL authentication. | `""` | no - `use_tls` | `bool` | If true, SASL authentication is executed over TLS. | `false` | no + Name | Type | Description | Default | Required +-------------|----------|--------------------------------------------------------------------|----------|----------------------- + `mechanism` | `string` | Specifies the SASL mechanism the client uses to authenticate with the broker. | `"PLAIN""` | no + `user` | `string` | The user name to use for SASL authentication. | `""` | no + `password` | `secret` | The password to use for SASL authentication. | `""` | no + `use_tls` | `bool` | If true, SASL authentication is executed over TLS. | `false` | no ### oauth_config block The `oauth_config` is required when the SASL mechanism is set to `OAUTHBEARER`. - Name | Type | Description | Default | Required + Name | Type | Description | Default | Required ------------------|----------------|------------------------------------------------------------------------|---------|---------- - `token_provider` | `string` | The OAuth provider to be used. The only supported provider is `azure`. | `""` | yes - `scopes` | `list(string)` | The scopes to set in the access token | `[]` | yes + `token_provider` | `string` | The OAuth provider to be used. The only supported provider is `azure`. 
| `""` | yes + `scopes` | `list(string)` | The scopes to set in the access token | `[]` | yes ## Exported fields diff --git a/go.mod b/go.mod index 19227e9963b9..d62e88a06943 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/coreos/go-systemd/v22 v22.5.0 github.com/davidmparrott/kafka_exporter/v2 v2.0.1 - github.com/docker/docker v24.0.6+incompatible + github.com/docker/docker v24.0.7+incompatible github.com/docker/go-connections v0.4.0 github.com/drone/envsubst/v2 v2.0.0-20210730161058-179042472c46 github.com/fatih/color v1.15.0 @@ -83,7 +83,7 @@ require ( github.com/heroku/x v0.0.61 github.com/iamseth/oracledb_exporter v0.0.0-20230918193147-95e16f21ceee github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 - github.com/jaegertracing/jaeger v1.48.0 + github.com/jaegertracing/jaeger v1.50.0 github.com/jmespath/go-jmespath v0.4.0 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.0 @@ -360,7 +360,7 @@ require ( github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-git/gcfg v1.5.0 // indirect github.com/go-git/go-billy/v5 v5.3.1 // indirect - github.com/go-kit/kit v0.12.0 // indirect + github.com/go-kit/kit v0.13.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-openapi/analysis v0.21.4 // indirect @@ -409,7 +409,7 @@ require ( github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/go-plugin v1.4.10 // indirect + github.com/hashicorp/go-plugin v1.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-secure-stdlib/awsutil v0.1.6 // indirect @@ -591,7 +591,7 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/config/internal v0.87.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 go.opentelemetry.io/contrib/propagators/b3 v1.19.0 // indirect go.opentelemetry.io/otel/bridge/opencensus v0.42.0 // indirect go4.org/netipx v0.0.0-20230125063823-8449b0a6169f // indirect @@ -621,7 +621,11 @@ require ( require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab -require github.com/githubexporter/github-exporter v0.0.0-20231025122338-656e7dc33fe7 +require ( + github.com/githubexporter/github-exporter v0.0.0-20231025122338-656e7dc33fe7 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 +) require ( dario.cat/mergo v1.0.0 // indirect @@ -635,6 +639,7 @@ require ( github.com/leoluk/perflib_exporter v0.2.0 // indirect github.com/lightstep/go-expohisto v1.0.0 // indirect github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect + github.com/natefinch/atomic v1.0.1 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.87.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.87.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.87.0 // indirect @@ -644,9 +649,7 @@ require ( 
github.com/sercand/kuberesolver/v4 v4.0.0 // indirect github.com/sony/gobreaker v0.5.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.42.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.42.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect diff --git a/go.sum b/go.sum index 8a4186e90e5c..6097d5c430b3 100644 --- a/go.sum +++ b/go.sum @@ -431,6 +431,8 @@ github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e h1:C1vYe7 github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e/go.mod h1:8NpZERGK+R9DGuZqqsKfnf2qI/rh7yBT8End29IvgNA= github.com/bufbuild/connect-go v1.10.0 h1:QAJ3G9A1OYQW2Jbk3DeoJbkCxuKArrvZgDt47mjdTbg= github.com/bufbuild/connect-go v1.10.0/go.mod h1:CAIePUgkDR5pAFaylSMtNK45ANQjp9JvpluG20rhpV8= +github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= +github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/burningalchemist/sql_exporter v0.0.0-20221222155641-2ff59aa75200 h1:1zECtssRshqhP8+DELKyWeg8rxaRC5OO72kJQhrJOE8= @@ -582,8 +584,8 @@ github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m3 github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v20.10.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/docker v24.0.6+incompatible h1:hceabKCtUgDqPu+qm0NgsaXf28Ljf4/pWFL7xjWWDgE= -github.com/docker/docker v24.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= @@ -725,8 +727,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o= github.com/go-kit/kit v0.11.0/go.mod h1:73/6Ixaufkvb5Osvkls8C79vuQ49Ba1rUEUYNSf+FUw= -github.com/go-kit/kit v0.12.0 h1:e4o3o3IsBfAKQh5Qbbiqyfu97Ku7jrO/JbohvztANh4= -github.com/go-kit/kit v0.12.0/go.mod h1:lHd+EkCZPIwYItmGDDRdhinkzX2A1sj+M9biaEaizzs= +github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU= +github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg= github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc= 
github.com/go-ldap/ldap/v3 v3.1.10/go.mod h1:5Zun81jBTabRaI8lzN7E1JjyEl1g6zI6u9pd8luAK4Q= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= @@ -1165,8 +1167,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/go-plugin v0.0.0-20180331002553-e8d22c780116/go.mod h1:JSqWYsict+jzcj0+xElxyrBQRPNoiWQuddnxArJ7XHQ= github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= github.com/hashicorp/go-plugin v1.4.3/go.mod h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ= -github.com/hashicorp/go-plugin v1.4.10 h1:xUbmA4jC6Dq163/fWcp8P3JuHilrHHMLNRxzGQJ9hNk= -github.com/hashicorp/go-plugin v1.4.10/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0= +github.com/hashicorp/go-plugin v1.5.2 h1:aWv8eimFqWlsEiMrYZdPYl+FdHaBJSN4AWwGWfT1G2Y= +github.com/hashicorp/go-plugin v1.5.2/go.mod h1:w1sAEES3g3PuV/RzUrgow20W2uErMly84hhD3um1WL4= github.com/hashicorp/go-retryablehttp v0.0.0-20180531211321-3b087ef2d313/go.mod h1:fXcdFsQoipQa7mwORhKad5jmDCeSy/RCGzWA08PO0lM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.5.4/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= @@ -1364,8 +1366,8 @@ github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0f github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jaegertracing/jaeger v1.48.0 h1:YuKooQ7qJsjgxws9xuf8C/BLNTPx8qTAJz4wv7IHhSc= -github.com/jaegertracing/jaeger v1.48.0/go.mod h1:BoAPkdCAIEuLsVz/EDhjXd+GSVpHtJhiGqWoFEvBCKg= +github.com/jaegertracing/jaeger v1.50.0 h1:qsOcPeB3nAc3h8tx+gnZ3JODAZfqbYmQr45jPEwBd2w= +github.com/jaegertracing/jaeger v1.50.0/go.mod h1:MVGvxf4+Pcn31gz9RnLo0097w3khKFwJIprIZHOt89s= github.com/jarcoal/httpmock v0.0.0-20180424175123-9c70cfe4a1da/go.mod h1:ks+b9deReOc7jgqp+e7LuFiCBH6Rm5hL32cLcEAArb4= github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= github.com/jarcoal/httpmock v1.3.0/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= @@ -1388,8 +1390,9 @@ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJk github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee/go.mod h1:N0t2vlmpe8nyZB5ouIbJQPDSR+mH6oe7xHB9VZHSUzM= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= -github.com/jhump/protoreflect v1.6.0 h1:h5jfMVslIg6l29nsMs0D8Wj17RDVdNYti0vDN/PZZoE= github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74= +github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= @@ -1672,6 +1675,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ 
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
+github.com/natefinch/atomic v1.0.1 h1:ZPYKxkqQOx3KZ+RsbnP/YsgvxWQPGxjC0oBt2AhwV0A=
+github.com/natefinch/atomic v1.0.1/go.mod h1:N/D/ELrljoqDyT3rZrsUmtsuzvHkeB/wWjHV22AZRbM=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
diff --git a/pkg/integrations/mssql/collector_config.yaml b/pkg/integrations/mssql/collector_config.yaml
index f105ba7febf2..dcd8b1314bb4 100644
--- a/pkg/integrations/mssql/collector_config.yaml
+++ b/pkg/integrations/mssql/collector_config.yaml
@@ -121,6 +121,12 @@ metrics:
     values: [virtual_memory_bytes]
     query_ref: mssql_process_memory

+  - metric_name: mssql_available_commit_memory_bytes
+    type: gauge
+    help: 'SQL Server available to be committed memory size.'
+    values: [available_commit_limit_bytes]
+    query_ref: mssql_process_memory
+
   - metric_name: mssql_memory_utilization_percentage
     type: gauge
     help: 'The percentage of committed memory that is in the working set.'
@@ -133,6 +139,21 @@
     values: [page_fault_count]
     query_ref: mssql_process_memory

+  #
+  # Collected from sys.dm_os_sys_info
+  #
+  - metric_name: mssql_server_total_memory_bytes
+    type: gauge
+    help: 'SQL Server committed memory in the memory manager.'
+    values: [committed_memory_bytes]
+    query_ref: mssql_os_sys_info
+
+  - metric_name: mssql_server_target_memory_bytes
+    type: gauge
+    help: 'SQL Server target committed memory set for the memory manager.'
+    values: [committed_memory_target_bytes]
+    query_ref: mssql_os_sys_info
+
   #
   # Collected from sys.dm_os_sys_memory
   #
@@ -168,13 +189,21 @@ queries:
      sys.dm_io_virtual_file_stats(null, null) a
      INNER JOIN sys.master_files b ON a.database_id = b.database_id AND a.file_id = b.file_id
      GROUP BY a.database_id
-  # Populates `mssql_resident_memory_bytes`, `mssql_virtual_memory_bytes`, `mssql_memory_utilization_percentage` and
-  # `mssql_page_fault_count`.
+  # Populates `mssql_resident_memory_bytes`, `mssql_virtual_memory_bytes`, `mssql_available_commit_memory_bytes`,
+  # `mssql_memory_utilization_percentage`, and `mssql_page_fault_count_total`.
   - query_name: mssql_process_memory
     query: |
      SELECT
       physical_memory_in_use_kb * 1024 AS resident_memory_bytes,
       virtual_address_space_committed_kb * 1024 AS virtual_memory_bytes,
+      available_commit_limit_kb * 1024 AS available_commit_limit_bytes,
       memory_utilization_percentage,
       page_fault_count
      FROM sys.dm_os_process_memory
+  # Populates `mssql_server_total_memory_bytes` and `mssql_server_target_memory_bytes`.
+  - query_name: mssql_os_sys_info
+    query: |
+     SELECT
+      committed_kb * 1024 AS committed_memory_bytes,
+      committed_target_kb * 1024 AS committed_memory_target_bytes
+     FROM sys.dm_os_sys_info
diff --git a/pkg/integrations/v2/metricsutils/versionshim.go b/pkg/integrations/v2/metricsutils/versionshim.go
index 6fcb1656998a..5a4db10efdc8 100644
--- a/pkg/integrations/v2/metricsutils/versionshim.go
+++ b/pkg/integrations/v2/metricsutils/versionshim.go
@@ -19,9 +19,9 @@ import (
 // v2.Config with a new name.
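 //
 // A hypothetical call site (assuming the v2 registry's RegisterLegacy helper
 // and a v1 Config type) might register an integration under a new name like:
 //
 //	v2.RegisterLegacy(&Config{}, v2.TypeSingleton, metricsutils.NewNamedShim("example"))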
func NewNamedShim(newName string) v2.UpgradeFunc { return func(before v1.Config, common common.MetricsConfig) v2.UpgradedConfig { - return &configShim{ - orig: before, - common: common, + return &ConfigShim{ + Orig: before, + Common: common, nameOverride: newName, } } @@ -30,55 +30,55 @@ func NewNamedShim(newName string) v2.UpgradeFunc { // Shim upgrades a v1.Config to a v2.Config. The resulting config is NOT // registered. Shim matches the v2.UpgradeFunc type. func Shim(before v1.Config, common common.MetricsConfig) (after v2.UpgradedConfig) { - return &configShim{orig: before, common: common} + return &ConfigShim{Orig: before, Common: common} } -type configShim struct { - orig v1.Config - common common.MetricsConfig +type ConfigShim struct { + Orig v1.Config + Common common.MetricsConfig nameOverride string } var ( - _ v2.Config = (*configShim)(nil) - _ v2.UpgradedConfig = (*configShim)(nil) - _ v2.ComparableConfig = (*configShim)(nil) + _ v2.Config = (*ConfigShim)(nil) + _ v2.UpgradedConfig = (*ConfigShim)(nil) + _ v2.ComparableConfig = (*ConfigShim)(nil) ) -func (s *configShim) LegacyConfig() (v1.Config, common.MetricsConfig) { return s.orig, s.common } +func (s *ConfigShim) LegacyConfig() (v1.Config, common.MetricsConfig) { return s.Orig, s.Common } -func (s *configShim) Name() string { +func (s *ConfigShim) Name() string { if s.nameOverride != "" { return s.nameOverride } - return s.orig.Name() + return s.Orig.Name() } -func (s *configShim) ApplyDefaults(g v2.Globals) error { - s.common.ApplyDefaults(g.SubsystemOpts.Metrics.Autoscrape) +func (s *ConfigShim) ApplyDefaults(g v2.Globals) error { + s.Common.ApplyDefaults(g.SubsystemOpts.Metrics.Autoscrape) if id, err := s.Identifier(g); err == nil { - s.common.InstanceKey = &id + s.Common.InstanceKey = &id } return nil } -func (s *configShim) ConfigEquals(c v2.Config) bool { - o, ok := c.(*configShim) +func (s *ConfigShim) ConfigEquals(c v2.Config) bool { + o, ok := c.(*ConfigShim) if !ok { return false } - return util.CompareYAML(s.orig, o.orig) && util.CompareYAML(s.common, o.common) + return util.CompareYAML(s.Orig, o.Orig) && util.CompareYAML(s.Common, o.Common) } -func (s *configShim) Identifier(g v2.Globals) (string, error) { - if s.common.InstanceKey != nil { - return *s.common.InstanceKey, nil +func (s *ConfigShim) Identifier(g v2.Globals) (string, error) { + if s.Common.InstanceKey != nil { + return *s.Common.InstanceKey, nil } - return s.orig.InstanceKey(g.AgentIdentifier) + return s.Orig.InstanceKey(g.AgentIdentifier) } -func (s *configShim) NewIntegration(l log.Logger, g v2.Globals) (v2.Integration, error) { - v1Integration, err := s.orig.NewIntegration(l) +func (s *ConfigShim) NewIntegration(l log.Logger, g v2.Globals) (v2.Integration, error) { + v1Integration, err := s.Orig.NewIntegration(l) if err != nil { return nil, err } @@ -137,7 +137,7 @@ func (s *configShim) NewIntegration(l log.Logger, g v2.Globals) (v2.Integration, integrationName: s.Name(), instanceID: id, - common: s.common, + common: s.Common, globals: g, handler: handler, targets: targets, diff --git a/web/ui/package.json b/web/ui/package.json index 996d4d46402d..2f20549e10ba 100644 --- a/web/ui/package.json +++ b/web/ui/package.json @@ -28,7 +28,7 @@ "@types/prettier": "^2.7.2", "@types/prismjs": "^1.26.0", "@types/react": "^18.2.6", - "@types/react-dom": "^18.2.4", + "@types/react-dom": "^18.2.14", "@types/react-syntax-highlighter": "^15.5.6", "@types/react-table": "^7.7.14", "eslint": "^8.40.0", diff --git a/web/ui/yarn.lock b/web/ui/yarn.lock index 
ab3af4184b3d..20ee26ab4206 100644 --- a/web/ui/yarn.lock +++ b/web/ui/yarn.lock @@ -31,6 +31,14 @@ dependencies: "@babel/highlight" "^7.18.6" +"@babel/code-frame@^7.22.13": + version "7.22.13" + resolved "https://registry.yarnpkg.com/@babel/code-frame/-/code-frame-7.22.13.tgz#e3c1c099402598483b7a8c46a721d1038803755e" + integrity sha512-XktuhWlJ5g+3TJXc5upd9Ks1HutSArik6jf2eAjYFyIOf4ej3RN+184cZbzDvbPnuTJIUhPKKJE3cIsYTiAT3w== + dependencies: + "@babel/highlight" "^7.22.13" + chalk "^2.4.2" + "@babel/compat-data@^7.17.7", "@babel/compat-data@^7.20.5", "@babel/compat-data@^7.21.5": version "7.21.7" resolved "https://registry.yarnpkg.com/@babel/compat-data/-/compat-data-7.21.7.tgz#61caffb60776e49a57ba61a88f02bedd8714f6bc" @@ -76,6 +84,16 @@ "@jridgewell/trace-mapping" "^0.3.17" jsesc "^2.5.1" +"@babel/generator@^7.23.0": + version "7.23.0" + resolved "https://registry.yarnpkg.com/@babel/generator/-/generator-7.23.0.tgz#df5c386e2218be505b34837acbcb874d7a983420" + integrity sha512-lN85QRR+5IbYrMWM6Y4pE/noaQtg4pNiqeNGX60eqOfo6gtEj6uw/JagelB8vVztSd7R6M5n1+PQkDbHbBRU4g== + dependencies: + "@babel/types" "^7.23.0" + "@jridgewell/gen-mapping" "^0.3.2" + "@jridgewell/trace-mapping" "^0.3.17" + jsesc "^2.5.1" + "@babel/helper-annotate-as-pure@^7.18.6": version "7.18.6" resolved "https://registry.yarnpkg.com/@babel/helper-annotate-as-pure/-/helper-annotate-as-pure-7.18.6.tgz#eaa49f6f80d5a33f9a5dd2276e6d6e451be0a6bb" @@ -142,6 +160,11 @@ resolved "https://registry.yarnpkg.com/@babel/helper-environment-visitor/-/helper-environment-visitor-7.21.5.tgz#c769afefd41d171836f7cb63e295bedf689d48ba" integrity sha512-IYl4gZ3ETsWocUWgsFZLM5i1BYx9SoemminVEXadgLBa9TdeorzgLKm8wWLA6J1N/kT3Kch8XIk1laNzYoHKvQ== +"@babel/helper-environment-visitor@^7.22.20": + version "7.22.20" + resolved "https://registry.yarnpkg.com/@babel/helper-environment-visitor/-/helper-environment-visitor-7.22.20.tgz#96159db61d34a29dba454c959f5ae4a649ba9167" + integrity sha512-zfedSIzFhat/gFhWfHtgWvlec0nqB9YEIVrpuwjruLlXfUSnA8cJB0miHKwqDnQ7d32aKo2xt88/xZptwxbfhA== + "@babel/helper-function-name@^7.18.9", "@babel/helper-function-name@^7.19.0", "@babel/helper-function-name@^7.21.0": version "7.21.0" resolved "https://registry.yarnpkg.com/@babel/helper-function-name/-/helper-function-name-7.21.0.tgz#d552829b10ea9f120969304023cd0645fa00b1b4" @@ -150,6 +173,14 @@ "@babel/template" "^7.20.7" "@babel/types" "^7.21.0" +"@babel/helper-function-name@^7.23.0": + version "7.23.0" + resolved "https://registry.yarnpkg.com/@babel/helper-function-name/-/helper-function-name-7.23.0.tgz#1f9a3cdbd5b2698a670c30d2735f9af95ed52759" + integrity sha512-OErEqsrxjZTJciZ4Oo+eoZqeW9UIiOcuYKRJA4ZAgV9myA+pOXhhmpfNCKjEH/auVfEYVFJ6y1Tc4r0eIApqiw== + dependencies: + "@babel/template" "^7.22.15" + "@babel/types" "^7.23.0" + "@babel/helper-hoist-variables@^7.18.6": version "7.18.6" resolved "https://registry.yarnpkg.com/@babel/helper-hoist-variables/-/helper-hoist-variables-7.18.6.tgz#d4d2c8fb4baeaa5c68b99cc8245c56554f926678" @@ -157,6 +188,13 @@ dependencies: "@babel/types" "^7.18.6" +"@babel/helper-hoist-variables@^7.22.5": + version "7.22.5" + resolved "https://registry.yarnpkg.com/@babel/helper-hoist-variables/-/helper-hoist-variables-7.22.5.tgz#c01a007dac05c085914e8fb652b339db50d823bb" + integrity sha512-wGjk9QZVzvknA6yKIUURb8zY3grXCcOZt+/7Wcy8O2uctxhplmUPkOdlgoNhmdVee2c92JXbf1xpMtVNbfoxRw== + dependencies: + "@babel/types" "^7.22.5" + "@babel/helper-member-expression-to-functions@^7.21.5": version "7.21.5" resolved 
"https://registry.yarnpkg.com/@babel/helper-member-expression-to-functions/-/helper-member-expression-to-functions-7.21.5.tgz#3b1a009af932e586af77c1030fba9ee0bde396c0" @@ -240,16 +278,33 @@ dependencies: "@babel/types" "^7.18.6" +"@babel/helper-split-export-declaration@^7.22.6": + version "7.22.6" + resolved "https://registry.yarnpkg.com/@babel/helper-split-export-declaration/-/helper-split-export-declaration-7.22.6.tgz#322c61b7310c0997fe4c323955667f18fcefb91c" + integrity sha512-AsUnxuLhRYsisFiaJwvp1QF+I3KjD5FOxut14q/GzovUe6orHLesW2C7d754kRm53h5gqrz6sFl6sxc4BVtE/g== + dependencies: + "@babel/types" "^7.22.5" + "@babel/helper-string-parser@^7.21.5": version "7.21.5" resolved "https://registry.yarnpkg.com/@babel/helper-string-parser/-/helper-string-parser-7.21.5.tgz#2b3eea65443c6bdc31c22d037c65f6d323b6b2bd" integrity sha512-5pTUx3hAJaZIdW99sJ6ZUUgWq/Y+Hja7TowEnLNMm1VivRgZQL3vpBY3qUACVsvw+yQU6+YgfBVmcbLaZtrA1w== +"@babel/helper-string-parser@^7.22.5": + version "7.22.5" + resolved "https://registry.yarnpkg.com/@babel/helper-string-parser/-/helper-string-parser-7.22.5.tgz#533f36457a25814cf1df6488523ad547d784a99f" + integrity sha512-mM4COjgZox8U+JcXQwPijIZLElkgEpO5rsERVDJTc2qfCDfERyob6k5WegS14SX18IIjv+XD+GrqNumY5JRCDw== + "@babel/helper-validator-identifier@^7.18.6", "@babel/helper-validator-identifier@^7.19.1": version "7.19.1" resolved "https://registry.yarnpkg.com/@babel/helper-validator-identifier/-/helper-validator-identifier-7.19.1.tgz#7eea834cf32901ffdc1a7ee555e2f9c27e249ca2" integrity sha512-awrNfaMtnHUr653GgGEs++LlAvW6w+DcPrOliSMXWCKo597CwL5Acf/wWdNkf/tfEQE3mjkeD1YOVZOUV/od1w== +"@babel/helper-validator-identifier@^7.22.20": + version "7.22.20" + resolved "https://registry.yarnpkg.com/@babel/helper-validator-identifier/-/helper-validator-identifier-7.22.20.tgz#c4ae002c61d2879e724581d96665583dbc1dc0e0" + integrity sha512-Y4OZ+ytlatR8AI+8KZfKuL5urKp7qey08ha31L8b3BwewJAoJamTzyvxPR/5D+KkdJCGPq/+8TukHBlY10FX9A== + "@babel/helper-validator-option@^7.18.6", "@babel/helper-validator-option@^7.21.0": version "7.21.0" resolved "https://registry.yarnpkg.com/@babel/helper-validator-option/-/helper-validator-option-7.21.0.tgz#8224c7e13ace4bafdc4004da2cf064ef42673180" @@ -283,11 +338,25 @@ chalk "^2.0.0" js-tokens "^4.0.0" -"@babel/parser@^7.1.0", "@babel/parser@^7.14.7", "@babel/parser@^7.20.7", "@babel/parser@^7.21.5", "@babel/parser@^7.21.8": +"@babel/highlight@^7.22.13": + version "7.22.20" + resolved "https://registry.yarnpkg.com/@babel/highlight/-/highlight-7.22.20.tgz#4ca92b71d80554b01427815e06f2df965b9c1f54" + integrity sha512-dkdMCN3py0+ksCgYmGG8jKeGA/8Tk+gJwSYYlFGxG5lmhfKNoAy004YpLxpS1W2J8m/EK2Ew+yOs9pVRwO89mg== + dependencies: + "@babel/helper-validator-identifier" "^7.22.20" + chalk "^2.4.2" + js-tokens "^4.0.0" + +"@babel/parser@^7.1.0", "@babel/parser@^7.14.7", "@babel/parser@^7.20.7", "@babel/parser@^7.21.8": version "7.21.8" resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.21.8.tgz#642af7d0333eab9c0ad70b14ac5e76dbde7bfdf8" integrity sha512-6zavDGdzG3gUqAdWvlLFfk+36RilI+Pwyuuh7HItyeScCWP3k6i8vKclAQ0bM/0y/Kz/xiwvxhMv9MgTJP5gmA== +"@babel/parser@^7.22.15", "@babel/parser@^7.23.0": + version "7.23.0" + resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.23.0.tgz#da950e622420bf96ca0d0f2909cdddac3acd8719" + integrity sha512-vvPKKdMemU85V9WE/l5wZEmImpCtLqbnTvqDS2U1fJ96KrxoW7KrXhNsNCblQlg8Ck4b85yxdTyelsMUgFUXiw== + "@babel/plugin-bugfix-safari-id-destructuring-collision-in-function-expression@^7.18.6": version "7.18.6" resolved 
"https://registry.yarnpkg.com/@babel/plugin-bugfix-safari-id-destructuring-collision-in-function-expression/-/plugin-bugfix-safari-id-destructuring-collision-in-function-expression-7.18.6.tgz#da5b8f9a580acdfbe53494dba45ea389fb09a4d2" @@ -1049,19 +1118,28 @@ "@babel/parser" "^7.20.7" "@babel/types" "^7.20.7" -"@babel/traverse@^7.20.5", "@babel/traverse@^7.21.5", "@babel/traverse@^7.7.2": - version "7.21.5" - resolved "https://registry.yarnpkg.com/@babel/traverse/-/traverse-7.21.5.tgz#ad22361d352a5154b498299d523cf72998a4b133" - integrity sha512-AhQoI3YjWi6u/y/ntv7k48mcrCXmus0t79J9qPNlk/lAsFlCiJ047RmbfMOawySTHtywXhbXgpx/8nXMYd+oFw== +"@babel/template@^7.22.15": + version "7.22.15" + resolved "https://registry.yarnpkg.com/@babel/template/-/template-7.22.15.tgz#09576efc3830f0430f4548ef971dde1350ef2f38" + integrity sha512-QPErUVm4uyJa60rkI73qneDacvdvzxshT3kksGqlGWYdOTIUOwJ7RDUL8sGqslY1uXWSL6xMFKEXDS3ox2uF0w== dependencies: - "@babel/code-frame" "^7.21.4" - "@babel/generator" "^7.21.5" - "@babel/helper-environment-visitor" "^7.21.5" - "@babel/helper-function-name" "^7.21.0" - "@babel/helper-hoist-variables" "^7.18.6" - "@babel/helper-split-export-declaration" "^7.18.6" - "@babel/parser" "^7.21.5" - "@babel/types" "^7.21.5" + "@babel/code-frame" "^7.22.13" + "@babel/parser" "^7.22.15" + "@babel/types" "^7.22.15" + +"@babel/traverse@^7.20.5", "@babel/traverse@^7.21.5", "@babel/traverse@^7.7.2": + version "7.23.2" + resolved "https://registry.yarnpkg.com/@babel/traverse/-/traverse-7.23.2.tgz#329c7a06735e144a506bdb2cad0268b7f46f4ad8" + integrity sha512-azpe59SQ48qG6nu2CzcMLbxUudtN+dOM9kDbUqGq3HXUJRlo7i8fvPoxQUzYgLZ4cMVmuZgm8vvBpNeRhd6XSw== + dependencies: + "@babel/code-frame" "^7.22.13" + "@babel/generator" "^7.23.0" + "@babel/helper-environment-visitor" "^7.22.20" + "@babel/helper-function-name" "^7.23.0" + "@babel/helper-hoist-variables" "^7.22.5" + "@babel/helper-split-export-declaration" "^7.22.6" + "@babel/parser" "^7.23.0" + "@babel/types" "^7.23.0" debug "^4.1.0" globals "^11.1.0" @@ -1074,6 +1152,15 @@ "@babel/helper-validator-identifier" "^7.19.1" to-fast-properties "^2.0.0" +"@babel/types@^7.22.15", "@babel/types@^7.22.5", "@babel/types@^7.23.0": + version "7.23.0" + resolved "https://registry.yarnpkg.com/@babel/types/-/types-7.23.0.tgz#8c1f020c9df0e737e4e247c0619f58c68458aaeb" + integrity sha512-0oIyUfKoI3mSqMvsxBdclDwxXKXAUA8v/apZbc+iSyARYou1o8ZGDxbUYyLFoW2arqS2jDGqJuZvv1d/io1axg== + dependencies: + "@babel/helper-string-parser" "^7.22.5" + "@babel/helper-validator-identifier" "^7.22.20" + to-fast-properties "^2.0.0" + "@bcoe/v8-coverage@^0.2.3": version "0.2.3" resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" @@ -3084,10 +3171,10 @@ resolved "https://registry.yarnpkg.com/@types/range-parser/-/range-parser-1.2.4.tgz#cd667bcfdd025213aafb7ca5915a932590acdcdc" integrity sha512-EEhsLsD6UsDM1yFhAvy0Cjr6VwmpMWqFBCb9w07wVugF7w9nfajxLuVmngTIpgS6svCnm6Vaw+MZhoDCKnOfsw== -"@types/react-dom@^18.2.4": - version "18.2.4" - resolved "https://registry.yarnpkg.com/@types/react-dom/-/react-dom-18.2.4.tgz#13f25bfbf4e404d26f62ac6e406591451acba9e0" - integrity sha512-G2mHoTMTL4yoydITgOGwWdWMVd8sNgyEP85xVmMKAPUBwQWm9wBPQUmvbeF4V3WBY1P7mmL4BkjQ0SqUpf1snw== +"@types/react-dom@^18.2.14": + version "18.2.14" + resolved "https://registry.yarnpkg.com/@types/react-dom/-/react-dom-18.2.14.tgz#c01ba40e5bb57fc1dc41569bb3ccdb19eab1c539" + integrity sha512-V835xgdSVmyQmI1KLV2BEIUgqEuinxp9O4G6g3FqO/SqLac049E53aysv0oEFD2kHfejeKU+ZqL2bcFWj9gLAQ== 
dependencies: "@types/react" "*" @@ -4122,7 +4209,7 @@ case-sensitive-paths-webpack-plugin@^2.4.0: resolved "https://registry.yarnpkg.com/case-sensitive-paths-webpack-plugin/-/case-sensitive-paths-webpack-plugin-2.4.0.tgz#db64066c6422eed2e08cc14b986ca43796dbc6d4" integrity sha512-roIFONhcxog0JSSWbvVAh3OocukmSgpqOH6YpMkCvav/ySIV3JKg4Dc8vYtQjYi/UxpNE36r/9v+VqTQqgkYmw== -chalk@^2.0.0: +chalk@^2.0.0, chalk@^2.4.2: version "2.4.2" resolved "https://registry.yarnpkg.com/chalk/-/chalk-2.4.2.tgz#cd42541677a54333cf541a49108c1432b44c9424" integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==
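For reference, with the `loki.source.kafka` change above, a SASL setup now supplies the password as a `secret`. The following River sketch is illustrative only: the broker address, topic, environment variable, and the `loki.write.local` component it forwards to are assumed placeholders.

```river
loki.source.kafka "example" {
  brokers    = ["localhost:9092"]
  topics     = ["quickstart-events"]
  forward_to = [loki.write.local.receiver]

  authentication {
    type = "sasl"

    sasl_config {
      mechanism = "PLAIN"
      user      = "kafka_user"
      // password is now a secret rather than a string; reading it with env()
      // keeps the plain-text value out of the configuration file.
      password  = env("KAFKA_SASL_PASSWORD")
    }
  }
}
```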