Skip to content

Commit

Permalink
resource_manager: only init the default resource group once (tikv#6788)…
Browse files Browse the repository at this point in the history
… (tikv#6792)

close tikv#6787

Signed-off-by: glorv <[email protected]>

Co-authored-by: glorv <[email protected]>
  • Loading branch information
ti-chi-bot and glorv authored Jul 13, 2023
1 parent 61a7f07 commit f8bf1d7
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
groupSettingsPathPrefix = "resource_group/settings"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
// errNotLeader is returned when the requested server is not pd leader.
errNotLeader = "not leader"
)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -60,7 +62,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotPrimary) {
if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
}
}
Expand Down
31 changes: 17 additions & 14 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,26 @@ func (m *Manager) Init(ctx context.Context) {
}
m.storage.LoadResourceGroupStates(tokenHandler)

// Add default group
defaultGroup := &ResourceGroup{
Name: reservedDefaultGroupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: math.MaxInt32,
BurstLimit: -1,
// Add default group if it's not inited.
if _, ok := m.groups[reservedDefaultGroupName]; !ok {
defaultGroup := &ResourceGroup{
Name: reservedDefaultGroupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: math.MaxInt32,
BurstLimit: -1,
},
},
},
},
Priority: middlePriority,
}
if err := m.AddResourceGroup(defaultGroup.IntoProtoResourceGroup()); err != nil {
log.Warn("init default group failed", zap.Error(err))
Priority: middlePriority,
}
if err := m.AddResourceGroup(defaultGroup.IntoProtoResourceGroup()); err != nil {
log.Warn("init default group failed", zap.Error(err))
}
}

// Start the background metrics flusher.
go m.backgroundMetricsFlush(ctx)
go func() {
Expand Down
32 changes: 32 additions & 0 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,19 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
}
},
},
{"default", rmpb.GroupMode_RUMode, false, true,
`{"name":"default","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
BurstLimit: -1,
},
},
}
},
},
}

checkErr := func(err error, success bool) {
Expand Down Expand Up @@ -865,6 +878,25 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
re.Equal(1, len(groups1))
}
}

// test restart cluster
groups, err := cli.ListResourceGroups(suite.ctx)
re.NoError(err)
servers := suite.cluster.GetServers()
re.NoError(suite.cluster.StopAll())
serverList := make([]*tests.TestServer, 0, len(servers))
for _, s := range servers {
serverList = append(serverList, s)
}
re.NoError(suite.cluster.RunServers(serverList))
suite.cluster.WaitLeader()
var newGroups []*rmpb.ResourceGroup
testutil.Eventually(suite.Require(), func() bool {
var err error
newGroups, err = cli.ListResourceGroups(suite.ctx)
return err == nil
}, testutil.WithWaitFor(time.Second))
re.Equal(groups, newGroups)
}

func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() {
Expand Down

0 comments on commit f8bf1d7

Please sign in to comment.