Skip to content

Commit

Permalink
chore: improve error handling and remove MinReadyDuration
Browse files Browse the repository at this point in the history
  • Loading branch information
marcsanmi committed Oct 14, 2024
1 parent 1cc12b0 commit 17673e1
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 33 deletions.
2 changes: 0 additions & 2 deletions pkg/experiment/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type Config struct {
DataDir string `yaml:"data_dir"`
Raft RaftConfig `yaml:"raft"`
Compaction CompactionConfig `yaml:"compaction_config"`
MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"`
DLQRecoveryPeriod time.Duration `yaml:"dlq_recovery_period" category:"advanced"`
Index index.Config `yaml:"index_config"`
}
Expand All @@ -73,7 +72,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Address, prefix+"address", "localhost:9095", "")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc-client-config", f)
f.StringVar(&cfg.DataDir, prefix+"data-dir", "./data-metastore/data", "")
f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive some (DNS?) updates.")
f.DurationVar(&cfg.DLQRecoveryPeriod, prefix+"dlq-recovery-period", 15*time.Second, "Period for DLQ recovery loop.")
cfg.Raft.RegisterFlagsWithPrefix(prefix+"raft.", f)
cfg.Compaction.RegisterFlagsWithPrefix(prefix+"compaction.", f)
Expand Down
29 changes: 6 additions & 23 deletions pkg/experiment/metastore/metastore_readindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package metastore

import (
"context"
"fmt"
"time"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
Expand All @@ -19,11 +18,11 @@ func (m *Metastore) ReadIndex(_ context.Context, req *metastorev1.ReadIndexReque
return &metastorev1.ReadIndexResponse{ReadIndex: commitIndex}, nil
}

// readIndex ensures the node is up-to-date for read operations,
// providing linearizable read semantics. It calls the ReadIndex RPC
// waitLeaderCommitIndexAppliedLocally ensures the node is up-to-date for read operations,
// providing linearizable read semantics. It calls metastore client ReadIndex
// and waits for the local applied index to catch up to the returned read index.
// This method should be used before performing local reads to ensure consistency.
func (m *Metastore) readIndex(ctx context.Context) error {
func (m *Metastore) waitLeaderCommitIndexAppliedLocally(ctx context.Context) error {
r, err := m.client.ReadIndex(ctx, &metastorev1.ReadIndexRequest{})
if err != nil {
return err
Expand All @@ -46,24 +45,8 @@ func (m *Metastore) readIndex(ctx context.Context) error {
}
}

// CheckReady verifies if the node is ready to serve requests.
// It ensures the node is up-to-date by:
// 1. Performing a ReadIndex operation
// 2. Waiting for the applied index to catch up to the read index
// 3. Enforcing a minimum duration since first becoming ready
// CheckReady verifies if the metastore is ready to serve requests by ensuring
// the node is up-to-date with the leader's commit index.
func (m *Metastore) CheckReady(ctx context.Context) error {
if err := m.readIndex(ctx); err != nil {
return err
}

if m.readySince.IsZero() {
m.readySince = time.Now()
}

minReadyTime := m.config.MinReadyDuration
if time.Since(m.readySince) < minReadyTime {
return fmt.Errorf("waiting for %v after being ready", minReadyTime)
}

return nil
return m.waitLeaderCommitIndexAppliedLocally(ctx)
}
9 changes: 4 additions & 5 deletions pkg/experiment/metastore/metastore_state_get_profile_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package metastore

import (
"context"
"fmt"
"math"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/go-kit/log/level"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
Expand All @@ -18,8 +16,9 @@ func (m *Metastore) GetProfileStats(
ctx context.Context,
r *metastorev1.GetProfileStatsRequest,
) (*typesv1.GetProfileStatsResponse, error) {
if err := m.readIndex(ctx); err != nil {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("failed to read index: %v", err))
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
level.Error(m.logger).Log("msg", "failed to wait for leader commit index", "err", err, "method", "GetProfileStats")
return nil, err
}
return m.state.getProfileStats(r.TenantId, ctx)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/experiment/metastore/metastore_state_query_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func (m *Metastore) QueryMetadata(
ctx context.Context,
request *metastorev1.QueryMetadataRequest,
) (*metastorev1.QueryMetadataResponse, error) {
if err := m.readIndex(ctx); err != nil {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("failed to read index: %v", err))
if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil {
level.Error(m.logger).Log("msg", "failed to wait for leader commit index", "err", err, "method", "QueryMetadata")
return nil, err
}
return m.state.listBlocksForQuery(ctx, request)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/experiment/metastore/test/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func NewMetastoreSet(t *testing.T, cfg *metastore.Config, n int, bucket objstore
bootstrapPeers[i] = fmt.Sprintf("%s/%s", raftAddresses[i], raftIds[i])

icfg := *cfg
icfg.MinReadyDuration = 0
icfg.Address = grpcAddresses[i]
icfg.DataDir = t.TempDir()
icfg.Raft.ServerID = raftIds[i]
Expand Down

0 comments on commit 17673e1

Please sign in to comment.