Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7806
Browse files Browse the repository at this point in the history
close tikv#7807

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
rleungx authored and ti-chi-bot committed Feb 6, 2024
1 parent 2a2b949 commit fce748e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
4 changes: 3 additions & 1 deletion pkg/mcs/meta_storage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.
if err := s.checkServing(); err != nil {
return err
}
ctx, cancel := context.WithCancel(s.ctx)
ctx, cancel := context.WithCancel(server.Context())
defer cancel()
options := []clientv3.OpOption{}
key := string(req.GetKey())
Expand All @@ -106,6 +106,8 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.
select {
case <-ctx.Done():
return nil
case <-s.ctx.Done():
return nil
case res := <-watchChan:
if res.Err() != nil {
var resp meta_storagepb.WatchResponse
Expand Down
15 changes: 15 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1917,7 +1917,20 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve
if s.client == nil {
return ErrEtcdNotStarted
}
<<<<<<< HEAD
ctx, cancel := context.WithCancel(s.Context())
=======
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return err
}
}
ctx, cancel := context.WithCancel(server.Context())
>>>>>>> 37be34ef1 (*: fix context usage when watch etcd (#7806))
defer cancel()
configPath := req.GetConfigPath()
if configPath == "" {
Expand All @@ -1933,6 +1946,8 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve
select {
case <-ctx.Done():
return nil
case <-s.Context().Done():
return nil
case res := <-watchChan:
if res.Err() != nil {
var resp pdpb.WatchGlobalConfigResponse
Expand Down
12 changes: 10 additions & 2 deletions tests/integrations/client/global_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package client_test

import (
"context"
"path"
"strconv"
"sync"
Expand All @@ -37,7 +38,8 @@ import (
const globalConfigPath = "/global/config/"

type testReceiver struct {
re *require.Assertions
re *require.Assertions
ctx context.Context
grpc.ServerStream
}

Expand All @@ -49,6 +51,10 @@ func (s testReceiver) Send(m *pdpb.WatchGlobalConfigResponse) error {
return nil
}

func (s testReceiver) Context() context.Context {
return s.ctx
}

type globalConfigTestSuite struct {
suite.Suite
server *server.GrpcServer
Expand Down Expand Up @@ -189,7 +195,9 @@ func (suite *globalConfigTestSuite) TestWatch() {
suite.NoError(err)
}
}()
server := testReceiver{re: suite.Require()}
ctx, cancel := context.WithCancel(suite.server.Context())
defer cancel()
server := testReceiver{re: suite.Require(), ctx: ctx}
go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{
ConfigPath: globalConfigPath,
Revision: 0,
Expand Down

0 comments on commit fce748e

Please sign in to comment.