From 363d45392058f08604af089997395bedf1d068d5 Mon Sep 17 00:00:00 2001 From: huochexizhan Date: Wed, 2 Oct 2024 12:45:49 +0800 Subject: [PATCH 1/2] fix: fix slice init length (#3600) --- pkg/frontend/dot/graph/graph.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/frontend/dot/graph/graph.go b/pkg/frontend/dot/graph/graph.go index 7ddea47c47..c74fc80d5a 100644 --- a/pkg/frontend/dot/graph/graph.go +++ b/pkg/frontend/dot/graph/graph.go @@ -438,7 +438,7 @@ func newTree(prof *profile.Profile, o *Options) (g *Graph) { } } - nodes := make(Nodes, len(prof.Location)) + nodes := make(Nodes, 0, len(prof.Location)) for _, nm := range parentNodeMap { nodes = append(nodes, nm.nodes()...) } From 3cc5bd857f252b3969033aea1f4e9857bfb32d99 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 2 Oct 2024 13:31:52 +0700 Subject: [PATCH 2/2] feat(v2): dlq recovery (#3595) --- .mockery.yaml | 3 + Makefile | 2 +- pkg/experiment/ingester/segment.go | 40 ++--- pkg/experiment/ingester/segment_test.go | 101 ++++++++++- pkg/experiment/ingester/storage/path.go | 34 ++++ pkg/experiment/metastore/client/client.go | 18 -- .../metastore/client/client_test.go | 5 +- .../metastore/client/server_mock_test.go | 16 -- pkg/experiment/metastore/discovery/parse.go | 69 ++++++++ pkg/experiment/metastore/discovery/static.go | 20 +++ pkg/experiment/metastore/dlq/recovery.go | 150 ++++++++++++++++ pkg/experiment/metastore/dlq/recovery_test.go | 157 +++++++++++++++++ pkg/experiment/metastore/metastore.go | 37 +++- .../metastore/metastore_bootstrap.go | 31 +--- pkg/experiment/metastore/metastore_fsm.go | 6 +- pkg/experiment/metastore/metastore_metrics.go | 7 + .../metastore/metastore_readindex.go | 4 +- .../metastore/metastore_state_add_block.go | 27 ++- pkg/experiment/metastore/raftleader/err.go | 14 ++ .../metastore/raftleader/raftleader.go | 49 +++--- pkg/experiment/metastore/test/create.go | 165 ++++++++++++++++++ .../test/metastore_leader_details_test.go | 77 ++++++++ pkg/phlare/modules_experimental.go | 19 +- pkg/phlare/phlare.go | 2 +- pkg/phlaredb/block/metadata.go | 4 +- pkg/scheduler/queue/queue_test.go | 4 +- pkg/test/integration/helper.go | 22 +-- pkg/test/logger.go | 11 +- pkg/test/mocks/mockdlq/mock_local_server.go | 97 ++++++++++ pkg/test/ports.go | 20 +++ pkg/util/http/error_test.go | 2 +- tools/update_examples.go | 2 +- 32 files changed, 1029 insertions(+), 186 deletions(-) create mode 100644 pkg/experiment/ingester/storage/path.go create mode 100644 pkg/experiment/metastore/discovery/parse.go create mode 100644 pkg/experiment/metastore/discovery/static.go create mode 100644 pkg/experiment/metastore/dlq/recovery.go create mode 100644 pkg/experiment/metastore/dlq/recovery_test.go create mode 100644 pkg/experiment/metastore/raftleader/err.go create mode 100644 pkg/experiment/metastore/test/create.go create mode 100644 pkg/experiment/metastore/test/metastore_leader_details_test.go create mode 100644 pkg/test/mocks/mockdlq/mock_local_server.go create mode 100644 pkg/test/ports.go diff --git a/.mockery.yaml b/.mockery.yaml index 70c33a0df7..95d8c88f48 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -24,3 +24,6 @@ packages: github.com/grafana/pyroscope/pkg/experiment/metastore/discovery: interfaces: Discovery: + github.com/grafana/pyroscope/pkg/experiment/metastore/dlq: + interfaces: + LocalServer: diff --git a/Makefile b/Makefile index 38fc9e5177..0164d968d5 100644 --- a/Makefile +++ b/Makefile @@ -290,7 +290,7 @@ $(BIN)/buf: Makefile $(BIN)/golangci-lint: Makefile @mkdir -p $(@D) - GOBIN=$(abspath $(@D)) $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.59.1 + GOBIN=$(abspath $(@D)) $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0 $(BIN)/protoc-gen-go: Makefile go.mod @mkdir -p $(@D) diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go index 066b4e7a88..bae5c2cc5c 100644 --- a/pkg/experiment/ingester/segment.go +++ b/pkg/experiment/ingester/segment.go @@ -5,7 +5,6 @@ import ( "context" "crypto/rand" "fmt" - "path" "runtime/pprof" "slices" "strings" @@ -22,19 +21,14 @@ import ( metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb" + segmentstorage "github.com/grafana/pyroscope/pkg/experiment/ingester/storage" "github.com/grafana/pyroscope/pkg/model" pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split" pprofmodel "github.com/grafana/pyroscope/pkg/pprof" - "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/util/math" "github.com/grafana/pyroscope/pkg/validation" ) -const pathSegments = "segments" -const pathDLQ = "dlq" -const pathAnon = tenant.DefaultTenantID -const pathBlock = "block.bin" - var ErrMetastoreDLQFailed = fmt.Errorf("failed to store block metadata in DLQ") type shardKey uint32 @@ -193,17 +187,15 @@ func (sw *segmentsWriter) newShard(sk shardKey) *shard { func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *segment { id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader) sshard := fmt.Sprintf("%d", sk) - blockPath := path.Join(pathSegments, sshard, pathAnon, id.String(), pathBlock) s := &segment{ - l: log.With(sl, "segment-id", id.String()), - ulid: id, - heads: make(map[serviceKey]serviceHead), - sw: sw, - sh: sh, - shard: sk, - sshard: sshard, - blockPath: blockPath, - doneChan: make(chan struct{}, 0), + l: log.With(sl, "segment-id", id.String()), + ulid: id, + heads: make(map[serviceKey]serviceHead), + sw: sw, + sh: sh, + shard: sk, + sshard: sshard, + doneChan: make(chan struct{}, 0), } return s } @@ -234,7 +226,7 @@ func (s *segment) flush(ctx context.Context) (err error) { return fmt.Errorf("failed to flush block %s: %w", s.ulid.String(), err) } // TODO(kolesnikovae): Add sane timeouts to all the operations. - if err = s.sw.uploadBlock(ctx, blockData, s); err != nil { + if err = s.sw.uploadBlock(ctx, blockData, blockMeta, s); err != nil { return fmt.Errorf("failed to upload block %s: %w", s.ulid.String(), err) } if err = s.sw.storeMeta(ctx, blockMeta, s); err != nil { @@ -406,7 +398,6 @@ type segment struct { heads map[serviceKey]serviceHead headsLock sync.RWMutex sw *segmentsWriter - blockPath string doneChan chan struct{} flushErr error flushErrMutex sync.Mutex @@ -517,16 +508,19 @@ func (s *segment) headForIngest(k serviceKey) *memdb.Head { return nh } -func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, s *segment) error { +func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, meta *metastorev1.BlockMeta, s *segment) error { t1 := time.Now() defer func() { sw.metrics.blockUploadDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds()) }() sw.metrics.segmentBlockSizeBytes.WithLabelValues(s.sshard).Observe(float64(len(blockData))) - if err := sw.bucket.Upload(ctx, s.blockPath, bytes.NewReader(blockData)); err != nil { + + blockPath := segmentstorage.PathForSegment(meta) + + if err := sw.bucket.Upload(ctx, blockPath, bytes.NewReader(blockData)); err != nil { return err } - sw.l.Log("msg", "uploaded block", "path", s.blockPath, "upload_duration", time.Since(t1)) + sw.l.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1)) return nil } @@ -549,7 +543,7 @@ func (sw *segmentsWriter) storeMetaDLQ(ctx context.Context, meta *metastorev1.Bl sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "err").Inc() return err } - fullPath := path.Join(pathDLQ, s.sshard, pathAnon, s.ulid.String(), "meta.pb") + fullPath := segmentstorage.PathForDLQ(meta) if err = sw.bucket.Upload(ctx, fullPath, bytes.NewReader(metaBlob)); err != nil { sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "err").Inc() return fmt.Errorf("%w, %w", ErrMetastoreDLQFailed, err) diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go index 52b5ac0528..2689c567f7 100644 --- a/pkg/experiment/ingester/segment_test.go +++ b/pkg/experiment/ingester/segment_test.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "fmt" + "github.com/grafana/dskit/flagext" + "github.com/grafana/pyroscope/pkg/experiment/metastore" + "github.com/grafana/pyroscope/pkg/test/mocks/mockdlq" "io" "math/rand" "path/filepath" @@ -14,7 +17,6 @@ import ( "time" gprofile "github.com/google/pprof/profile" - profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1" "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect" @@ -22,6 +24,9 @@ import ( typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb" testutil2 "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb/testutil" + segmentstorage "github.com/grafana/pyroscope/pkg/experiment/ingester/storage" + "github.com/grafana/pyroscope/pkg/experiment/metastore/dlq" + metastoretest "github.com/grafana/pyroscope/pkg/experiment/metastore/test" "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/objstore/providers/filesystem" "github.com/grafana/pyroscope/pkg/objstore/providers/memory" @@ -226,10 +231,10 @@ func TestDLQFail(t *testing.T) { l := testutil.NewLogger(t) bucket := mockobjstore.NewMockBucket(t) bucket.On("Upload", mock.Anything, mock.MatchedBy(func(name string) bool { - return strings.HasSuffix(name, pathBlock) + return segmentstorage.IsSegmentPath(name) }), mock.Anything).Return(nil) bucket.On("Upload", mock.Anything, mock.MatchedBy(func(name string) bool { - return strings.Contains(name, pathDLQ) + return segmentstorage.IsDLQPath(name) }), mock.Anything).Return(fmt.Errorf("mock upload DLQ error")) client := mockmetastorev1.NewMockMetastoreServiceClient(t) client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). @@ -369,6 +374,92 @@ func TestQueryMultipleSeriesSingleTenant(t *testing.T) { require.Equal(t, expectedCollapsed, actualCollapsed) } +func TestDLQRecoveryMock(t *testing.T) { + chunk := inputChunk([]input{ + {shard: 1, tenant: "tb", profile: cpuProfile(42, 239, "svc1", "kek", "foo", "bar")}, + }) + + sw := newTestSegmentWriter(t, segmentWriterConfig{ + segmentDuration: 100 * time.Millisecond, + }) + sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). + Return(nil, fmt.Errorf("mock metastore unavailable")) + + _ = sw.ingestChunk(t, chunk, false) + allBlocks := sw.getMetadataDLQ() + assert.Len(t, allBlocks, 1) + + recoveredMetas := make(chan *metastorev1.BlockMeta, 1) + srv := mockdlq.NewMockLocalServer(t) + srv.On("AddRecoveredBlock", mock.Anything, mock.Anything). + Once(). + Run(func(args mock.Arguments) { + meta := args.Get(1).(*metastorev1.AddBlockRequest).Block + recoveredMetas <- meta + }). + Return(&metastorev1.AddBlockResponse{}, nil) + recovery := dlq.NewRecovery(dlq.RecoveryConfig{ + Period: 100 * time.Millisecond, + }, testutil.NewLogger(t), srv, sw.bucket) + recovery.Start() + defer recovery.Stop() + + meta := <-recoveredMetas + assert.Equal(t, allBlocks[0].Id, meta.Id) + + clients := sw.createBlocksFromMetas(allBlocks) + inputs := groupInputs(t, chunk) + sw.queryInputs(clients, inputs) +} + +func TestDLQRecovery(t *testing.T) { + const tenant = "tb" + const ts = 239 + chunk := inputChunk([]input{ + {shard: 1, tenant: tenant, profile: cpuProfile(42, ts, "svc1", "kek", "foo", "bar")}, + }) + + sw := newTestSegmentWriter(t, segmentWriterConfig{ + segmentDuration: 100 * time.Millisecond, + }) + sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). + Return(nil, fmt.Errorf("mock metastore unavailable")) + + _ = sw.ingestChunk(t, chunk, false) + + cfg := new(metastore.Config) + flagext.DefaultValues(cfg) + cfg.DLQRecoveryPeriod = 100 * time.Millisecond + m := metastoretest.NewMetastoreSet(t, cfg, 3, sw.bucket) + defer m.Close() + + queryBlock := func() *metastorev1.BlockMeta { + res, err := m.Client.QueryMetadata(context.Background(), &metastorev1.QueryMetadataRequest{ + TenantId: []string{tenant}, + StartTime: ts - 1, + EndTime: ts + 1, + Query: "{service_name=~\"svc1\"}", + }) + if err != nil { + return nil + } + if len(res.Blocks) == 1 { + return res.Blocks[0] + } + return nil + } + require.Eventually(t, func() bool { + return queryBlock() != nil + }, 10*time.Second, 100*time.Millisecond) + + block := queryBlock() + require.NotNil(t, block) + + clients := sw.createBlocksFromMetas([]*metastorev1.BlockMeta{block}) + inputs := groupInputs(t, chunk) + sw.queryInputs(clients, inputs) +} + type sw struct { *segmentsWriter bucket *memory.InMemBucket @@ -407,7 +498,7 @@ func defaultTestSegmentWriterConfig() segmentWriterConfig { func (sw *sw) createBlocksFromMetas(blocks []*metastorev1.BlockMeta) tenantClients { dir := sw.t.TempDir() for _, meta := range blocks { - blobReader, err := sw.bucket.Get(context.Background(), fmt.Sprintf("%s/%d/%s/%s/%s", pathSegments, meta.Shard, pathAnon, meta.Id, pathBlock)) + blobReader, err := sw.bucket.Get(context.Background(), segmentstorage.PathForSegment(meta)) require.NoError(sw.t, err) blob, err := io.ReadAll(blobReader) require.NoError(sw.t, err) @@ -561,7 +652,7 @@ func (sw *sw) getMetadataDLQ() []*metastorev1.BlockMeta { objects := sw.bucket.Objects() dlqFiles := []string{} for s := range objects { - if strings.HasPrefix(s, pathDLQ) { + if segmentstorage.IsDLQPath(s) { dlqFiles = append(dlqFiles, s) } else { } diff --git a/pkg/experiment/ingester/storage/path.go b/pkg/experiment/ingester/storage/path.go new file mode 100644 index 0000000000..b2d87fdd2c --- /dev/null +++ b/pkg/experiment/ingester/storage/path.go @@ -0,0 +1,34 @@ +package storage + +import ( + "fmt" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/tenant" + "path" + "strings" +) + +const PathDLQ = "dlq" + +const pathSegments = "segments" +const pathAnon = tenant.DefaultTenantID +const pathBlock = "block.bin" +const pathMetaPB = "meta.pb" + +func PathForDLQ(meta *metastorev1.BlockMeta) string { + return path.Join(PathDLQ, fmt.Sprintf("%d", meta.Shard), pathAnon, meta.Id, pathMetaPB) +} + +func PathForSegment(meta *metastorev1.BlockMeta) string { + return path.Join(pathSegments, fmt.Sprintf("%d", meta.Shard), pathAnon, meta.Id, pathBlock) +} + +func IsDLQPath(p string) bool { + fs := strings.Split(p, "/") + return len(fs) == 5 && fs[0] == PathDLQ && fs[2] == pathAnon && fs[4] == pathMetaPB +} + +func IsSegmentPath(p string) bool { + fs := strings.Split(p, "/") + return len(fs) == 5 && fs[0] == pathSegments && fs[2] == pathAnon && fs[4] == pathBlock +} diff --git a/pkg/experiment/metastore/client/client.go b/pkg/experiment/metastore/client/client.go index 0856fd91c2..8a30fbc008 100644 --- a/pkg/experiment/metastore/client/client.go +++ b/pkg/experiment/metastore/client/client.go @@ -167,21 +167,3 @@ func dial(address string, grpcClientConfig grpcclient.Config, _ log.Logger) (*gr ) return grpc.Dial(address, options...) } - -const grpcServiceConfig = `{ - "healthCheckConfig": { - "serviceName": "metastore.v1.MetastoreService.RaftLeader" - }, - "loadBalancingPolicy":"round_robin", - "methodConfig": [{ - "name": [{"service": "metastore.v1.MetastoreService"}], - "waitForReady": true, - "retryPolicy": { - "MaxAttempts": 16, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - } - }] -}` diff --git a/pkg/experiment/metastore/client/client_test.go b/pkg/experiment/metastore/client/client_test.go index 74b14c4661..dd0fb7f328 100644 --- a/pkg/experiment/metastore/client/client_test.go +++ b/pkg/experiment/metastore/client/client_test.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/dskit/grpcclient" compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/test" "github.com/grafana/pyroscope/pkg/test/mocks/mockdiscovery" "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/assert" @@ -22,7 +23,7 @@ func TestUnavailable(t *testing.T) { d.On("Subscribe", mock.Anything).Return() l := testutil.NewLogger(t) c := New(l, grpcclient.Config{}, d) - ports, err := getFreePorts(nServers) + ports, err := test.GetFreePorts(nServers) assert.NoError(t, err) d.On("ServerError", mock.Anything).Run(func(args mock.Arguments) { @@ -87,7 +88,7 @@ func testRediscoverWrongLeader(t *testing.T, f func(c *Client)) { config := &grpcclient.Config{} flagext.DefaultValues(config) c := New(l, *config, d) - ports, err := getFreePorts(nServers * 2) + ports, err := test.GetFreePorts(nServers * 2) assert.NoError(t, err) p1 := ports[:nServers] diff --git a/pkg/experiment/metastore/client/server_mock_test.go b/pkg/experiment/metastore/client/server_mock_test.go index c25f4a355d..add600fe2e 100644 --- a/pkg/experiment/metastore/client/server_mock_test.go +++ b/pkg/experiment/metastore/client/server_mock_test.go @@ -202,19 +202,3 @@ func newMockServer(t *testing.T) *mockServer { compactorv1.RegisterCompactionPlannerServer(res.srv, res) return res } - -func getFreePorts(len int) (ports []int, err error) { - ports = make([]int, len) - for i := 0; i < len; i++ { - var a *net.TCPAddr - if a, err = net.ResolveTCPAddr("tcp", "127.0.0.1:0"); err == nil { - var l *net.TCPListener - if l, err = net.ListenTCP("tcp", a); err != nil { - return nil, err - } - ports[i] = l.Addr().(*net.TCPAddr).Port - l.Close() - } - } - return ports, nil -} diff --git a/pkg/experiment/metastore/discovery/parse.go b/pkg/experiment/metastore/discovery/parse.go new file mode 100644 index 0000000000..e6f5ab8baa --- /dev/null +++ b/pkg/experiment/metastore/discovery/parse.go @@ -0,0 +1,69 @@ +package discovery + +import ( + "github.com/go-kit/log" + kuberesolver2 "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery/kuberesolver" + "github.com/hashicorp/raft" + "net" + "strings" +) + +func NewDiscovery(l log.Logger, address string) (Discovery, error) { + + kubeClient, err := kuberesolver2.NewInClusterK8sClient() + if err != nil { + return nil, err + } + + if strings.HasPrefix(address, "dns:///_grpc._tcp.") { + address = strings.Replace(address, "dns:///_grpc._tcp.", "kubernetes:///", 1) // todo support dns discovery + } + if strings.HasPrefix(address, "kubernetes:///") { + return NewKubeResolverDiscovery(l, address, kubeClient) + } + peers := ParsePeers(address) + srvs := make([]Server, 0, len(peers)) + for _, peer := range peers { + srvs = append(srvs, Server{ + Raft: peer, + ResolvedAddress: string(peer.Address), + }) + } + return NewStaticDiscovery(srvs), nil +} + +func ParsePeers(raw string) []raft.Server { + rpeers := strings.Split(raw, ",") + peers := make([]raft.Server, 0, len(rpeers)) + for _, rpeer := range rpeers { + peers = append(peers, ParsePeer(rpeer)) + } + return peers +} + +func ParsePeer(raw string) raft.Server { + // The string may be "{addr}" or "{addr}/{node_id}". + parts := strings.SplitN(raw, "/", 2) + var addr string + var node string + if len(parts) == 2 { + addr = parts[0] + node = parts[1] + } else { + addr = raw + } + host, _, err := net.SplitHostPort(addr) + if err != nil { + // No port specified. + host = addr + } + if node == "" { + // No node_id specified. + node = host + } + return raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID(node), + Address: raft.ServerAddress(addr), + } +} diff --git a/pkg/experiment/metastore/discovery/static.go b/pkg/experiment/metastore/discovery/static.go new file mode 100644 index 0000000000..617fbeae57 --- /dev/null +++ b/pkg/experiment/metastore/discovery/static.go @@ -0,0 +1,20 @@ +package discovery + +type StaticDiscovery struct { + servers []Server +} + +func NewStaticDiscovery(servers []Server) *StaticDiscovery { + return &StaticDiscovery{servers: servers} +} + +func (s *StaticDiscovery) Subscribe(updates Updates) { + updates.Servers(s.servers) +} + +func (s *StaticDiscovery) ServerError(srv Server) { +} + +func (s *StaticDiscovery) Close() { + +} diff --git a/pkg/experiment/metastore/dlq/recovery.go b/pkg/experiment/metastore/dlq/recovery.go new file mode 100644 index 0000000000..655a7d082e --- /dev/null +++ b/pkg/experiment/metastore/dlq/recovery.go @@ -0,0 +1,150 @@ +package dlq + +import ( + "context" + "fmt" + "io" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + segmentstorage "github.com/grafana/pyroscope/pkg/experiment/ingester/storage" + "github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader" + "github.com/thanos-io/objstore" +) + +type RecoveryConfig struct { + Period time.Duration +} + +type LocalServer interface { + AddRecoveredBlock(context.Context, *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) +} + +type Recovery struct { + cfg RecoveryConfig + l log.Logger + srv LocalServer + bucket objstore.Bucket + + started bool + wg sync.WaitGroup + m sync.Mutex + cancel func() +} + +func NewRecovery(cfg RecoveryConfig, l log.Logger, srv LocalServer, bucket objstore.Bucket) *Recovery { + return &Recovery{ + cfg: cfg, + l: l, + srv: srv, + bucket: bucket, + } +} + +func (r *Recovery) Start() { + r.m.Lock() + defer r.m.Unlock() + if r.started { + r.l.Log("msg", "recovery already started") + return + } + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel + r.started = true + go r.recoverLoop(ctx) + r.l.Log("msg", "recovery started") +} + +func (r *Recovery) Stop() { + r.m.Lock() + defer r.m.Unlock() + if !r.started { + r.l.Log("msg", "recovery already stopped") + return + } + r.cancel() + r.wg.Wait() + r.started = false + r.l.Log("msg", "recovery stopped") +} + +func (r *Recovery) recoverLoop(ctx context.Context) { + ticker := time.NewTicker(r.cfg.Period) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.recoverTick(ctx) + } + } +} + +func (r *Recovery) recoverTick(ctx context.Context) { + err := r.bucket.Iter(ctx, segmentstorage.PathDLQ, func(metaPath string) error { + if ctx.Err() != nil { + return ctx.Err() + } + return r.recover(ctx, metaPath) + }, objstore.WithRecursiveIter) + if err != nil { + level.Error(r.l).Log("msg", "failed to iterate over dlq", "err", err) + } +} + +func (r *Recovery) recover(ctx context.Context, metaPath string) error { + fields := strings.Split(metaPath, "/") + if len(fields) != 5 { + r.l.Log("msg", "unexpected path", "path", metaPath) + return nil + } + sshard := fields[1] + ulid := fields[3] + meta, err := r.get(ctx, metaPath) + if err != nil { + level.Error(r.l).Log("msg", "failed to get block meta", "err", err, "path", metaPath) + return nil + } + shard, _ := strconv.ParseUint(sshard, 10, 64) + if ulid != meta.Id || meta.Shard != uint32(shard) { + level.Error(r.l).Log("msg", "unexpected block meta", "path", metaPath, "meta", fmt.Sprintf("%+v", meta)) + return nil + } + _, err = r.srv.AddRecoveredBlock(ctx, &metastorev1.AddBlockRequest{ + Block: meta, + }) + if err != nil { + if raftleader.IsRaftLeadershipError(err) { + return err + } + level.Error(r.l).Log("msg", "failed to add block", "err", err, "path", metaPath) + return nil + } + err = r.bucket.Delete(ctx, metaPath) + if err != nil { + level.Error(r.l).Log("msg", "failed to delete block meta", "err", err, "path", metaPath) + } + return nil +} + +func (r *Recovery) get(ctx context.Context, metaPath string) (*metastorev1.BlockMeta, error) { + meta, err := r.bucket.Get(ctx, metaPath) + if err != nil { + return nil, err + } + metaBytes, err := io.ReadAll(meta) + if err != nil { + return nil, err + } + recovered := new(metastorev1.BlockMeta) + err = recovered.UnmarshalVT(metaBytes) + if err != nil { + return nil, err + } + return recovered, nil +} diff --git a/pkg/experiment/metastore/dlq/recovery_test.go b/pkg/experiment/metastore/dlq/recovery_test.go new file mode 100644 index 0000000000..fa05f7d617 --- /dev/null +++ b/pkg/experiment/metastore/dlq/recovery_test.go @@ -0,0 +1,157 @@ +package dlq + +import ( + "context" + "crypto/rand" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + segmentstorage "github.com/grafana/pyroscope/pkg/experiment/ingester/storage" + "github.com/grafana/pyroscope/pkg/objstore/providers/memory" + "github.com/grafana/pyroscope/pkg/test/mocks/mockdlq" + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sync" + "testing" + "time" +) + +func TestRecoverTick(t *testing.T) { + metas := []*metastorev1.BlockMeta{ + { + Id: ulid.MustNew(3, rand.Reader).String(), + Shard: 2, + }, + { + Id: ulid.MustNew(1, rand.Reader).String(), + Shard: 1, + }, + { + Id: ulid.MustNew(2, rand.Reader).String(), + Shard: 2, + }, + } + actual := []*metastorev1.BlockMeta{} + + srv := mockdlq.NewMockLocalServer(t) + srv.On("AddRecoveredBlock", mock.Anything, mock.Anything). + Times(3). + Run(func(args mock.Arguments) { + meta := args.Get(1).(*metastorev1.AddBlockRequest).Block + actual = append(actual, meta) + }). + Return(&metastorev1.AddBlockResponse{}, nil) + + bucket := memory.NewInMemBucket() + for _, meta := range metas { + addMeta(bucket, meta) + } + + r := NewRecovery(RecoveryConfig{}, testutil.NewLogger(t), srv, bucket) + r.recoverTick(context.Background()) + + expected := []*metastorev1.BlockMeta{ + metas[1], + metas[2], + metas[0], + } + + require.Equal(t, len(actual), len(expected)) + for i := range actual { + require.Equal(t, actual[i].Id, expected[i].Id) + require.Equal(t, actual[i].Shard, expected[i].Shard) + } +} + +func TestNotRaftLeader(t *testing.T) { + metas := []*metastorev1.BlockMeta{ + { + Id: ulid.MustNew(3, rand.Reader).String(), + Shard: 2, + }, + } + + srv := mockdlq.NewMockLocalServer(t) + s, _ := status.New(codes.Unavailable, "mock metastore error").WithDetails(&typesv1.RaftDetails{Leader: string("239")}) + srv.On("AddRecoveredBlock", mock.Anything, mock.Anything). + Once(). + Return(nil, s.Err()) + + bucket := memory.NewInMemBucket() + for _, meta := range metas { + addMeta(bucket, meta) + } + + r := NewRecovery(RecoveryConfig{}, testutil.NewLogger(t), srv, bucket) + r.recoverTick(context.Background()) + + assert.Equal(t, 1, len(bucket.Objects())) +} + +func TestStartStop(t *testing.T) { + metas := []*metastorev1.BlockMeta{ + { + Id: ulid.MustNew(3, rand.Reader).String(), + Shard: 2, + }, + { + Id: ulid.MustNew(1, rand.Reader).String(), + Shard: 1, + }, + { + Id: ulid.MustNew(2, rand.Reader).String(), + Shard: 2, + }, + } + m := new(sync.Mutex) + actual := []*metastorev1.BlockMeta{} + + srv := mockdlq.NewMockLocalServer(t) + srv.On("AddRecoveredBlock", mock.Anything, mock.Anything). + Times(3). + Run(func(args mock.Arguments) { + meta := args.Get(1).(*metastorev1.AddBlockRequest).Block + m.Lock() + actual = append(actual, meta) + m.Unlock() + }). + Return(&metastorev1.AddBlockResponse{}, nil) + + bucket := memory.NewInMemBucket() + for _, meta := range metas { + addMeta(bucket, meta) + } + + r := NewRecovery(RecoveryConfig{ + Period: time.Millisecond * 10, + }, testutil.NewLogger(t), srv, bucket) + r.Start() + defer r.Stop() + + require.Eventually(t, func() bool { + m.Lock() + defer m.Unlock() + return len(actual) == 3 + }, time.Second, time.Millisecond*100) + + expected := []*metastorev1.BlockMeta{ + metas[1], + metas[2], + metas[0], + } + + require.Equal(t, len(actual), len(expected)) + for i := range actual { + require.Equal(t, actual[i].Id, expected[i].Id) + require.Equal(t, actual[i].Shard, expected[i].Shard) + } +} + +func addMeta(bucket *memory.InMemBucket, meta *metastorev1.BlockMeta) { + data, _ := meta.MarshalVT() + bucket.Set(segmentstorage.PathForDLQ(meta), data) +} diff --git a/pkg/experiment/metastore/metastore.go b/pkg/experiment/metastore/metastore.go index fa13c095b3..9a339e36f6 100644 --- a/pkg/experiment/metastore/metastore.go +++ b/pkg/experiment/metastore/metastore.go @@ -4,6 +4,9 @@ import ( "context" "flag" "fmt" + "github.com/grafana/pyroscope/pkg/experiment/metastore/dlq" + "github.com/thanos-io/objstore" + "net" "os" "path/filepath" @@ -42,11 +45,13 @@ const ( ) type Config struct { - Address string `yaml:"address"` - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."` - DataDir string `yaml:"data_dir"` - Raft RaftConfig `yaml:"raft"` - Compaction CompactionConfig `yaml:"compaction_config"` + Address string `yaml:"address"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."` + DataDir string `yaml:"data_dir"` + Raft RaftConfig `yaml:"raft"` + Compaction CompactionConfig `yaml:"compaction_config"` + MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` + DLQRecoveryPeriod time.Duration `yaml:"dlq_recovery_period" category:"advanced"` } type RaftConfig struct { @@ -67,6 +72,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, prefix+"address", "localhost:9095", "") cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc-client-config", f) f.StringVar(&cfg.DataDir, prefix+"data-dir", "./data-metastore/data", "") + f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive some (DNS?) updates.") + f.DurationVar(&cfg.DLQRecoveryPeriod, prefix+"dlq-recovery-period", 15*time.Second, "Period for DLQ recovery loop.") cfg.Raft.RegisterFlagsWithPrefix(prefix+"raft.", f) cfg.Compaction.RegisterFlagsWithPrefix(prefix+"compaction.", f) } @@ -118,7 +125,7 @@ type Metastore struct { snapshots *raft.FileSnapshotStore transport *raft.NetworkTransport raft *raft.Raft - leaderhealth *raftleader.HealthObserver //todo remove + leaderhealth *raftleader.LeaderObserver logStore raft.LogStore stableStore raft.StableStore @@ -133,11 +140,12 @@ type Metastore struct { readySince time.Time dnsProvider *dns.Provider + dlq *dlq.Recovery } type Limits interface{} -func New(config Config, limits Limits, logger log.Logger, reg prometheus.Registerer, client *metastoreclient.Client) (*Metastore, error) { +func New(config Config, limits Limits, logger log.Logger, reg prometheus.Registerer, client *metastoreclient.Client, bucket objstore.Bucket) (*Metastore, error) { metrics := newMetastoreMetrics(reg) m := &Metastore{ config: config, @@ -151,6 +159,9 @@ func New(config Config, limits Limits, logger log.Logger, reg prometheus.Registe } m.leaderhealth = raftleader.NewRaftLeaderHealthObserver(logger, raftleader.NewMetrics(reg)) m.state = newMetastoreState(logger, m.db, m.reg, &config.Compaction) + m.dlq = dlq.NewRecovery(dlq.RecoveryConfig{ + Period: config.DLQRecoveryPeriod, + }, logger, m, bucket) m.service = services.NewBasicService(m.starting, m.running, m.stopping) return m, nil } @@ -158,6 +169,7 @@ func New(config Config, limits Limits, logger log.Logger, reg prometheus.Registe func (m *Metastore) Service() services.Service { return m.service } func (m *Metastore) Shutdown() error { + m.dlq.Stop() m.shutdownRaft() m.db.shutdown() return nil @@ -170,6 +182,7 @@ func (m *Metastore) starting(context.Context) error { if err := m.initRaft(); err != nil { return fmt.Errorf("failed to initialize raft: %w", err) } + m.dlq.Start() m.wg.Add(1) go m.cleanupLoop() return nil @@ -233,7 +246,13 @@ func (m *Metastore) initRaft() (err error) { _ = level.Info(m.logger).Log("msg", "restoring existing state, not bootstraping") } - m.leaderhealth.Register(m.raft, metastoreRaftLeaderHealthServiceName) + m.leaderhealth.Register(m.raft, func(st raft.RaftState) { + if st == raft.Leader { + m.dlq.Start() + } else { + m.dlq.Stop() + } + }) return nil } @@ -287,7 +306,7 @@ func (m *Metastore) shutdownRaft() { _ = level.Error(m.logger).Log("msg", "failed to transfer leadership", "err", err) } } - m.leaderhealth.Deregister(m.raft, metastoreRaftLeaderHealthServiceName) + m.leaderhealth.Deregister(m.raft) if err := m.raft.Shutdown().Error(); err != nil { _ = level.Error(m.logger).Log("msg", "failed to shutdown raft", "err", err) } diff --git a/pkg/experiment/metastore/metastore_bootstrap.go b/pkg/experiment/metastore/metastore_bootstrap.go index 3d8cba9793..ab0df6779b 100644 --- a/pkg/experiment/metastore/metastore_bootstrap.go +++ b/pkg/experiment/metastore/metastore_bootstrap.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "github.com/grafana/dskit/backoff" - "net" + "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" "slices" "strings" "time" @@ -61,7 +61,7 @@ func (m *Metastore) bootstrapPeers() ([]raft.Server, error) { if strings.Contains(peer, "+") { resolve = append(resolve, peer) } else { - peers = append(peers, parsePeer(peer)) + peers = append(peers, discovery.ParsePeer(peer)) } } if len(resolve) > 0 { @@ -104,33 +104,6 @@ func (m *Metastore) bootstrapPeers() ([]raft.Server, error) { return peers, nil } -func parsePeer(raw string) raft.Server { - // The string may be "{addr}" or "{addr}/{node_id}". - parts := strings.SplitN(raw, "/", 2) - var addr string - var node string - if len(parts) == 2 { - addr = parts[0] - node = parts[1] - } else { - addr = raw - } - host, _, err := net.SplitHostPort(addr) - if err != nil { - // No port specified. - host = addr - } - if node == "" { - // No node_id specified. - node = host - } - return raft.Server{ - Suffrage: raft.Voter, - ID: raft.ServerID(node), - Address: raft.ServerAddress(addr), - } -} - func (m *Metastore) bootstrapPeersWithRetries() (peers []raft.Server, err error) { attempt := func() bool { peers, err = m.bootstrapPeers() diff --git a/pkg/experiment/metastore/metastore_fsm.go b/pkg/experiment/metastore/metastore_fsm.go index a4833a3f9c..81d04e9219 100644 --- a/pkg/experiment/metastore/metastore_fsm.go +++ b/pkg/experiment/metastore/metastore_fsm.go @@ -17,6 +17,7 @@ import ( compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb" "github.com/grafana/pyroscope/pkg/util" ) @@ -249,8 +250,5 @@ func wrapRetryableErrorWithRaftDetails(err error, raft *raft.Raft) error { } func shouldRetryCommand(err error) bool { - return errors.Is(err, raft.ErrLeadershipLost) || - errors.Is(err, raft.ErrNotLeader) || - errors.Is(err, raft.ErrLeadershipTransferInProgress) || - errors.Is(err, raft.ErrRaftShutdown) + return raftleader.IsRaftLeadershipError(err) } diff --git a/pkg/experiment/metastore/metastore_metrics.go b/pkg/experiment/metastore/metastore_metrics.go index c0ebbbbb68..0825978d9f 100644 --- a/pkg/experiment/metastore/metastore_metrics.go +++ b/pkg/experiment/metastore/metastore_metrics.go @@ -13,6 +13,7 @@ type metastoreMetrics struct { fsmRestoreSnapshotDuration prometheus.Histogram fsmApplyCommandHandlerDuration prometheus.Histogram raftAddBlockDuration prometheus.Histogram + raftAddRecoveredBlockDuration prometheus.Histogram } func newMetastoreMetrics(reg prometheus.Registerer) *metastoreMetrics { @@ -44,6 +45,11 @@ func newMetastoreMetrics(reg prometheus.Registerer) *metastoreMetrics { Name: "metastore_raft_add_block_duration_seconds", Buckets: dataTimingBuckets, }), + raftAddRecoveredBlockDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "pyroscope", + Name: "metastore_raft_add_recovered_block_duration_seconds", + Buckets: dataTimingBuckets, + }), } if reg != nil { util.RegisterOrGet(reg, m.boltDBPersistSnapshotDuration) @@ -51,6 +57,7 @@ func newMetastoreMetrics(reg prometheus.Registerer) *metastoreMetrics { util.RegisterOrGet(reg, m.fsmRestoreSnapshotDuration) util.RegisterOrGet(reg, m.fsmApplyCommandHandlerDuration) util.RegisterOrGet(reg, m.raftAddBlockDuration) + util.RegisterOrGet(reg, m.raftAddRecoveredBlockDuration) } return m } diff --git a/pkg/experiment/metastore/metastore_readindex.go b/pkg/experiment/metastore/metastore_readindex.go index d031eadf56..424951f19b 100644 --- a/pkg/experiment/metastore/metastore_readindex.go +++ b/pkg/experiment/metastore/metastore_readindex.go @@ -37,7 +37,7 @@ func (m *Metastore) ReadIndex(ctx context.Context, req *metastorev1.ReadIndexReq raftLogger().Log("msg", "verify_leader") if err := m.raft.VerifyLeader().Error(); err != nil { - return new(metastorev1.ReadIndexResponse), err + return nil, wrapRetryableErrorWithRaftDetails(err, m.raft) } tcheck := time.NewTicker(tcheckFreq) @@ -110,7 +110,7 @@ func (m *Metastore) CheckReady(ctx context.Context) (err error) { if m.readySince.IsZero() { m.readySince = time.Now() } - minReadyTime := 30 * time.Second + minReadyTime := m.config.MinReadyDuration if time.Since(m.readySince) < minReadyTime { err := fmt.Errorf("waiting for %v after being ready", minReadyTime) raftLogger().Log(status, notReady, "err", err) diff --git a/pkg/experiment/metastore/metastore_state_add_block.go b/pkg/experiment/metastore/metastore_state_add_block.go index 275c8fbd6a..13ae375047 100644 --- a/pkg/experiment/metastore/metastore_state_add_block.go +++ b/pkg/experiment/metastore/metastore_state_add_block.go @@ -2,6 +2,7 @@ package metastore import ( "context" + "github.com/go-kit/log" "time" "github.com/go-kit/log/level" @@ -13,21 +14,29 @@ import ( ) func (m *Metastore) AddBlock(_ context.Context, req *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { - _ = level.Info(m.logger).Log( - "msg", "adding block", - "block_id", req.Block.Id, - "shard", req.Block.Shard, - "raft_commit_index", m.raft.CommitIndex(), - "raft_last_index", m.raft.LastIndex(), - "raft_applied_index", m.raft.AppliedIndex()) + l := log.With(m.logger, "shard", req.Block.Shard, "block_id", req.Block.Id, "ts", req.Block.MinTime) + _ = level.Info(l).Log("msg", "adding block") t1 := time.Now() defer func() { m.metrics.raftAddBlockDuration.Observe(time.Since(t1).Seconds()) - level.Debug(m.logger).Log("msg", "add block duration", "block_id", req.Block.Id, "shard", req.Block.Shard, "duration", time.Since(t1)) }() _, resp, err := applyCommand[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse](m.raft, req, m.config.Raft.ApplyTimeout) if err != nil { - _ = level.Error(m.logger).Log("msg", "failed to apply add block", "block_id", req.Block.Id, "shard", req.Block.Shard, "err", err) + _ = level.Error(l).Log("msg", "failed to apply add block", "err", err) + } + return resp, err +} + +func (m *Metastore) AddRecoveredBlock(_ context.Context, req *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { + l := log.With(m.logger, "shard", req.Block.Shard, "block_id", req.Block.Id, "ts", req.Block.MinTime) + _ = level.Info(l).Log("msg", "adding recovered block") + t1 := time.Now() + defer func() { + m.metrics.raftAddRecoveredBlockDuration.Observe(time.Since(t1).Seconds()) + }() + _, resp, err := applyCommand[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse](m.raft, req, m.config.Raft.ApplyTimeout) + if err != nil { + _ = level.Error(l).Log("msg", "failed to apply add recovered block", "err", err) } return resp, err } diff --git a/pkg/experiment/metastore/raftleader/err.go b/pkg/experiment/metastore/raftleader/err.go new file mode 100644 index 0000000000..85c935e797 --- /dev/null +++ b/pkg/experiment/metastore/raftleader/err.go @@ -0,0 +1,14 @@ +package raftleader + +import ( + "errors" + + "github.com/hashicorp/raft" +) + +func IsRaftLeadershipError(err error) bool { + return errors.Is(err, raft.ErrLeadershipLost) || + errors.Is(err, raft.ErrNotLeader) || + errors.Is(err, raft.ErrLeadershipTransferInProgress) || + errors.Is(err, raft.ErrRaftShutdown) +} diff --git a/pkg/experiment/metastore/raftleader/raftleader.go b/pkg/experiment/metastore/raftleader/raftleader.go index a440d9ae87..f593364e09 100644 --- a/pkg/experiment/metastore/raftleader/raftleader.go +++ b/pkg/experiment/metastore/raftleader/raftleader.go @@ -10,10 +10,10 @@ import ( "github.com/hashicorp/raft" ) -type HealthObserver struct { +type LeaderObserver struct { logger log.Logger mu sync.Mutex - registered map[serviceKey]*raftService + registered *raftService metrics *Metrics } type Metrics struct { @@ -33,29 +33,28 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { return m } -func NewRaftLeaderHealthObserver(logger log.Logger, m *Metrics) *HealthObserver { - return &HealthObserver{ +func NewRaftLeaderHealthObserver(logger log.Logger, m *Metrics) *LeaderObserver { + return &LeaderObserver{ logger: logger, metrics: m, - registered: make(map[serviceKey]*raftService), + registered: nil, } } -func (hs *HealthObserver) Register(r *raft.Raft, service string) { +func (hs *LeaderObserver) Register(r *raft.Raft, cb func(st raft.RaftState)) { hs.mu.Lock() defer hs.mu.Unlock() - k := serviceKey{raft: r, service: service} - if _, ok := hs.registered[k]; ok { + if hs.registered != nil { return } svc := &raftService{ - hs: hs, - logger: log.With(hs.logger, "service", service), - service: service, - raft: r, - c: make(chan raft.Observation, 1), - stop: make(chan struct{}), - done: make(chan struct{}), + hs: hs, + logger: hs.logger, + raft: r, + cb: cb, + c: make(chan raft.Observation, 1), + stop: make(chan struct{}), + done: make(chan struct{}), } _ = level.Debug(svc.logger).Log("msg", "registering health check") svc.updateStatus() @@ -65,14 +64,14 @@ func (hs *HealthObserver) Register(r *raft.Raft, service string) { return ok }) r.RegisterObserver(svc.observer) - hs.registered[k] = svc + hs.registered = svc } -func (hs *HealthObserver) Deregister(r *raft.Raft, service string) { +func (hs *LeaderObserver) Deregister(r *raft.Raft) { hs.mu.Lock() - k := serviceKey{raft: r, service: service} - svc, ok := hs.registered[k] - delete(hs.registered, k) + svc := hs.registered + ok := svc != nil + hs.registered = nil hs.mu.Unlock() if ok { close(svc.stop) @@ -80,20 +79,15 @@ func (hs *HealthObserver) Deregister(r *raft.Raft, service string) { } } -type serviceKey struct { - raft *raft.Raft - service string -} - type raftService struct { - hs *HealthObserver + hs *LeaderObserver logger log.Logger - service string raft *raft.Raft observer *raft.Observer c chan raft.Observation stop chan struct{} done chan struct{} + cb func(st raft.RaftState) } func (svc *raftService) run() { @@ -114,6 +108,7 @@ func (svc *raftService) run() { func (svc *raftService) updateStatus() { state := svc.raft.State() + svc.cb(state) svc.hs.metrics.status.Set(float64(state)) _ = level.Info(svc.logger).Log("msg", "updated raft state", "state", state) } diff --git a/pkg/experiment/metastore/test/create.go b/pkg/experiment/metastore/test/create.go new file mode 100644 index 0000000000..5d56341251 --- /dev/null +++ b/pkg/experiment/metastore/test/create.go @@ -0,0 +1,165 @@ +package test + +import ( + "context" + "fmt" + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore" + metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client" + "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" + "github.com/grafana/pyroscope/pkg/test" + "github.com/grafana/pyroscope/pkg/test/mocks/mockdiscovery" + "github.com/hashicorp/raft" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "google.golang.org/grpc" + "net" + "testing" + "time" +) + +func NewMetastoreSet(t *testing.T, cfg *metastore.Config, n int, bucket objstore.Bucket) MetastoreSet { + l := test.NewTestingLogger(t) + + ports, err := test.GetFreePorts(2 * n) + addresses := make([]string, 2*n) + for i, port := range ports { + addresses[i] = fmt.Sprintf("localhost:%d", port) + } + grpcAddresses := addresses[:n] + raftAddresses := addresses[n:] + raftIds := make([]string, n) + for i := 0; i < n; i++ { + raftIds[i] = fmt.Sprintf("node-%d", i) + } + bootstrapPeers := make([]string, n) + configs := make([]metastore.Config, n) + servers := make([]discovery.Server, n) + + for i := 0; i < n; i++ { + bootstrapPeers[i] = fmt.Sprintf("%s/%s", raftAddresses[i], raftIds[i]) + + icfg := *cfg + icfg.MinReadyDuration = 0 + icfg.Address = grpcAddresses[i] + icfg.DataDir = t.TempDir() + icfg.Raft.ServerID = raftIds[i] + icfg.Raft.Dir = t.TempDir() + icfg.Raft.AdvertiseAddress = raftAddresses[i] + icfg.Raft.BindAddress = raftAddresses[i] + icfg.Raft.BootstrapPeers = bootstrapPeers + icfg.Raft.BootstrapExpectPeers = n + srv := discovery.Server{ + Raft: raft.Server{ + ID: raft.ServerID(raftIds[i]), + Address: raft.ServerAddress(raftAddresses[i]), + }, + ResolvedAddress: addresses[i], + } + servers[i] = srv + configs[i] = icfg + } + require.NoError(t, err) + + d := MockStaticDiscovery(t, servers) + cl := metastoreclient.New(l, cfg.GRPCClientConfig, d) + err = cl.Service().StartAsync(context.Background()) + require.NoError(t, err) + + l.Log("grpcAddresses", fmt.Sprintf("%+v", grpcAddresses), "raftAddresses", fmt.Sprintf("%+v", raftAddresses)) + res := MetastoreSet{ + t: t, + } + for i := 0; i < n; i++ { + options, err := cfg.GRPCClientConfig.DialOption(nil, nil) + require.NoError(t, err) + cc, err := grpc.Dial(grpcAddresses[i], options...) + require.NoError(t, err) + il := log.With(l, "idx", bootstrapPeers[i]) + server := grpc.NewServer() + m, err := metastore.New(configs[i], nil, il, prometheus.NewRegistry(), cl, bucket) + require.NoError(t, err) + metastorev1.RegisterMetastoreServiceServer(server, m) + compactorv1.RegisterCompactionPlannerServer(server, m) + lis, err := net.Listen("tcp", grpcAddresses[i]) + assert.NoError(t, err) + go func() { + err := server.Serve(lis) + assert.NoError(t, err) + }() + res.Instances = append(res.Instances, MetastoreInstance{ + Metastore: m, + Connection: cc, + MetastoreInstanceClient: metastorev1.NewMetastoreServiceClient(cc), + CompactorInstanceClient: compactorv1.NewCompactionPlannerClient(cc), + Server: server, + }) + err = m.Service().StartAsync(context.Background()) + il.Log("msg", "service started") + require.NoError(t, err) + } + + require.Eventually(t, func() bool { + for i := 0; i < n; i++ { + if res.Instances[i].Metastore.Service().State() != services.Running { + return false + } + if res.Instances[i].Metastore.CheckReady(context.Background()) != nil { + return false + } + } + return true + }, 10*time.Second, 100*time.Millisecond) + + res.Client = cl + + return res +} + +func MockStaticDiscovery(t *testing.T, servers []discovery.Server) *mockdiscovery.MockDiscovery { + d := mockdiscovery.NewMockDiscovery(t) + d.On("Subscribe", mock.Anything).Run(func(args mock.Arguments) { + upd := args.Get(0).(discovery.Updates) + upd.Servers(servers) + }) + d.On("ServerError", mock.Anything).Return() + d.On("Close").Return(nil) + return d +} + +type MetastoreInstance struct { + Metastore *metastore.Metastore + Server *grpc.Server + Connection *grpc.ClientConn + MetastoreInstanceClient metastorev1.MetastoreServiceClient + CompactorInstanceClient compactorv1.CompactionPlannerClient +} + +func (i *MetastoreInstance) client() metastorev1.MetastoreServiceClient { + return i.MetastoreInstanceClient +} + +type MetastoreSet struct { + t *testing.T + Instances []MetastoreInstance + Client *metastoreclient.Client +} + +func (m *MetastoreSet) Close() { + for _, i := range m.Instances { + i.Metastore.Service().StopAsync() + err := i.Metastore.Service().AwaitTerminated(context.Background()) + require.NoError(m.t, err) + i.Connection.Close() + i.Server.Stop() + } + m.Client.Service().StopAsync() + err := m.Client.Service().AwaitTerminated(context.Background()) + require.NoError(m.t, err) +} diff --git a/pkg/experiment/metastore/test/metastore_leader_details_test.go b/pkg/experiment/metastore/test/metastore_leader_details_test.go new file mode 100644 index 0000000000..2bfb44432c --- /dev/null +++ b/pkg/experiment/metastore/test/metastore_leader_details_test.go @@ -0,0 +1,77 @@ +package test + +import ( + "context" + "crypto/rand" + "github.com/grafana/dskit/flagext" + compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore" + "github.com/grafana/pyroscope/pkg/objstore/providers/memory" + "github.com/oklog/ulid" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "testing" +) + +func TestRaftDetailsAddBlock(t *testing.T) { + cfg := new(metastore.Config) + flagext.DefaultValues(cfg) + + ms := NewMetastoreSet(t, cfg, 3, memory.NewInMemBucket()) + defer ms.Close() + + errors := 0 + m := &metastorev1.BlockMeta{ + Id: ulid.MustNew(1, rand.Reader).String(), + } + for _, it := range ms.Instances { + _, err := it.MetastoreInstanceClient.AddBlock(context.Background(), &metastorev1.AddBlockRequest{ + Block: m, + }) + if err != nil { + requireRaftDetails(t, err) + errors++ + } + } + require.Equal(t, 2, errors) +} + +func TestRaftDetailsPullCompaction(t *testing.T) { + cfg := new(metastore.Config) + flagext.DefaultValues(cfg) + + ms := NewMetastoreSet(t, cfg, 3, memory.NewInMemBucket()) + defer ms.Close() + + errors := 0 + for _, it := range ms.Instances { + _, err := it.CompactorInstanceClient.PollCompactionJobs(context.Background(), &compactorv1.PollCompactionJobsRequest{}) + if err != nil { + requireRaftDetails(t, err) + errors++ + } + } + require.Equal(t, 2, errors) +} + +func requireRaftDetails(t *testing.T, err error) { + t.Log("error", err) + s, ok := status.FromError(err) + detailsLeader := "" + if ok && s.Code() == codes.Unavailable { + ds := s.Details() + if len(ds) > 0 { + for _, d := range ds { + if rd, ok := d.(*typesv1.RaftDetails); ok { + detailsLeader = rd.Leader + break + } + } + } + } + t.Log("leader is", detailsLeader) + require.NotEmpty(t, detailsLeader) +} diff --git a/pkg/phlare/modules_experimental.go b/pkg/phlare/modules_experimental.go index 6480a302d3..a8b971df6c 100644 --- a/pkg/phlare/modules_experimental.go +++ b/pkg/phlare/modules_experimental.go @@ -1,7 +1,7 @@ package phlare import ( - "strings" + "fmt" "github.com/go-kit/log" "github.com/grafana/dskit/ring" @@ -9,14 +9,12 @@ import ( "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" - kuberesolver2 "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery/kuberesolver" - compactionworker "github.com/grafana/pyroscope/pkg/experiment/compactor" segmentwriter "github.com/grafana/pyroscope/pkg/experiment/ingester" segmentwriterclient "github.com/grafana/pyroscope/pkg/experiment/ingester/client" "github.com/grafana/pyroscope/pkg/experiment/metastore" metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client" + "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" querybackend "github.com/grafana/pyroscope/pkg/experiment/query_backend" querybackendclient "github.com/grafana/pyroscope/pkg/experiment/query_backend/client" "github.com/grafana/pyroscope/pkg/util/health" @@ -107,6 +105,7 @@ func (f *Phlare) initMetastore() (services.Service, error) { logger, f.reg, f.metastoreClient, + f.storageBucket, ) if err != nil { return nil, err @@ -121,17 +120,9 @@ func (f *Phlare) initMetastoreClient() (services.Service, error) { return nil, err } - kubeClient, err := kuberesolver2.NewInClusterK8sClient() + disc, err := discovery.NewDiscovery(f.logger, f.Cfg.Metastore.Address) if err != nil { - return nil, err - } - address := f.Cfg.Metastore.Address - if strings.HasPrefix(address, "dns:///_grpc._tcp.") { - address = strings.Replace(address, "dns:///_grpc._tcp.", "kubernetes:///", 1) // todo support dns discovery - } - disc, err := discovery.NewKubeResolverDiscovery(f.logger, address, kubeClient) - if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create discovery: %w %s", err, f.Cfg.Metastore.Address) } f.metastoreClient = metastoreclient.New( diff --git a/pkg/phlare/phlare.go b/pkg/phlare/phlare.go index af470d77b9..386a47eec5 100644 --- a/pkg/phlare/phlare.go +++ b/pkg/phlare/phlare.go @@ -404,7 +404,7 @@ func (f *Phlare) setupModuleManager() error { if f.Cfg.v2Experiment { experimentalModules := map[string][]string{ SegmentWriter: {Overrides, API, MemberlistKV, Storage, UsageReport, MetastoreClient}, - Metastore: {Overrides, API, MetastoreClient}, + Metastore: {Overrides, API, MetastoreClient, Storage}, CompactionWorker: {Overrides, API, Storage, Overrides, MetastoreClient}, QueryBackend: {Overrides, API, Storage, Overrides, QueryBackendClient}, SegmentWriterRing: {Overrides, API, MemberlistKV}, diff --git a/pkg/phlaredb/block/metadata.go b/pkg/phlaredb/block/metadata.go index 0c2cc0c854..84e986b237 100644 --- a/pkg/phlaredb/block/metadata.go +++ b/pkg/phlaredb/block/metadata.go @@ -392,10 +392,10 @@ func ReadMetaFromDir(dir string) (*Meta, error) { return Read(f) } -func exhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string) { +func exhaustCloseWithErrCapture(err *error, r io.ReadCloser, msg string) { _, copyErr := io.Copy(io.Discard, r) - runutil.CloseWithErrCapture(err, r, format) + runutil.CloseWithErrCapture(err, r, "%s", msg) // Prepend the io.Copy error. merr := multierror.MultiError{} diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 1653d51a41..b7a6f3840f 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -294,7 +294,7 @@ func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, ms select { case <-c: case <-time.After(timeout): - t.Fatalf(msg) + t.Fatalf("%s", msg) } } @@ -303,7 +303,7 @@ func assertChanNotReceived(t *testing.T, c chan struct{}, wait time.Duration, ms select { case <-c: - t.Fatalf(msg) + t.Fatal(msg) case <-time.After(wait): // OK! } diff --git a/pkg/test/integration/helper.go b/pkg/test/integration/helper.go index dd95e619af..7771f936e5 100644 --- a/pkg/test/integration/helper.go +++ b/pkg/test/integration/helper.go @@ -10,7 +10,6 @@ import ( "io" "math/rand" "mime/multipart" - "net" "net/http" "os" "strings" @@ -18,6 +17,8 @@ import ( "testing" "time" + "github.com/grafana/pyroscope/pkg/test" + "connectrpc.com/connect" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -37,23 +38,6 @@ import ( "github.com/grafana/pyroscope/pkg/util/connectgrpc" ) -// getFreePorts returns a number of free local port for the tests to listen on. Note this will make sure the returned ports do not overlap, by stopping to listen once all ports are allocated -func getFreePorts(len int) (ports []int, err error) { - ports = make([]int, len) - for i := 0; i < len; i++ { - var a *net.TCPAddr - if a, err = net.ResolveTCPAddr("tcp", "127.0.0.1:0"); err == nil { - var l *net.TCPListener - if l, err = net.ListenTCP("tcp", a); err != nil { - return nil, err - } - defer l.Close() - ports[i] = l.Addr().(*net.TCPAddr).Port - } - } - return ports, nil -} - type PyroscopeTest struct { config phlare.Config it *phlare.Phlare @@ -69,7 +53,7 @@ const storeInMemory = "inmemory" func (p *PyroscopeTest) Start(t *testing.T) { - ports, err := getFreePorts(2) + ports, err := test.GetFreePorts(2) require.NoError(t, err) p.httpPort = ports[0] p.memberlistPort = ports[1] diff --git a/pkg/test/logger.go b/pkg/test/logger.go index 8c0831e8f9..74b2de723c 100644 --- a/pkg/test/logger.go +++ b/pkg/test/logger.go @@ -3,6 +3,7 @@ package test import ( + "bytes" "testing" "github.com/go-kit/log" @@ -19,6 +20,14 @@ func NewTestingLogger(t testing.TB) log.Logger { } func (l *testingLogger) Log(keyvals ...interface{}) error { - l.t.Log(keyvals...) + l.t.Helper() + buf := bytes.NewBuffer(nil) + lf := log.NewLogfmtLogger(buf) + lf.Log(keyvals...) + bs := buf.Bytes() + if len(bs) > 0 && bs[len(bs)-1] == '\n' { + bs = bs[:len(bs)-1] + } + l.t.Log(string(bs)) return nil } diff --git a/pkg/test/mocks/mockdlq/mock_local_server.go b/pkg/test/mocks/mockdlq/mock_local_server.go new file mode 100644 index 0000000000..b58e6fd6e2 --- /dev/null +++ b/pkg/test/mocks/mockdlq/mock_local_server.go @@ -0,0 +1,97 @@ +// Code generated by mockery. DO NOT EDIT. + +package mockdlq + +import ( + context "context" + + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + + mock "github.com/stretchr/testify/mock" +) + +// MockLocalServer is an autogenerated mock type for the LocalServer type +type MockLocalServer struct { + mock.Mock +} + +type MockLocalServer_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLocalServer) EXPECT() *MockLocalServer_Expecter { + return &MockLocalServer_Expecter{mock: &_m.Mock} +} + +// AddRecoveredBlock provides a mock function with given fields: _a0, _a1 +func (_m *MockLocalServer) AddRecoveredBlock(_a0 context.Context, _a1 *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for AddRecoveredBlock") + } + + var r0 *metastorev1.AddBlockResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *metastorev1.AddBlockRequest) *metastorev1.AddBlockResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*metastorev1.AddBlockResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *metastorev1.AddBlockRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockLocalServer_AddRecoveredBlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddRecoveredBlock' +type MockLocalServer_AddRecoveredBlock_Call struct { + *mock.Call +} + +// AddRecoveredBlock is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *metastorev1.AddBlockRequest +func (_e *MockLocalServer_Expecter) AddRecoveredBlock(_a0 interface{}, _a1 interface{}) *MockLocalServer_AddRecoveredBlock_Call { + return &MockLocalServer_AddRecoveredBlock_Call{Call: _e.mock.On("AddRecoveredBlock", _a0, _a1)} +} + +func (_c *MockLocalServer_AddRecoveredBlock_Call) Run(run func(_a0 context.Context, _a1 *metastorev1.AddBlockRequest)) *MockLocalServer_AddRecoveredBlock_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*metastorev1.AddBlockRequest)) + }) + return _c +} + +func (_c *MockLocalServer_AddRecoveredBlock_Call) Return(_a0 *metastorev1.AddBlockResponse, _a1 error) *MockLocalServer_AddRecoveredBlock_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockLocalServer_AddRecoveredBlock_Call) RunAndReturn(run func(context.Context, *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error)) *MockLocalServer_AddRecoveredBlock_Call { + _c.Call.Return(run) + return _c +} + +// NewMockLocalServer creates a new instance of MockLocalServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockLocalServer(t interface { + mock.TestingT + Cleanup(func()) +}) *MockLocalServer { + mock := &MockLocalServer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/test/ports.go b/pkg/test/ports.go new file mode 100644 index 0000000000..b02bdd85f0 --- /dev/null +++ b/pkg/test/ports.go @@ -0,0 +1,20 @@ +package test + +import "net" + +// GetFreePorts returns a number of free local port for the tests to listen on. Note this will make sure the returned ports do not overlap, by stopping to listen once all ports are allocated +func GetFreePorts(len int) (ports []int, err error) { + ports = make([]int, len) + for i := 0; i < len; i++ { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "127.0.0.1:0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err != nil { + return nil, err + } + defer l.Close() + ports[i] = l.Addr().(*net.TCPAddr).Port + } + } + return ports, nil +} diff --git a/pkg/util/http/error_test.go b/pkg/util/http/error_test.go index 0f42bee6bb..a1fc580c7a 100644 --- a/pkg/util/http/error_test.go +++ b/pkg/util/http/error_test.go @@ -33,7 +33,7 @@ func Test_writeError(t *testing.T) { {"deadline", context.DeadlineExceeded, `{"code":"deadline_exceeded","message":"Request timed out, decrease the duration of the request or add more label matchers (prefer exact match over regex match) to reduce the amount of data processed."}`, http.StatusGatewayTimeout}, {"rpc deadline", status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), `{"code":"deadline_exceeded","message":"Request timed out, decrease the duration of the request or add more label matchers (prefer exact match over regex match) to reduce the amount of data processed."}`, http.StatusGatewayTimeout}, // {"mixed context, rpc deadline and another", multierror.MultiError{errors.New("standard error"), context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, "3 errors: standard error; context deadline exceeded; rpc error: code = DeadlineExceeded desc = context deadline exceeded", http.StatusInternalServerError}, - {"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), `{"code":"invalid_argument","message":"foo"}`, http.StatusBadRequest}, + {"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, "foo"), `{"code":"invalid_argument","message":"foo"}`, http.StatusBadRequest}, {"internal", errors.New("foo"), `{"code":"unknown","message":"foo"}`, http.StatusInternalServerError}, {"connect", connect.NewError(connect.CodeInvalidArgument, errors.New("foo")), `{"code":"invalid_argument","message":"foo"}`, http.StatusBadRequest}, {"connect wrapped", fmt.Errorf("foo %w", connect.NewError(connect.CodeInvalidArgument, errors.New("foo"))), `{"code":"invalid_argument","message":"foo"}`, http.StatusBadRequest}, diff --git a/tools/update_examples.go b/tools/update_examples.go index 2e7eb3a916..9f46a75e97 100644 --- a/tools/update_examples.go +++ b/tools/update_examples.go @@ -296,7 +296,7 @@ func getTags(repo string) []Tag { for { var pageTags []Tag url := "https://api.github.com/repos/" + repo + "/tags?page=" + strconv.Itoa(page) + "&per_page=100" - log.Printf("GET " + url) + log.Printf("GET %s", url) req, err := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(ghToken)) requireNoError(err, "new request")