Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the serialization features. #1666

Merged
merged 13 commits into from
Sep 16, 2024
Merged

Adding the serialization features. #1666

merged 13 commits into from
Sep 16, 2024

Conversation

mattdurham
Copy link
Collaborator

This adds the serialization side of converting series to the binary format. The binary format is time series where the strings are deduplicated with actual marshalling handled by the msgp library. I tested roughly 6 libraries listed here. Msgp hit the sweetspot of ease of use and size and features. Such as reusing arrays if they were passed in.

@mattdurham mattdurham marked this pull request as ready for review September 11, 2024 14:53
@mattdurham
Copy link
Collaborator Author

Don't be put off by the lines change count, 80% of that is generated code from msgp

@mattdurham
Copy link
Collaborator Author

Tests are failing from a data race in the tests themselves since I am accessing them directly. Lemme see if I can fix that.

Copy link
Contributor

@wildum wildum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great work, I like the deduplication algo. I only did a first pass, I'm not yet familiar with the full picture

@mattdurham
Copy link
Collaborator Author

Stats are protocol agnostic, so that if used in prometheus or otel environment they can add their own specific metrics and we dont define protocol specific in the lower level structs. The component in the last PR will expose prometheus compatible ones derived from the callback.

Copy link
Contributor

@thampiotr thampiotr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass. Looking good :)

@@ -141,7 +141,7 @@ lint: alloylint
# final command runs tests for all other submodules.
test:
$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd be running these tests twice, second time without -race - I don't see the reason why, is that an accident?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one test that will not be ran twice since I am accessing the var directly to test its value. The others will be ran, I could add the //go:build race to the others. Note most of our exclusions above have some tests that run twice.

ts.TS = t
ts.Value = v
ts.Hash = l.Hash()
err := a.s.SendSeries(a.ctx, ts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that ts will be returned eventually to the object pool? Would we have a leak if, e.g. the component was removed from Alloy config? I don't see any issues, but would be nice to make this code a bit more clear that this is what's going on, with naming or comments.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be a required that all time series are returned. Though not in this PR this is checked in a future test via OutStandingTimeSeriesBinary atomic int. There are end to end tests that ensure at the end of the test this is zero.

Comment on lines 181 to 185
stringsSlice := make([]string, len(strMapToInt))
for stringValue, index := range strMapToInt {
stringsSlice[index] = stringValue
}
group.Strings = stringsSlice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
stringsSlice := make([]string, len(strMapToInt))
for stringValue, index := range strMapToInt {
stringsSlice[index] = stringValue
}
group.Strings = stringsSlice
dictionary := make([]string, len(strMapToInt))
for stringValue, index := range strMapToInt {
dictionary[index] = stringValue
}
group.dictionary = dictionary

I like to use the concept of dictionary here, or lookup table... it makes it easier to figure out what's going on.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to use an actual map? Or a rename like above?

}
group.Strings = stringsSlice

buf, err := group.MarshalMsg(s.msgpBuffer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sooo... is it worth it to do the dictionary stuff? I guess yes, but on the other hand I know that compression algos would do something similar automatically, snappy can refer to previous part of the data to reduce repetition.

Copy link
Collaborator Author

@mattdurham mattdurham Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One second had a bug in my test re-evaluating.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright back with much more verifiable test.

//go:generate msgp

package main

import (
	"fmt"
	"math/rand"
	"reflect"

	"github.com/golang/snappy"
)

// 5 long really random
// 5371616
// 4732108

// 5 long half random
// 5050060
// 3929185

// 5 long quarter random
// 4455979
// 2918973
func main() {
	metrics := make([]map[string]string, 0)
	// 100k metrics with 10 labels each
	for i := 0; i < 100_000; i++ {
		metrics = append(metrics, getLabels())

	}

	ss := &StringString{Labels: metrics}
	bb, err := ss.MarshalMsg(nil)
	if err != nil {
		panic(err)
	}
	out := snappy.Encode(nil, bb)
	dc, _ := snappy.Decode(nil, out)
	err = validateStringString(dc, metrics)
	if err != nil {
		panic(err)
	}
	println(fmt.Printf("dictionary based is %d bytes", len(out)))

	ib := &IndexBased{
		String: make([]string, 0),
		Names:  make([][]uint32, 0),
		Values: make([][]uint32, 0),
	}
	alignIndexBased(ib, metrics)
	bb, err = ib.MarshalMsg(nil)
	if err != nil {
		panic(err)
	}
	out = snappy.Encode(nil, bb)
	dc, _ = snappy.Decode(nil, out)
	err = validateIndexBased(dc, metrics)
	println(fmt.Printf("index based is %d bytes", len(out)))
}

func validateStringString(bb []byte, metrics []map[string]string) error {
	ss := &StringString{}
	_, err := ss.UnmarshalMsg(bb)
	if err != nil {
		return err
	}
	for i, m := range metrics {
		if !reflect.DeepEqual(ss.Labels[i], m) {
			return fmt.Errorf("invalid metric at index %d", i)
		}

	}
	return nil
}

func validateIndexBased(bb []byte, metrics []map[string]string) error {
	ss := &IndexBased{}
	_, err := ss.UnmarshalMsg(bb)
	if err != nil {
		return err
	}
	for i, m := range metrics {
		if !reflect.DeepEqual(getMetric(ss.Names[i], ss.Values[i], ss.String), m) {
			return fmt.Errorf("invalid metric at index %d", i)
		}

	}
	return nil
}

func getMetric(names []uint32, values []uint32, strings []string) map[string]string {
	metric := make(map[string]string)
	for i, v := range names {
		metric[strings[v]] = strings[values[i]]
	}
	return metric
}

func alignIndexBased(ib *IndexBased, strings []map[string]string) {
	index := 0
	stringsList := make(map[string]int)
	for _, metric := range strings {
		names := make([]uint32, 0)
		values := make([]uint32, 0)
		for k, v := range metric {
			keyIndex, ok := stringsList[k]
			if !ok {
				stringsList[k] = index
				ib.String = append(ib.String, k)
				keyIndex = index
				index++
			}
			valIndex, ok := stringsList[v]
			if !ok {
				stringsList[v] = index
				ib.String = append(ib.String, v)
				valIndex = index
				index++
			}
			names = append(names, uint32(keyIndex))
			values = append(values, uint32(valIndex))
		}
		ib.Names = append(ib.Names, names)
		ib.Values = append(ib.Values, values)
	}

	ib.String = make([]string, len(stringsList))
	for k, v := range stringsList {
		ib.String[v] = k
	}
}

func getLabels() map[string]string {
	retLbls := make(map[string]string, 0)
	for i := 0; i < 10; i++ {
		retLbls[fmt.Sprintf("label_%d", i)] = randString()
	}
	return retLbls
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var halfRandom = []rune("abcdefghijklmnopqrstuvwxyz")
var quarterRandom = []rune("abcdefghijkl")

func randString() string {
	b := make([]rune, rand.Intn(5))
	for i := range b {
		b[i] = letterRunes[rand.Intn(len(letterRunes))]
	}
	return string(b)
}

type IndexBased struct {
	Names  [][]uint32
	Values [][]uint32
	String []string
}

type StringString struct {
	Labels []map[string]string
}

Copy link
Collaborator Author

@mattdurham mattdurham Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general the index based is never worse, and IMO in many cases is 60% of the size of the pure string based.
Results from the above test, changing out the letterRunes to small sets.

// 5 char long really random
// 5371616 string map
// 4732108 index based

// 5 long half random
// 5050060
// 3929185

// 5 long quarter random
// 4455979
// 2918973

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lower the cardinality the better but even in worse case its not terrible.

@mattdurham
Copy link
Collaborator Author

Going to merge this and we can revisit any followup in the big merge on specific points.

@mattdurham mattdurham merged commit 626113f into dev.new-wal Sep 16, 2024
17 checks passed
@mattdurham mattdurham deleted the wal_serialization branch September 16, 2024 14:07
mattdurham added a commit that referenced this pull request Oct 16, 2024
* readme

* fix readme

* Add filequeue functionality (#1601)

* Checkin for file queue

* add comment

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* naming and error handling feedback from PR

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/filequeue/filequeue.go

Co-authored-by: Piotr <[email protected]>

* drop benchmark

* rename get to pop

---------

Co-authored-by: Piotr <[email protected]>

* Adding the serialization features. (#1666)

* Adding the serialization features.

* Dont test this with race condition since we access vars directly.

* Fix test.

* Fix typo in file name and return early in DeserializeToSeriesGroup.

* Update internal/component/prometheus/remote/queue/serialization/appender.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/serialization/serializer.go

Co-authored-by: Piotr <[email protected]>

* Rename to indicate that TimeSeries are Put/Get from a pool.

* Remove func that was about the same number of lines as inlining.

* Update internal/component/prometheus/remote/queue/types/serialization.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/serialization/serializer.go

Co-authored-by: Piotr <[email protected]>

* Change benchmark to be more specific.

---------

Co-authored-by: Piotr <[email protected]>

* Network wal pr (#1717)

* Checkin the networking items.

* Fix for config updating and tests.

* Update internal/component/prometheus/remote/queue/network/loop.go

Co-authored-by: William Dumont <[email protected]>

* Update internal/component/prometheus/remote/queue/network/loop.go

Co-authored-by: Piotr <[email protected]>

* pr feedback

* pr feedback

* simplify stats

* PR feedback

---------

Co-authored-by: William Dumont <[email protected]>
Co-authored-by: Piotr <[email protected]>

* Component (#1823)

* Checkin the networking items.

* Fix for config updating and tests.

* Update internal/component/prometheus/remote/queue/network/loop.go

Co-authored-by: William Dumont <[email protected]>

* Update internal/component/prometheus/remote/queue/network/loop.go

Co-authored-by: Piotr <[email protected]>

* pr feedback

* pr feedback

* simplify stats

* simplify stats

* Initial push.

* docs and some renaming

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Changes and testing.

* Update docs.

* Update docs.

* Fix race conditions in unit tests.

* Tweaking unit tests.

* lower threshold more.

* lower threshold more.

* Fix deadlock in manager tests.

* rollback to previous

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Paulin Todev <[email protected]>

* Docs PR feedback

* Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md

Co-authored-by: Piotr <[email protected]>

* PR feedback

* PR feedback

* PR feedback

* PR feedback

* Fix typo

* Fix typo

* Fix bug.

* Fix docs

---------

Co-authored-by: William Dumont <[email protected]>
Co-authored-by: Piotr <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
Co-authored-by: Paulin Todev <[email protected]>

* Change name to write instead of remote.

* Fix issue.

* Fix issue.

* Dont depend on random sync.pool behavior.

* small clarification on changelog.

* PR feedback

---------

Co-authored-by: Piotr <[email protected]>
Co-authored-by: William Dumont <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
Co-authored-by: Paulin Todev <[email protected]>
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Oct 17, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants