storage: Refactor block compaction to allow shard-splitting (#2366)
* storage: Refactor block compaction to allow shard-splitting

* Rename to CompactWithSplitting for consistency

* Reintroduce compaction series testing.

* Add sharding compaction level

* Add some tests for CompactWithSplitting

* Fix symbols split-compaction (#2371)

* Fixes a race that was actually surfacing another real issue.

* Add tests for meta min/max time

* Fixes meta min/max after split

---------

Co-authored-by: Anton Kolesnikov <[email protected]>
cyriltovena and kolesnikovae authored Sep 7, 2023
1 parent cb6420d commit 63561d7
Showing 10 changed files with 802 additions and 222 deletions.
462 changes: 272 additions & 190 deletions pkg/phlaredb/compact.go

Large diffs are not rendered by default.
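Because the compact.go diff is collapsed, here is a minimal usage sketch of the new entry point. The signature, the shard-count argument, and the shard label on each output block are inferred from compact_test.go below; the wrapper function itself is hypothetical.

func compactIntoShards(ctx context.Context, blocks []BlockReader, dst string) error {
	// Compact the input blocks, splitting the output series across 16 shards.
	metas, err := CompactWithSplitting(ctx, blocks, 16, dst)
	if err != nil {
		return err
	}
	// Each output block's meta carries a __compactor_shard_id__ label such as "3_of_16".
	for _, m := range metas {
		fmt.Println(m.ULID, m.Labels[sharding.CompactorShardIDLabel])
	}
	return nil
}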

186 changes: 165 additions & 21 deletions pkg/phlaredb/compact_test.go
@@ -6,6 +6,7 @@ import (
_ "net/http/pprof"
"os"
"path/filepath"
"sort"
"testing"
"time"

@@ -14,6 +15,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -23,6 +25,7 @@ import (
"github.com/grafana/pyroscope/pkg/objstore/client"
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
"github.com/grafana/pyroscope/pkg/phlaredb/block"
"github.com/grafana/pyroscope/pkg/phlaredb/sharding"
"github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index"
"github.com/grafana/pyroscope/pkg/pprof/testhelper"
)
@@ -85,6 +88,153 @@ func TestCompact(t *testing.T) {
require.Equal(t, expected.String(), res.String())
}

func TestCompactWithSplitting(t *testing.T) {
ctx := context.Background()

b1 := newBlock(t, func() []*testhelper.ProfileBuilder {
return append(
profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "a"),
profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "b")...,
)
})
b2 := newBlock(t, func() []*testhelper.ProfileBuilder {
return append(
append(
append(
profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "c"),
profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "d")...,
), profileSeriesGenerator(t, time.Unix(1, 0), time.Unix(10, 0), time.Second, "job", "a")...,
),
profileSeriesGenerator(t, time.Unix(11, 0), time.Unix(20, 0), time.Second, "job", "b")...,
)
})
dst := t.TempDir()
compacted, err := CompactWithSplitting(ctx, []BlockReader{b1, b2, b2, b1}, 16, dst)
require.NoError(t, err)

// 4 shards, one per series.
require.Equal(t, 4, len(compacted))
require.Equal(t, "1_of_16", compacted[0].Labels[sharding.CompactorShardIDLabel])
require.Equal(t, "6_of_16", compacted[1].Labels[sharding.CompactorShardIDLabel])
require.Equal(t, "7_of_16", compacted[2].Labels[sharding.CompactorShardIDLabel])
require.Equal(t, "14_of_16", compacted[3].Labels[sharding.CompactorShardIDLabel])

// Series b should span from 11 to 20, not from 1 to 20.
require.Equal(t, model.TimeFromUnix(11), compacted[1].MinTime)
require.Equal(t, model.TimeFromUnix(20), compacted[1].MaxTime)

// First, verify that all series and timestamps are present when querying across all blocks.
queriers := make(Queriers, len(compacted))
for i, blk := range compacted {
queriers[i] = blockQuerierFromMeta(t, dst, blk)
}

err = queriers.Open(context.Background())
require.NoError(t, err)
matchAll := &ingesterv1.SelectProfilesRequest{
LabelSelector: "{}",
Type: mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"),
Start: 0,
End: 40000,
}
it, err := queriers.SelectMatchingProfiles(context.Background(), matchAll)
require.NoError(t, err)

seriesMap := make(map[model.Fingerprint]lo.Tuple2[phlaremodel.Labels, []model.Time])
for it.Next() {
r := it.At()
seriesMap[r.Fingerprint()] = lo.T2(r.Labels().WithoutPrivateLabels(), append(seriesMap[r.Fingerprint()].B, r.Timestamp()))
}
require.NoError(t, it.Err())
require.NoError(t, it.Close())
series := lo.Values(seriesMap)
sort.Slice(series, func(i, j int) bool {
return phlaremodel.CompareLabelPairs(series[i].A, series[j].A) < 0
})
require.Equal(t, []lo.Tuple2[phlaremodel.Labels, []model.Time]{
lo.T2(phlaremodel.LabelsFromStrings("job", "a"),
generateTimes(t, model.TimeFromUnix(1), model.TimeFromUnix(10)),
),
lo.T2(phlaremodel.LabelsFromStrings("job", "b"),
generateTimes(t, model.TimeFromUnix(11), model.TimeFromUnix(20)),
),
lo.T2(phlaremodel.LabelsFromStrings("job", "c"),
generateTimes(t, model.TimeFromUnix(1), model.TimeFromUnix(10)),
),
lo.T2(phlaremodel.LabelsFromStrings("job", "d"),
generateTimes(t, model.TimeFromUnix(11), model.TimeFromUnix(20)),
),
}, series)

// Then query two individual shards and verify that each returns only a subset of the series.
it, err = queriers[0].SelectMatchingProfiles(ctx, matchAll)
require.NoError(t, err)
seriesResult, err := queriers[0].MergeByLabels(context.Background(), it, "job")
require.NoError(t, err)
require.Equal(t,
[]*typesv1.Series{
{
Labels: phlaremodel.LabelsFromStrings("job", "a"),
Points: generatePoints(t, model.TimeFromUnix(1), model.TimeFromUnix(10)),
},
}, seriesResult)

it, err = queriers[1].SelectMatchingProfiles(ctx, matchAll)
require.NoError(t, err)
seriesResult, err = queriers[1].MergeByLabels(context.Background(), it, "job")
require.NoError(t, err)
require.Equal(t,
[]*typesv1.Series{
{
Labels: phlaremodel.LabelsFromStrings("job", "b"),
Points: generatePoints(t, model.TimeFromUnix(11), model.TimeFromUnix(20)),
},
}, seriesResult)

// Finally, test stacktrace resolution.
it, err = queriers[1].SelectMatchingProfiles(ctx, matchAll)
require.NoError(t, err)
res, err := queriers[1].MergeByStacktraces(ctx, it)
require.NoError(t, err)

expected := new(phlaremodel.Tree)
expected.InsertStack(10, "baz", "bar", "foo")
require.Equal(t, expected.String(), res.String())
}

// nolint:unparam
func profileSeriesGenerator(t *testing.T, from, through time.Time, interval time.Duration, lbls ...string) []*testhelper.ProfileBuilder {
t.Helper()
var builders []*testhelper.ProfileBuilder
for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(interval) {
builders = append(builders,
testhelper.NewProfileBuilder(ts.UnixNano()).
CPUProfile().
WithLabels(
lbls...,
).ForStacktraceString("foo", "bar", "baz").AddSamples(1))
}
return builders
}

func generatePoints(t *testing.T, from, through model.Time) []*typesv1.Point {
t.Helper()
var points []*typesv1.Point
for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) {
points = append(points, &typesv1.Point{Timestamp: int64(ts), Value: 1})
}
return points
}

func generateTimes(t *testing.T, from, through model.Time) []model.Time {
t.Helper()
var times []model.Time
for ts := from; ts.Before(through) || ts.Equal(through); ts = ts.Add(time.Second) {
times = append(times, ts)
}
return times
}

func TestProfileRowIterator(t *testing.T) {
b := newBlock(t, func() []*testhelper.ProfileBuilder {
return []*testhelper.ProfileBuilder{
@@ -268,28 +418,22 @@ func TestSeriesRewriter(t *testing.T) {
})
rows, err := newProfileRowIterator(blk)
require.NoError(t, err)
-	filePath := filepath.Join(t.TempDir(), block.IndexFilename)
-	idxw, err := prepareIndexWriter(context.Background(), filePath, []BlockReader{blk})
-	require.NoError(t, err)
-	it := newSeriesRewriter(rows, idxw)
-	// tests that all rows are written to the correct series index
-	require.True(t, it.Next())
-	require.Equal(t, uint32(0), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(0), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(0), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(1), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(2), it.At().row.SeriesIndex())
-	require.True(t, it.Next())
-	require.Equal(t, uint32(2), it.At().row.SeriesIndex())
-	require.False(t, it.Next())
-	require.NoError(t, it.Err())
-	require.NoError(t, it.Close())
-	require.NoError(t, idxw.Close())
+	path := t.TempDir()
+	filePath := filepath.Join(path, block.IndexFilename)
+	idxw := newIndexRewriter(path)
+	seriesIdx := []uint32{}
+	for rows.Next() {
+		r := rows.At()
+		require.NoError(t, idxw.ReWriteRow(r))
+		seriesIdx = append(seriesIdx, r.row.SeriesIndex())
+	}
+	require.NoError(t, rows.Err())
+	require.NoError(t, rows.Close())
+
+	require.Equal(t, []uint32{0, 0, 0, 1, 2, 2}, seriesIdx)
+
+	err = idxw.Close(context.Background())
+	require.NoError(t, err)

idxr, err := index.NewFileReader(filePath)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion pkg/phlaredb/head.go
@@ -488,7 +488,7 @@ func (h *Head) flush(ctx context.Context) error {
// It must be guaranteed that no new inserts will happen
// after the call start.
h.inFlightProfiles.Wait()
-	if len(h.profiles.slice) == 0 {
+	if h.profiles.index.totalProfiles.Load() == 0 {
level.Info(h.logger).Log("msg", "head empty - no block written")
return os.RemoveAll(h.headPath)
}
14 changes: 8 additions & 6 deletions pkg/phlaredb/profile_store.go
@@ -62,11 +62,13 @@ type profileStore struct {
flushBufferLbs []phlaremodel.Labels
}

-func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] {
-	return parquet.NewGenericWriter[*schemav1.Profile](writer, schemav1.ProfilesSchema,
-		parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "pyroscopedb-parquet-buffers*")),
-		parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision),
-		parquet.PageBufferSize(3*1024*1024),
-	)
-}
+func newParquetProfileWriter(writer io.Writer, options ...parquet.WriterOption) *parquet.GenericWriter[*schemav1.Profile] {
+	options = append(options, parquet.PageBufferSize(3*1024*1024))
+	options = append(options, parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision))
+	options = append(options, parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "pyroscopedb-parquet-buffers*")))
+	options = append(options, schemav1.ProfilesSchema)
+	return parquet.NewGenericWriter[*schemav1.Profile](
+		writer, options...,
+	)
+}
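Since the constructor is now variadic, callers can layer extra parquet writer options on top of the defaults above. A hypothetical one-liner (MaxRowsPerRowGroup is an assumed parquet-go option, not something this commit uses):

	// Hypothetical: stack an additional option on the defaults.
	w := newParquetProfileWriter(io.Discard, parquet.MaxRowsPerRowGroup(100_000))
	_ = w.Close()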

@@ -82,7 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore {
go s.cutRowGroupLoop()
// Initialize writer on /dev/null
// TODO: Reuse parquet.Writer beyond life time of the head.
-	s.writer = newProfileWriter(io.Discard)
+	s.writer = newParquetProfileWriter(io.Discard)

return s
}
5 changes: 5 additions & 0 deletions pkg/phlaredb/schemas/v1/functions.go
@@ -53,3 +53,8 @@ type InMemoryFunction struct {
// Line number in source file.
StartLine uint32
}

func (f *InMemoryFunction) Clone() *InMemoryFunction {
n := *f
return &n
}
7 changes: 7 additions & 0 deletions pkg/phlaredb/schemas/v1/locations.go
@@ -110,6 +110,13 @@ type InMemoryLocation struct {
Line []InMemoryLine
}

func (l *InMemoryLocation) Clone() *InMemoryLocation {
x := *l
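// Deep-copy the Line slice so the clone does not alias the original's backing array.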
x.Line = make([]InMemoryLine, len(l.Line))
copy(x.Line, l.Line)
return &x
}

type InMemoryLine struct {
// The id of the corresponding profile.Function for this line.
FunctionId uint32
5 changes: 5 additions & 0 deletions pkg/phlaredb/schemas/v1/mappings.go
@@ -73,3 +73,8 @@ type InMemoryMapping struct {
HasLineNumbers bool
HasInlineFrames bool
}

func (m *InMemoryMapping) Clone() *InMemoryMapping {
n := *m
return &n
}
106 changes: 106 additions & 0 deletions pkg/phlaredb/sharding/label.go
@@ -0,0 +1,106 @@
// SPDX-License-Identifier: AGPL-3.0-only

package sharding

import (
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
)

const (
// ShardLabel is a reserved label referencing a shard on read path.
ShardLabel = "__query_shard__"
// CompactorShardIDLabel is the external label used to store
// the ID of a sharded block generated by the split-and-merge compactor.
// If a block does not have this label, it has not been split.
CompactorShardIDLabel = "__compactor_shard_id__"
)

// ShardSelector holds information about the configured query shard.
type ShardSelector struct {
ShardIndex uint64
ShardCount uint64
}

// LabelValue returns the label value to use to select this shard.
func (shard ShardSelector) LabelValue() string {
return FormatShardIDLabelValue(shard.ShardIndex, shard.ShardCount)
}

// Label generates the ShardSelector as a label.
func (shard ShardSelector) Label() labels.Label {
return labels.Label{
Name: ShardLabel,
Value: shard.LabelValue(),
}
}

// Matcher converts ShardSelector to Matcher.
func (shard ShardSelector) Matcher() *labels.Matcher {
return labels.MustNewMatcher(labels.MatchEqual, ShardLabel, shard.LabelValue())
}

// ShardFromMatchers extracts a ShardSelector and the index at which it was found in the matcher list.
func ShardFromMatchers(matchers []*labels.Matcher) (shard *ShardSelector, idx int, err error) {
for i, matcher := range matchers {
if matcher.Name == ShardLabel && matcher.Type == labels.MatchEqual {
index, count, err := ParseShardIDLabelValue(matcher.Value)
if err != nil {
return nil, i, err
}
return &ShardSelector{
ShardIndex: index,
ShardCount: count,
}, i, nil
}
}
return nil, 0, nil
}

// RemoveShardFromMatchers returns the input matchers without the label matcher on the query shard (if any).
func RemoveShardFromMatchers(matchers []*labels.Matcher) (shard *ShardSelector, filtered []*labels.Matcher, err error) {
shard, idx, err := ShardFromMatchers(matchers)
if err != nil || shard == nil {
return nil, matchers, err
}

// Create a new slice with the shard matcher removed.
filtered = make([]*labels.Matcher, 0, len(matchers)-1)
filtered = append(filtered, matchers[:idx]...)
filtered = append(filtered, matchers[idx+1:]...)

return shard, filtered, nil
}

// FormatShardIDLabelValue expects a 0-based shardID but produces a 1-based shard index in the output string.
func FormatShardIDLabelValue(shardID, shardCount uint64) string {
return fmt.Sprintf("%d_of_%d", shardID+1, shardCount)
}

// ParseShardIDLabelValue returns original (0-based) shard index and shard count parsed from formatted value.
func ParseShardIDLabelValue(val string) (index, shardCount uint64, _ error) {
// If we fail to parse the shard ID, we should not consider this block fully included in successors.
matches := strings.Split(val, "_")
if len(matches) != 3 || matches[1] != "of" {
return 0, 0, errors.Errorf("invalid shard ID: %q", val)
}

index, err := strconv.ParseUint(matches[0], 10, 64)
if err != nil {
return 0, 0, errors.Errorf("invalid shard ID: %q: %v", val, err)
}
count, err := strconv.ParseUint(matches[2], 10, 64)
if err != nil {
return 0, 0, errors.Errorf("invalid shard ID: %q: %v", val, err)
}

if index == 0 || count == 0 || index > count {
return 0, 0, errors.Errorf("invalid shard ID: %q", val)
}

return index - 1, count, nil
}
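A small usage sketch of the helpers above, written as an external example test; the expected output follows directly from the two functions as defined:

package sharding_test

import (
	"fmt"

	"github.com/grafana/pyroscope/pkg/phlaredb/sharding"
)

func Example_shardIDRoundTrip() {
	// FormatShardIDLabelValue takes a 0-based shard index and renders it 1-based.
	v := sharding.FormatShardIDLabelValue(2, 16)
	fmt.Println(v)

	// ParseShardIDLabelValue recovers the 0-based index and the shard count.
	index, count, err := sharding.ParseShardIDLabelValue(v)
	fmt.Println(index, count, err)

	// Output:
	// 3_of_16
	// 2 16 <nil>
}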
