Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v2): implement ReadIndex for linearizable reads #3619

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/experiment/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"flag"
"fmt"

"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -40,8 +40,6 @@ const (
raftTrailingLogs = 18 << 10
raftSnapshotInterval = 180 * time.Second
raftSnapshotThreshold = 8 << 10

metastoreRaftLeaderHealthServiceName = "metastore.v1.MetastoreService.RaftLeader"
)

type Config struct {
Expand Down Expand Up @@ -135,8 +133,10 @@ type Metastore struct {

walDir string

metrics *metastoreMetrics
client *metastoreclient.Client
metrics *metastoreMetrics
client *metastoreclient.Client

readyOnce sync.Once
readySince time.Time

dnsProvider *dns.Provider
Expand Down Expand Up @@ -174,7 +174,7 @@ func (m *Metastore) Shutdown() error {
return nil
}

func (m *Metastore) starting(ctx context.Context) error {
func (m *Metastore) starting(context.Context) error {
if err := m.db.open(false); err != nil {
return fmt.Errorf("failed to initialize database: %w", err)
}
Expand Down
62 changes: 62 additions & 0 deletions pkg/experiment/metastore/metastore_read_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package metastore

import (
"context"
"fmt"
"time"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

// ReadIndex returns the current commit index and verifies leadership.
func (m *Metastore) ReadIndex(context.Context, *metastorev1.ReadIndexRequest) (*metastorev1.ReadIndexResponse, error) {
commitIndex := m.raft.CommitIndex()
if err := m.raft.VerifyLeader().Error(); err != nil {
return nil, wrapRetryableErrorWithRaftDetails(err, m.raft)
}
return &metastorev1.ReadIndexResponse{ReadIndex: commitIndex}, nil
}

// waitLeaderCommitIndexAppliedLocally ensures the node is up-to-date for read operations,
// providing linearizable read semantics. It calls metastore client ReadIndex
// and waits for the local applied index to catch up to the returned read index.
// This method should be used before performing local reads to ensure consistency.
func (m *Metastore) waitLeaderCommitIndexAppliedLocally(ctx context.Context) error {
r, err := m.client.ReadIndex(ctx, &metastorev1.ReadIndexRequest{})
if err != nil {
return err
}
if m.raft.AppliedIndex() >= r.ReadIndex {
return nil
}

t := time.NewTicker(10 * time.Millisecond)
defer t.Stop()

// Wait for the read index to be applied
for {
select {
case <-t.C:
if m.raft.AppliedIndex() >= r.ReadIndex {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// CheckReady verifies if the metastore is ready to serve requests by
// ensuring the node is up-to-date with the leader's commit index.
func (m *Metastore) CheckReady(ctx context.Context) error {
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
return err
}
m.readyOnce.Do(func() {
m.readySince = time.Now()
})
if w := m.config.MinReadyDuration - time.Since(m.readySince); w > 0 {
return fmt.Errorf("%v before reporting readiness", w)
}
return nil
}
132 changes: 0 additions & 132 deletions pkg/experiment/metastore/metastore_readindex.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"math"
"sync"

"github.com/go-kit/log/level"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/metastore/index"
Expand All @@ -14,7 +16,10 @@ func (m *Metastore) GetProfileStats(
ctx context.Context,
r *metastorev1.GetProfileStatsRequest,
) (*typesv1.GetProfileStatsResponse, error) {
// TODO(kolesnikovae): ReadIndex
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
level.Error(m.logger).Log("msg", "failed to wait for leader commit index", "err", err)
return nil, err
}
return m.state.getProfileStats(r.TenantId, ctx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ func (m *Metastore) QueryMetadata(
ctx context.Context,
request *metastorev1.QueryMetadataRequest,
) (*metastorev1.QueryMetadataResponse, error) {
// TODO(kolesnikovae): ReadIndex
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
level.Error(m.logger).Log("msg", "failed to wait for leader commit index", "err", err)
return nil, err
}
return m.state.listBlocksForQuery(ctx, request)
}

Expand Down
Loading