From fce748efc35bde311c2f3ebeb8632359dcfbb3e3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 5 Feb 2024 17:37:13 +0800 Subject: [PATCH] This is an automated cherry-pick of #7806 close tikv/pd#7807 Signed-off-by: ti-chi-bot --- pkg/mcs/meta_storage/server/grpc_service.go | 4 +++- server/grpc_service.go | 15 +++++++++++++++ tests/integrations/client/global_config_test.go | 12 ++++++++++-- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/pkg/mcs/meta_storage/server/grpc_service.go b/pkg/mcs/meta_storage/server/grpc_service.go index e9d35fbf14b..3da079e6109 100644 --- a/pkg/mcs/meta_storage/server/grpc_service.go +++ b/pkg/mcs/meta_storage/server/grpc_service.go @@ -86,7 +86,7 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb. if err := s.checkServing(); err != nil { return err } - ctx, cancel := context.WithCancel(s.ctx) + ctx, cancel := context.WithCancel(server.Context()) defer cancel() options := []clientv3.OpOption{} key := string(req.GetKey()) @@ -106,6 +106,8 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb. select { case <-ctx.Done(): return nil + case <-s.ctx.Done(): + return nil case res := <-watchChan: if res.Err() != nil { var resp meta_storagepb.WatchResponse diff --git a/server/grpc_service.go b/server/grpc_service.go index 1b47cd5ed30..bd78a6845f0 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1917,7 +1917,20 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve if s.client == nil { return ErrEtcdNotStarted } +<<<<<<< HEAD ctx, cancel := context.WithCancel(s.Context()) +======= + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return err + } + } + ctx, cancel := context.WithCancel(server.Context()) +>>>>>>> 37be34ef1 (*: fix context usage when watch etcd (#7806)) defer cancel() configPath := req.GetConfigPath() if configPath == "" { @@ -1933,6 +1946,8 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve select { case <-ctx.Done(): return nil + case <-s.Context().Done(): + return nil case res := <-watchChan: if res.Err() != nil { var resp pdpb.WatchGlobalConfigResponse diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go index 15034d035a6..a2ee1468146 100644 --- a/tests/integrations/client/global_config_test.go +++ b/tests/integrations/client/global_config_test.go @@ -15,6 +15,7 @@ package client_test import ( + "context" "path" "strconv" "sync" @@ -37,7 +38,8 @@ import ( const globalConfigPath = "/global/config/" type testReceiver struct { - re *require.Assertions + re *require.Assertions + ctx context.Context grpc.ServerStream } @@ -49,6 +51,10 @@ func (s testReceiver) Send(m *pdpb.WatchGlobalConfigResponse) error { return nil } +func (s testReceiver) Context() context.Context { + return s.ctx +} + type globalConfigTestSuite struct { suite.Suite server *server.GrpcServer @@ -189,7 +195,9 @@ func (suite *globalConfigTestSuite) TestWatch() { suite.NoError(err) } }() - server := testReceiver{re: suite.Require()} + ctx, cancel := context.WithCancel(suite.server.Context()) + defer cancel() + server := testReceiver{re: suite.Require(), ctx: ctx} go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ ConfigPath: globalConfigPath, Revision: 0,