Skip to content

Commit

Permalink
feat(v2): implement ReadIndex for linearizable reads
Browse files Browse the repository at this point in the history
  • Loading branch information
marcsanmi committed Oct 14, 2024
1 parent 7d12282 commit 1cc12b0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 103 deletions.
139 changes: 38 additions & 101 deletions pkg/experiment/metastore/metastore_readindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,128 +5,65 @@ import (
"fmt"
"time"

"github.com/go-kit/log"
"github.com/google/uuid"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

var tcheckFreq = 10 * time.Millisecond

func (m *Metastore) ReadIndex(ctx context.Context, req *metastorev1.ReadIndexRequest) (*metastorev1.ReadIndexResponse, error) {
//todo
//If the leader has not yet marked an entry from its current term committed, it waits until it
//has done so. The Leader Completeness Property guarantees that a leader has all committed
//entries, but at the start of its term, it may not know which those are. To find out, it needs to
//commit an entry from its term. Raft handles this by having each leader commit a blank no-op
//entry into the log at the start of its term. As soon as this no-op entry is committed, the leader’s
//commit index will be at least as large as any other servers’ during its term.
t := time.Now()
readIndex := m.raft.CommitIndex()
raftLogger := func() log.Logger {
return log.With(m.logger, "component", "raft_debug",
"request_id", req.DebugRequestId,
"op", "ReadIndex",
"read_index", readIndex,
"applied_index", m.raft.AppliedIndex(),
"commit_index", m.raft.CommitIndex(),
"last_index", m.raft.LastIndex(),
"duration", time.Since(t),
)
}
const tCheckFreq = 10 * time.Millisecond

raftLogger().Log("msg", "verify_leader")
// ReadIndex returns the current commit index and verifies leadership.
func (m *Metastore) ReadIndex(_ context.Context, req *metastorev1.ReadIndexRequest) (*metastorev1.ReadIndexResponse, error) {
commitIndex := m.raft.CommitIndex()
if err := m.raft.VerifyLeader().Error(); err != nil {
return nil, wrapRetryableErrorWithRaftDetails(err, m.raft)
}
return &metastorev1.ReadIndexResponse{ReadIndex: commitIndex}, nil
}

// readIndex ensures the node is up-to-date for read operations,
// providing linearizable read semantics. It calls the ReadIndex RPC
// and waits for the local applied index to catch up to the returned read index.
// This method should be used before performing local reads to ensure consistency.
func (m *Metastore) readIndex(ctx context.Context) error {
r, err := m.client.ReadIndex(ctx, &metastorev1.ReadIndexRequest{})
if err != nil {
return err
}

tcheck := time.NewTicker(tcheckFreq)
defer tcheck.Stop()
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
t := time.NewTicker(tCheckFreq)
defer t.Stop()

// Wait for the read index to be applied
for {
select {
case <-tcheck.C:
case <-t.C:
appliedIndex := m.raft.AppliedIndex()
raftLogger().Log("msg", "tick")
if appliedIndex >= readIndex {
raftLogger().Log("msg", "caught up")
return &metastorev1.ReadIndexResponse{ReadIndex: readIndex}, nil
if appliedIndex >= r.ReadIndex {
return nil
}
continue
case <-timeout.C:
raftLogger().Log("err", "timeout")
return new(metastorev1.ReadIndexResponse), fmt.Errorf("timeout")
case <-ctx.Done():
raftLogger().Log("err", "context canceled")
return new(metastorev1.ReadIndexResponse), fmt.Errorf("canceled %w", ctx.Err())
return ctx.Err()
}
}
}

func (m *Metastore) CheckReady(ctx context.Context) (err error) {
const (
ready = "ready"
notReady = "not_ready"
status = "status"
)
debugRequestId := uuid.Must(uuid.NewRandom()).String() //todo delete
readIndex := uint64(0)
t := time.Now()
raftLogger := func() log.Logger {
return log.With(m.logger, "component", "raft_debug",
"request_id", debugRequestId,
"op", "CheckReady",
"read_index", readIndex,
"applied_index", m.raft.AppliedIndex(),
"commit_index", m.raft.CommitIndex(),
"last_index", m.raft.LastIndex(),
"duration", time.Since(t),
)
}
raftLogger().Log("msg", "check")
req := new(metastorev1.ReadIndexRequest)
req.DebugRequestId = debugRequestId
res, err := m.client.ReadIndex(ctx, req)
if err != nil {
err = fmt.Errorf("failed to get read index: %w", err)
raftLogger().Log(status, notReady, "err", err)
// CheckReady verifies if the node is ready to serve requests.
// It ensures the node is up-to-date by:
// 1. Performing a ReadIndex operation
// 2. Waiting for the applied index to catch up to the read index
// 3. Enforcing a minimum duration since first becoming ready
func (m *Metastore) CheckReady(ctx context.Context) error {
if err := m.readIndex(ctx); err != nil {
return err
}
readIndex = res.ReadIndex

tcheck := time.NewTicker(tcheckFreq)
defer tcheck.Stop()
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()

for {
select {
case <-tcheck.C:
commitIndex := m.raft.CommitIndex()
raftLogger().Log("msg", "tick")
if commitIndex >= res.ReadIndex {
if m.readySince.IsZero() {
m.readySince = time.Now()
}
minReadyTime := m.config.MinReadyDuration
if time.Since(m.readySince) < minReadyTime {
err := fmt.Errorf("waiting for %v after being ready", minReadyTime)
raftLogger().Log(status, notReady, "err", err)
return err
}
if m.readySince.IsZero() {
m.readySince = time.Now()
}

raftLogger().Log(status, ready)
return nil
}
continue
case <-timeout.C:
raftLogger().Log(status, notReady, "err", "timeout")
return fmt.Errorf("metastore ready check timeout")
case <-ctx.Done():
raftLogger().Log(status, notReady, "err", "context canceled")
return fmt.Errorf("metastore check context canceled %w", ctx.Err())
}
minReadyTime := m.config.MinReadyDuration
if time.Since(m.readySince) < minReadyTime {
return fmt.Errorf("waiting for %v after being ready", minReadyTime)
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package metastore

import (
"context"
"fmt"
"math"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/metastore/index"
Expand All @@ -14,7 +18,9 @@ func (m *Metastore) GetProfileStats(
ctx context.Context,
r *metastorev1.GetProfileStatsRequest,
) (*typesv1.GetProfileStatsResponse, error) {
// TODO(kolesnikovae): ReadIndex
if err := m.readIndex(ctx); err != nil {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("failed to read index: %v", err))
}
return m.state.getProfileStats(r.TenantId, ctx)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/experiment/metastore/metastore_state_query_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func (m *Metastore) QueryMetadata(
ctx context.Context,
request *metastorev1.QueryMetadataRequest,
) (*metastorev1.QueryMetadataResponse, error) {
// TODO(kolesnikovae): ReadIndex
if err := m.readIndex(ctx); err != nil {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("failed to read index: %v", err))
}
return m.state.listBlocksForQuery(ctx, request)
}

Expand Down

0 comments on commit 1cc12b0

Please sign in to comment.