Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7884
Browse files Browse the repository at this point in the history
ref tikv#5520

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
JmPotato authored and ti-chi-bot committed Mar 7, 2024
1 parent d71a1a3 commit 420c54b
Show file tree
Hide file tree
Showing 25 changed files with 713 additions and 127 deletions.
3 changes: 2 additions & 1 deletion client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

Expand All @@ -52,7 +53,7 @@ const (
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...grpc.DialOption) (*grpc.ClientConn, error) {
opt := grpc.WithInsecure() //nolint
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsCfg != nil {
creds := credentials.NewTLS(tlsCfg)
opt = grpc.WithTransportCredentials(creds)
Expand Down
318 changes: 318 additions & 0 deletions client/pd_service_discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"crypto/tls"
"errors"
"log"
"net"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/testutil"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
)

type testGRPCServer struct {
pb.UnimplementedGreeterServer
isLeader bool
leaderAddr string
leaderConn *grpc.ClientConn
handleCount atomic.Int32
forwardCount atomic.Int32
}

// SayHello implements helloworld.GreeterServer
func (s *testGRPCServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
if !s.isLeader {
if !grpcutil.IsFollowerHandleEnabled(ctx, metadata.FromIncomingContext) {
if addr := grpcutil.GetForwardedHost(ctx, metadata.FromIncomingContext); addr == s.leaderAddr {
s.forwardCount.Add(1)
return pb.NewGreeterClient(s.leaderConn).SayHello(ctx, in)
}
return nil, errors.New("not leader")
}
}
s.handleCount.Add(1)
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func (s *testGRPCServer) resetCount() {
s.handleCount.Store(0)
s.forwardCount.Store(0)
}

func (s *testGRPCServer) getHandleCount() int32 {
return s.handleCount.Load()
}

func (s *testGRPCServer) getForwardCount() int32 {
return s.forwardCount.Load()
}

type testServer struct {
server *testGRPCServer
grpcServer *grpc.Server
addr string
}

func newTestServer(isLeader bool) *testServer {
addr := testutil.Alloc()
u, err := url.Parse(addr)
if err != nil {
return nil
}
grpcServer := grpc.NewServer()
server := &testGRPCServer{
isLeader: isLeader,
}
pb.RegisterGreeterServer(grpcServer, server)
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)
return &testServer{
server: server,
grpcServer: grpcServer,
addr: u.Host,
}
}

func (s *testServer) run() {
lis, err := net.Listen("tcp", s.addr)
if err != nil {
log.Fatalf("failed to serve: %v", err)
}
if err := s.grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

type serviceClientTestSuite struct {
suite.Suite
ctx context.Context
clean context.CancelFunc

leaderServer *testServer
followerServer *testServer

leaderClient ServiceClient
followerClient ServiceClient
}

func TestServiceClientClientTestSuite(t *testing.T) {
suite.Run(t, new(serviceClientTestSuite))
}

func (suite *serviceClientTestSuite) SetupSuite() {
suite.ctx, suite.clean = context.WithCancel(context.Background())

suite.leaderServer = newTestServer(true)
suite.followerServer = newTestServer(false)
go suite.leaderServer.run()
go suite.followerServer.run()
for i := 0; i < 10; i++ {
leaderConn, err1 := grpc.Dial(suite.leaderServer.addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
followerConn, err2 := grpc.Dial(suite.followerServer.addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err1 == nil && err2 == nil {
suite.followerClient = newPDServiceClient(suite.followerServer.addr, suite.leaderServer.addr, nil, followerConn, false)
suite.leaderClient = newPDServiceClient(suite.leaderServer.addr, suite.leaderServer.addr, nil, leaderConn, true)
suite.followerServer.server.leaderConn = suite.leaderClient.GetClientConn()
suite.followerServer.server.leaderAddr = suite.leaderClient.GetAddress()
return
}
time.Sleep(50 * time.Millisecond)
}
suite.NotNil(suite.leaderClient)
}

func (suite *serviceClientTestSuite) TearDownTest() {
suite.leaderServer.server.resetCount()
suite.followerServer.server.resetCount()
}

func (suite *serviceClientTestSuite) TearDownSuite() {
suite.leaderServer.grpcServer.GracefulStop()
suite.followerServer.grpcServer.GracefulStop()
suite.leaderClient.GetClientConn().Close()
suite.followerClient.GetClientConn().Close()
suite.clean()
}

func (suite *serviceClientTestSuite) TestServiceClient() {
re := suite.Require()
leaderAddress := suite.leaderServer.addr
followerAddress := suite.followerServer.addr

follower := suite.followerClient
leader := suite.leaderClient

re.Equal(follower.GetAddress(), followerAddress)
re.Equal(leader.GetAddress(), leaderAddress)
re.Equal(follower.GetHTTPAddress(), "http://"+followerAddress)
re.Equal(leader.GetHTTPAddress(), "http://"+leaderAddress)

re.True(follower.Available())
re.True(leader.Available())

re.False(follower.IsConnectedToLeader())
re.True(leader.IsConnectedToLeader())

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
follower.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
leader.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
re.False(follower.Available())
re.False(leader.Available())
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))

follower.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
leader.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
re.True(follower.Available())
re.True(leader.Available())

followerConn := follower.GetClientConn()
leaderConn := leader.GetClientConn()
re.NotNil(followerConn)
re.NotNil(leaderConn)

_, err := pb.NewGreeterClient(followerConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.ErrorContains(err, "not leader")
resp, err := pb.NewGreeterClient(leaderConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.NoError(err)
re.Equal("Hello pd", resp.GetMessage())

re.False(follower.NeedRetry(nil, nil))
re.False(leader.NeedRetry(nil, nil))

ctx1 := context.WithoutCancel(suite.ctx)
ctx1 = follower.BuildGRPCTargetContext(ctx1, false)
re.True(grpcutil.IsFollowerHandleEnabled(ctx1, metadata.FromOutgoingContext))
re.Empty(grpcutil.GetForwardedHost(ctx1, metadata.FromOutgoingContext))
ctx2 := context.WithoutCancel(suite.ctx)
ctx2 = follower.BuildGRPCTargetContext(ctx2, true)
re.False(grpcutil.IsFollowerHandleEnabled(ctx2, metadata.FromOutgoingContext))
re.Equal(grpcutil.GetForwardedHost(ctx2, metadata.FromOutgoingContext), leaderAddress)

ctx3 := context.WithoutCancel(suite.ctx)
ctx3 = leader.BuildGRPCTargetContext(ctx3, false)
re.False(grpcutil.IsFollowerHandleEnabled(ctx3, metadata.FromOutgoingContext))
re.Empty(grpcutil.GetForwardedHost(ctx3, metadata.FromOutgoingContext))
ctx4 := context.WithoutCancel(suite.ctx)
ctx4 = leader.BuildGRPCTargetContext(ctx4, true)
re.False(grpcutil.IsFollowerHandleEnabled(ctx4, metadata.FromOutgoingContext))
re.Empty(grpcutil.GetForwardedHost(ctx4, metadata.FromOutgoingContext))

followerAPIClient := newPDServiceAPIClient(follower, regionAPIErrorFn)
leaderAPIClient := newPDServiceAPIClient(leader, regionAPIErrorFn)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastCheckAvailable", "return(true)"))

re.True(followerAPIClient.Available())
re.True(leaderAPIClient.Available())
pdErr1 := &pdpb.Error{
Type: pdpb.ErrorType_UNKNOWN,
}
pdErr2 := &pdpb.Error{
Type: pdpb.ErrorType_REGION_NOT_FOUND,
}
err = errors.New("error")
re.True(followerAPIClient.NeedRetry(pdErr1, nil))
re.False(leaderAPIClient.NeedRetry(pdErr1, nil))
re.True(followerAPIClient.Available())
re.True(leaderAPIClient.Available())

re.True(followerAPIClient.NeedRetry(pdErr2, nil))
re.False(leaderAPIClient.NeedRetry(pdErr2, nil))
re.False(followerAPIClient.Available())
re.True(leaderAPIClient.Available())
followerAPIClient.(*pdServiceAPIClient).markAsAvailable()
leaderAPIClient.(*pdServiceAPIClient).markAsAvailable()
re.False(followerAPIClient.Available())
time.Sleep(time.Millisecond * 100)
followerAPIClient.(*pdServiceAPIClient).markAsAvailable()
re.True(followerAPIClient.Available())

re.True(followerAPIClient.NeedRetry(nil, err))
re.False(leaderAPIClient.NeedRetry(nil, err))
re.True(followerAPIClient.Available())
re.True(leaderAPIClient.Available())

re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastCheckAvailable"))
}

func (suite *serviceClientTestSuite) TestServiceClientBalancer() {
re := suite.Require()
follower := suite.followerClient
leader := suite.leaderClient
b := &pdServiceBalancer{}
b.set([]ServiceClient{leader, follower})
re.Equal(2, b.totalNode)

for i := 0; i < 10; i++ {
client := b.get()
ctx := client.BuildGRPCTargetContext(suite.ctx, false)
conn := client.GetClientConn()
re.NotNil(conn)
resp, err := pb.NewGreeterClient(conn).SayHello(ctx, &pb.HelloRequest{Name: "pd"})
re.NoError(err)
re.Equal("Hello pd", resp.GetMessage())
}
re.Equal(int32(5), suite.leaderServer.server.getHandleCount())
re.Equal(int32(5), suite.followerServer.server.getHandleCount())
suite.followerServer.server.resetCount()
suite.leaderServer.server.resetCount()

for i := 0; i < 10; i++ {
client := b.get()
ctx := client.BuildGRPCTargetContext(suite.ctx, true)
conn := client.GetClientConn()
re.NotNil(conn)
resp, err := pb.NewGreeterClient(conn).SayHello(ctx, &pb.HelloRequest{Name: "pd"})
re.NoError(err)
re.Equal("Hello pd", resp.GetMessage())
}
re.Equal(int32(10), suite.leaderServer.server.getHandleCount())
re.Equal(int32(0), suite.followerServer.server.getHandleCount())
re.Equal(int32(5), suite.followerServer.server.getForwardCount())
}

func TestHTTPScheme(t *testing.T) {
re := require.New(t)
cli := newPDServiceClient("127.0.0.1:2379", "127.0.0.1:2379", nil, nil, false)
re.Equal("http://127.0.0.1:2379", cli.GetHTTPAddress())
cli = newPDServiceClient("https://127.0.0.1:2379", "127.0.0.1:2379", nil, nil, false)
re.Equal("http://127.0.0.1:2379", cli.GetHTTPAddress())
cli = newPDServiceClient("http://127.0.0.1:2379", "127.0.0.1:2379", nil, nil, false)
re.Equal("http://127.0.0.1:2379", cli.GetHTTPAddress())
cli = newPDServiceClient("127.0.0.1:2379", "127.0.0.1:2379", &tls.Config{}, nil, false)
re.Equal("https://127.0.0.1:2379", cli.GetHTTPAddress())
cli = newPDServiceClient("https://127.0.0.1:2379", "127.0.0.1:2379", &tls.Config{}, nil, false)
re.Equal("https://127.0.0.1:2379", cli.GetHTTPAddress())
cli = newPDServiceClient("http://127.0.0.1:2379", "127.0.0.1:2379", &tls.Config{}, nil, false)
re.Equal("https://127.0.0.1:2379", cli.GetHTTPAddress())
}
18 changes: 15 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ module github.com/tikv/pd

go 1.21

// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/BurntSushi/toml v0.3.1
Expand Down Expand Up @@ -43,7 +48,7 @@ require (
github.com/sasha-s/go-deadlock v0.2.0
github.com/shirou/gopsutil/v3 v3.23.3
github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072
github.com/soheilhy/cmux v0.1.4
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.3
Expand All @@ -52,7 +57,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/unrolled/render v1.0.1
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
go.etcd.io/etcd v0.5.0-alpha.5.0.20240131130919-ff2304879e1b
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.24.0
Expand Down Expand Up @@ -120,6 +125,10 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
<<<<<<< HEAD
=======
github.com/google/uuid v1.3.1 // indirect
>>>>>>> 57cd60348 (*: upgrade etcd to v3.4.30 (#7884))
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down Expand Up @@ -179,7 +188,7 @@ require (
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
go.uber.org/dig v1.9.0 // indirect
go.uber.org/fx v1.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand All @@ -205,10 +214,13 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
<<<<<<< HEAD

replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0

// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
=======
>>>>>>> 57cd60348 (*: upgrade etcd to v3.4.30 (#7884))
Loading

0 comments on commit 420c54b

Please sign in to comment.