Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc_service: add ManuallyChangePeer and ManuallyChangeMultiPeers. #8266

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/eraftpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -133,6 +134,11 @@ type RPCClient interface {
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// ManuallyChangePeer manually changes the peer of a region.
ManuallyChangePeer(ctx context.Context, regionID uint64, changeType eraftpb.ConfChangeType, peer *metapb.Peer) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary? Why not let TiKV handle it? If we are going to use RPC to solve the snapshot problem, we still can not ensure that the operator is executed successfully.

Copy link
Contributor Author

@LykxSassinator LykxSassinator Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used to make the operation on add peer / remove peer managed by PD uniformly.

If let TiKV handle it directly, the scheduling on add peer or remove peer will not be perceived by PD, and the check of the scheduler cannot be validated by the store limit and other checking mechanisms.

As for

ensure that the operator is executed successfully.

TiKV will retry the remove peer RPC if the operator does not make senses until this peer is cleared.

// ManuallyChangeMultiPeers manually changes multi peers of a region.
ManuallyChangeMultiPeers(ctx context.Context, regionID uint64, changeTypes map[uint64]eraftpb.ConfChangeType, peers []*metapb.Peer) error

// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
Expand Down Expand Up @@ -1600,3 +1606,60 @@ func (c *client) GetTSOAllocators() *sync.Map {
}
return tsoClient.GetTSOAllocators()
}

func (c *client) ManuallyChangePeer(ctx context.Context, regionID uint64, changeType eraftpb.ConfChangeType, peer *metapb.Peer) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
resp, err := protoClient.ManuallyChangePeer(ctx, &pdpb.ChangePeerRequest{
Header: c.requestHeader(),
RegionId: regionID,
ChangePeer: &pdpb.ChangePeer{
ChangeType: changeType,
Peer: peer,
},
})
if err != nil {
return err
}
resErr := resp.GetHeader().GetError()
if resErr != nil {
return errors.Errorf("[pd]" + resErr.Message)
}
return nil
}

func (c *client) ManuallyChangeMultiPeers(ctx context.Context, regionID uint64, changeTypes map[uint64]eraftpb.ConfChangeType, peers []*metapb.Peer) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return errs.ErrClientGetProtoClient
}

changes := make([]*pdpb.ChangePeer, 0, len(peers))
for _, peer := range peers {
changes = append(changes, &pdpb.ChangePeer{
ChangeType: changeTypes[peer.Id],
Peer: peer,
})
}
resp, err := protoClient.ManuallyChangePeer(ctx, &pdpb.ChangePeerRequest{
Header: c.requestHeader(),
RegionId: regionID,
ChangePeerV2: &pdpb.ChangePeerV2{
Changes: changes,
},
})
if err != nil {
return err
}
resErr := resp.GetHeader().GetError()
if resErr != nil {
return errors.Errorf("[pd]" + resErr.Message)
}
return nil
}
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/pd/client

go 1.21

replace github.com/pingcap/kvproto => github.com/LykxSassinator/kvproto v0.0.0-20240605160713-4dfe4594d6ca
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be cleared after pingcap/kvproto#1250 is merged.


require (
github.com/BurntSushi/toml v0.3.1
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/LykxSassinator/kvproto v0.0.0-20240605160713-4dfe4594d6ca h1:1IHVZpI2fgoykVX3dl+QzQ8qv05/miYEZNyePumUYOg=
github.com/LykxSassinator/kvproto v0.0.0-20240605160713-4dfe4594d6ca/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -46,8 +48,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go 1.21
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/LykxSassinator/kvproto v0.0.0-20240605160713-4dfe4594d6ca
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.


require (
github.com/AlekSi/gocov-xml v1.0.0
Expand Down Expand Up @@ -113,7 +114,6 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
Expand Down
Loading
Loading