Adding the serialization features. #1666

Merged: 13 commits, Sep 16, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion Makefile
@@ -141,7 +141,7 @@ lint: alloylint
# final command runs tests for all other submodules.
test:
	$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
	$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization
Contributor
We'd be running these tests twice, the second time without -race. I don't see the reason why; is that an accident?

Collaborator Author

There is one test that will not be run twice, since I am accessing the var directly to test its value. The others will be run; I could add the //go:build race tag to the others. Note that most of our exclusions above have some tests that run twice.
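For reference, a minimal sketch of excluding a test file from the -race pass with a build constraint; the file contents and the variable it inspects are illustrative, not this PR's code:

//go:build !race

package serialization

import "testing"

// This file is skipped by `go test -race`: the test reads package-level state
// directly, which the race detector would flag.
func TestDefaultFlushFrequency(t *testing.T) {
	// Hypothetical package-level variable inspected directly.
	if defaultFlushFrequency <= 0 {
		t.Fatal("expected a positive default flush frequency")
	}
}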

	$(GO_ENV) find . -name go.mod -not -path "./go.mod" -execdir go test -race ./... \;

test-packages:
20 changes: 10 additions & 10 deletions internal/component/prometheus/remote/queue/filequeue/filequeue.go
@@ -30,7 +30,7 @@ type queue struct {
	// block until ready for another record.
	out func(ctx context.Context, dh types.DataHandle)
	// existingFiles is the list of files found initially.
	existingsFiles []string
	existingFiles  []string
}

// NewQueue returns an implementation of FileStorage.
@@ -61,18 +61,18 @@ func NewQueue(directory string, out func(ctx context.Context, dh types.DataHandl
		currentMaxID = ids[len(ids)-1]
	}
	q := &queue{
		directory:      directory,
		maxID:          currentMaxID,
		logger:         logger,
		out:            out,
		dataQueue:      actor.NewMailbox[types.Data](),
		existingsFiles: make([]string, 0),
		directory:      directory,
		maxID:          currentMaxID,
		logger:         logger,
		out:            out,
		dataQueue:      actor.NewMailbox[types.Data](),
		existingFiles:  make([]string, 0),
	}

	// Save the existing files in `q.existingFiles`, which will have their data pushed to `out` when the actor starts.
	for _, id := range ids {
		name := filepath.Join(directory, fmt.Sprintf("%d.committed", id))
		q.existingsFiles = append(q.existingsFiles, name)
		q.existingFiles = append(q.existingFiles, name)
	}
	return q, nil
}
@@ -115,7 +115,7 @@ func get(logger log.Logger, name string) (map[string]string, []byte, error) {
// DoWork allows most of the queue to be single-threaded, with work only coming in and going out via mailboxes (channels).
func (q *queue) DoWork(ctx actor.Context) actor.WorkerStatus {
	// Queue up our existing items.
	for _, name := range q.existingsFiles {
	for _, name := range q.existingFiles {
		q.out(ctx, types.DataHandle{
			Name: name,
			Pop: func() (map[string]string, []byte, error) {
@@ -124,7 +124,7 @@ func (q *queue) DoWork(ctx actor.Context) actor.WorkerStatus {
		})
	}
	// We only want to process existing files once.
	q.existingsFiles = nil
	q.existingFiles = nil
	select {
	case <-ctx.Done():
		return actor.WorkerEnd
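For context, the queue's single-threaded design hinges on the actor/mailbox pattern seen above. Below is a minimal, self-contained sketch of that pattern, assuming the vladopajic/go-actor API (`actor.NewMailbox`, `DoWork`, `WorkerContinue`/`WorkerEnd`) that this diff uses; the printer worker is illustrative:

package main

import (
	"context"
	"fmt"

	"github.com/vladopajic/go-actor/actor"
)

// printer owns all of its state; input arrives only through the mailbox,
// so DoWork never needs locks.
type printer struct {
	mbx actor.Mailbox[string]
}

func (p *printer) DoWork(ctx actor.Context) actor.WorkerStatus {
	select {
	case <-ctx.Done():
		return actor.WorkerEnd
	case msg := <-p.mbx.ReceiveC():
		fmt.Println("received:", msg)
		return actor.WorkerContinue
	}
}

func main() {
	p := &printer{mbx: actor.NewMailbox[string]()}
	// The mailbox is itself an actor and must be started too.
	p.mbx.Start()
	defer p.mbx.Stop()
	a := actor.New(p)
	a.Start()
	defer a.Stop()
	_ = p.mbx.Send(context.Background(), "hello")
}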
121 changes: 121 additions & 0 deletions internal/component/prometheus/remote/queue/serialization/appender.go
@@ -0,0 +1,121 @@
package serialization

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/storage"
)

type appender struct {
	ctx    context.Context
	ttl    time.Duration
	s      types.Serializer
	logger log.Logger
}

func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
	// TODO @mattdurham figure out what to do here later. This mirrors what we do elsewhere.
	return ref, nil
}

// NewAppender returns an Appender that writes to a given serializer. NOTE: the returned Appender
// writes data immediately and does not honor Commit or Rollback.
func NewAppender(ctx context.Context, ttl time.Duration, s types.Serializer, logger log.Logger) storage.Appender {
	app := &appender{
		ttl:    ttl,
		s:      s,
		logger: logger,
		ctx:    ctx,
	}
	return app
}
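To make the write-through behavior concrete, a small hypothetical caller; the serializer s and logger come from elsewhere, and the label values are illustrative:

func appendOne(ctx context.Context, s types.Serializer, logger log.Logger) error {
	// Every Append hands the sample straight to the serializer, subject to the TTL check.
	app := NewAppender(ctx, 30*time.Minute, s, logger)
	_, err := app.Append(0, labels.FromStrings("job", "demo"), time.Now().Unix(), 1.0)
	if err != nil {
		return err
	}
	// Commit is a no-op here; the sample was already sent.
	return app.Commit()
}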

// Append appends a metric sample.
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	// Check whether the TTL has expired for this record; expired records are silently dropped.
	endTime := time.Now().Unix() - int64(a.ttl.Seconds())
	if t < endTime {
		return ref, nil
	}
	ts := types.GetTimeSeriesBinary()
	ts.Labels = l
	ts.TS = t
	ts.Value = v
	ts.Hash = l.Hash()
	err := a.s.SendSeries(a.ctx, ts)
Contributor
Is it guaranteed that ts will eventually be returned to the object pool? Would we have a leak if, e.g., the component was removed from the Alloy config? I don't see any issues, but it would be nice to make it clearer in the code that this is what's going on, with naming or comments.

Collaborator Author

It should be required that all time series are returned. Though not in this PR, this is checked in a future test via the OutStandingTimeSeriesBinary atomic int. There are end-to-end tests that ensure this is zero at the end of the test.
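For illustration, a sketch of the kind of accounting described here. The counter name comes from the reply above; the pool helpers and their bodies are assumptions sketching the mechanism, not the PR's actual code:

package types

import (
	"sync"
	"sync/atomic"
)

// OutStandingTimeSeriesBinary counts objects taken from the pool but not yet
// returned; end-to-end tests assert it drains back to zero.
var OutStandingTimeSeriesBinary atomic.Int64

var tsPool = sync.Pool{New: func() any { return &TimeSeriesBinary{} }}

// GetTimeSeriesBinary checks a series out of the pool.
func GetTimeSeriesBinary() *TimeSeriesBinary {
	OutStandingTimeSeriesBinary.Add(1)
	return tsPool.Get().(*TimeSeriesBinary)
}

// PutTimeSeriesBinary returns a series to the pool; every Get must be paired
// with exactly one Put or the counter never reaches zero.
func PutTimeSeriesBinary(ts *TimeSeriesBinary) {
	OutStandingTimeSeriesBinary.Add(-1)
	tsPool.Put(ts)
}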

	return ref, err
}

// Commit is a no-op since we always write immediately.
func (a *appender) Commit() (_ error) {
	return nil
}

// Rollback is a no-op since we write all the data.
func (a *appender) Rollback() error {
	return nil
}

// AppendExemplar appends an exemplar to the cache. The passed-in labels are unused; the labels on the exemplar are used instead.
func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (_ storage.SeriesRef, _ error) {
	endTime := time.Now().Unix() - int64(a.ttl.Seconds())
	if e.HasTs && e.Ts < endTime {
		return ref, nil
	}
	ts := types.GetTimeSeriesBinary()
	ts.Hash = e.Labels.Hash()
	ts.TS = e.Ts
	ts.Labels = e.Labels
	err := a.s.SendSeries(a.ctx, ts)
	return ref, err
}

// AppendHistogram appends a histogram.
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (_ storage.SeriesRef, _ error) {
	endTime := time.Now().Unix() - int64(a.ttl.Seconds())
	if t < endTime {
		return ref, nil
	}
	ts := types.GetTimeSeriesBinary()
	ts.Labels = l
	ts.TS = t
	if h != nil {
		ts.FromHistogram(t, h)
	} else {
		ts.FromFloatHistogram(t, fh)
	}
	ts.Hash = l.Hash()
	err := a.s.SendSeries(a.ctx, ts)
	return ref, err
}

// UpdateMetadata updates metadata.
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (_ storage.SeriesRef, _ error) {
	ts := types.GetTimeSeriesBinary()
	// Encode the metadata into label names that should not collide with real ones. TimeSeriesBinary
	// has a lot of work done to ensure it is efficient, so it makes sense to encode metadata into it.
	combinedLabels := l.Copy()
	combinedLabels = append(combinedLabels, labels.Label{
		Name:  "__alloy_metadata_type__",
		Value: string(m.Type),
	})
	combinedLabels = append(combinedLabels, labels.Label{
		Name:  "__alloy_metadata_help__",
		Value: m.Help,
	})
	combinedLabels = append(combinedLabels, labels.Label{
		Name:  "__alloy_metadata_unit__",
		Value: m.Unit,
Comment on lines +107 to +116
Contributor
These are used in more than one place, can we make them constants? It helps find usages and is generally nice practice IMO.
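A sketch of the suggested change, hoisting the metadata label names from the diff above into constants; the constant names themselves are illustrative:

// Label names used to carry metric metadata through the series path. Constants
// keep the encode and decode sides in sync and make usages easy to find.
const (
	MetaType = "__alloy_metadata_type__"
	MetaHelp = "__alloy_metadata_help__"
	MetaUnit = "__alloy_metadata_unit__"
)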

	})
	ts.Labels = combinedLabels
	err := a.s.SendMetadata(a.ctx, ts)
	return ref, err
}
@@ -0,0 +1,55 @@
package serialization

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
)

func TestAppenderTTL(t *testing.T) {
	fake := &counterSerializer{}
	l := log.NewNopLogger()

	app := NewAppender(context.Background(), 1*time.Minute, fake, l)
	_, err := app.Append(0, labels.FromStrings("one", "two"), time.Now().Unix(), 0)
	require.NoError(t, err)

	for i := 0; i < 10; i++ {
		_, err = app.Append(0, labels.FromStrings("one", "two"), time.Now().Add(-5*time.Minute).Unix(), 0)
		require.NoError(t, err)
	}
	// Only one record should make it through; the rest are older than the TTL.
	require.True(t, fake.received == 1)
}

var _ types.Serializer = (*counterSerializer)(nil)

type counterSerializer struct {
	received int
}

func (f *counterSerializer) Start() {}

func (f *counterSerializer) Stop() {}

func (f *counterSerializer) SendSeries(ctx context.Context, data *types.TimeSeriesBinary) error {
	f.received++
	return nil
}

func (f *counterSerializer) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary) error {
	return nil
}

func (f *counterSerializer) UpdateConfig(ctx context.Context, data types.SerializerConfig) error {
	return nil
}
@@ -0,0 +1,113 @@
//go:build !race

package serialization

import (
	"context"
	"fmt"
	"sync/atomic"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/golang/snappy"
	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
)

func TestRoundTripSerialization(t *testing.T) {
	totalSeries := atomic.Int64{}
	f := &fqq{t: t}
	l := log.NewNopLogger()
	start := time.Now().Add(-1 * time.Second).Unix()

	s, err := NewSerializer(types.SerializerConfig{
		MaxSignalsInBatch: 10,
		FlushFrequency:    5 * time.Second,
	}, f, func(stats types.SerializerStats) {
		totalSeries.Add(int64(stats.SeriesStored))
		require.True(t, stats.SeriesStored == 10)
		require.True(t, stats.Errors == 0)
		require.True(t, stats.MetadataStored == 0)
		require.True(t, stats.NewestTimestamp > start)
	}, l)
	require.NoError(t, err)

	s.Start()
	defer s.Stop()
	for i := 0; i < 100; i++ {
		tss := types.GetTimeSeriesBinary()
		tss.Labels = make(labels.Labels, 10)
		for j := 0; j < 10; j++ {
			tss.Labels[j] = labels.Label{
				Name:  fmt.Sprintf("name_%d_%d", i, j),
				Value: fmt.Sprintf("value_%d_%d", i, j),
			}
		}
		tss.Value = float64(i)
		tss.TS = time.Now().Unix()
		sendErr := s.SendSeries(context.Background(), tss)
		require.NoError(t, sendErr)
	}
	require.Eventually(t, func() bool {
		return f.total.Load() == 100
	}, 5*time.Second, 100*time.Millisecond)
	// 100 series sent from the for loop above.
	require.True(t, totalSeries.Load() == 100)
}

func TestUpdateConfig(t *testing.T) {
	f := &fqq{t: t}
	l := log.NewNopLogger()
	s, err := NewSerializer(types.SerializerConfig{
		MaxSignalsInBatch: 10,
		FlushFrequency:    5 * time.Second,
	}, f, func(stats types.SerializerStats) {}, l)
	require.NoError(t, err)
	s.Start()
	defer s.Stop()
	err = s.UpdateConfig(context.Background(), types.SerializerConfig{
		MaxSignalsInBatch: 1,
		FlushFrequency:    1 * time.Second,
	})
	require.NoError(t, err)
	require.Eventually(t, func() bool {
		return s.(*serializer).maxItemsBeforeFlush == 1 && s.(*serializer).flushFrequency == 1*time.Second
	}, 5*time.Second, 100*time.Millisecond)
}

var _ types.FileStorage = (*fqq)(nil)

type fqq struct {
	t     *testing.T
	buf   []byte
	total atomic.Int64
}

func (f *fqq) Start() {}

func (f *fqq) Stop() {}

func (f *fqq) Store(ctx context.Context, meta map[string]string, value []byte) error {
	var err error
	f.buf, err = snappy.Decode(nil, value)
	require.NoError(f.t, err)
	sg := &types.SeriesGroup{}
	sg, _, err = types.DeserializeToSeriesGroup(sg, f.buf)
	require.NoError(f.t, err)
	require.Len(f.t, sg.Series, 10)
	for _, series := range sg.Series {
		require.Len(f.t, series.LabelsNames, 0)
		require.Len(f.t, series.LabelsValues, 0)
		require.Len(f.t, series.Labels, 10)
		for j := 0; j < 10; j++ {
			require.Equal(f.t, fmt.Sprintf("name_%d_%d", int(series.Value), j), series.Labels[j].Name)
			require.Equal(f.t, fmt.Sprintf("value_%d_%d", int(series.Value), j), series.Labels[j].Value)
		}
	}
	f.total.Add(int64(len(sg.Series)))
	return nil
}