From b7396e6b3f692e7f80ab4a0a55cf6cf4949d020e Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Sat, 14 Sep 2024 12:38:30 +0800
Subject: [PATCH] scheduler: fix scheduler save config (#7108) (#7165)

close tikv/pd#6897

Signed-off-by: husharp

Co-authored-by: husharp
Co-authored-by: Hu#
---
 server/cluster/coordinator.go        |   4 +
 server/cluster/coordinator_test.go   |   3 +-
 server/schedule/scheduler.go         |  13 ++-
 server/schedulers/evict_leader.go    |  15 +++
 tests/server/api/testutil.go         |  59 ++++++++++++
 tests/server/cluster/cluster_test.go | 139 +++++++++++++++++++++++++++
 6 files changed, 225 insertions(+), 8 deletions(-)
 create mode 100644 tests/server/api/testutil.go

diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 4f4f991324a..db2d4e50589 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -639,6 +639,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string)
 	c.wg.Add(1)
 	go c.runScheduler(s)
 	c.schedulers[s.GetName()] = s
+	if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil {
+		log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.opt.AddSchedulerCfg(s.GetType(), args)
 	return nil
 }
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index b77543201f1..3a4bd47a742 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -703,8 +703,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
 	// whether the schedulers added or removed in dynamic way are recorded in opt
 	_, newOpt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
-	_, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
+	shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
 	c.Assert(err, IsNil)
+	c.Assert(co.addScheduler(shuffle), IsNil)
 	// suppose we add a new default enable scheduler
 	config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"})
 	defer func() {
diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go
index d5dbede2ffc..901d6b3fc26 100644
--- a/server/schedule/scheduler.go
+++ b/server/schedule/scheduler.go
@@ -118,17 +118,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage *core
 	if !ok {
 		return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
 	}
+	return fn(opController, storage, dec)
+}
 
-	s, err := fn(opController, storage, dec)
-	if err != nil {
-		return nil, err
-	}
+// SaveSchedulerConfig saves the config of the specified scheduler.
+func SaveSchedulerConfig(storage *core.Storage, s Scheduler) error {
 	data, err := s.EncodeConfig()
 	if err != nil {
-		return nil, err
+		return err
 	}
-	err = storage.SaveScheduleConfig(s.GetName(), data)
-	return s, err
+	return storage.SaveScheduleConfig(s.GetName(), data)
 }
 
 // FindSchedulerTypeByName finds the type of the specified name.
diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go
index 1862c337de8..d1f6d392b49 100644
--- a/server/schedulers/evict_leader.go
+++ b/server/schedulers/evict_leader.go
@@ -85,6 +85,16 @@ type evictLeaderSchedulerConfig struct {
 	cluster opt.Cluster
 }
 
+func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
+	conf.mu.RLock()
+	defer conf.mu.RUnlock()
+	stores := make([]uint64, 0, len(conf.StoreIDWithRanges))
+	for storeID := range conf.StoreIDWithRanges {
+		stores = append(stores, storeID)
+	}
+	return stores
+}
+
 func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
 	if len(args) != 1 {
 		return errs.ErrSchedulerConfig.FastGenByArgs("id")
@@ -189,6 +199,11 @@ func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *ev
 	}
 }
 
+// EvictStoreIDs returns the IDs of the evict-stores.
+func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 {
+	return s.conf.getStores()
+}
+
 func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	s.handler.ServeHTTP(w, r)
 }
diff --git a/tests/server/api/testutil.go b/tests/server/api/testutil.go
new file mode 100644
index 00000000000..a6d5322a44f
--- /dev/null
+++ b/tests/server/api/testutil.go
@@ -0,0 +1,59 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/pingcap/check"
+)
+
+const schedulersPrefix = "/pd/api/v1/schedulers"
+
+// dialClient used to dial http request.
+var dialClient = &http.Client{
+	Transport: &http.Transport{
+		DisableKeepAlives: true,
+	},
+}
+
+// MustAddScheduler adds a scheduler with HTTP API.
+func MustAddScheduler(
+	c *check.C, serverAddr string,
+	schedulerName string, args map[string]interface{},
+) {
+	request := map[string]interface{}{
+		"name": schedulerName,
+	}
+	for arg, val := range args {
+		request[arg] = val
+	}
+	data, err := json.Marshal(request)
+	c.Assert(err, check.IsNil)
+
+	httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data))
+	c.Assert(err, check.IsNil)
+	// Send request.
+	resp, err := dialClient.Do(httpReq)
+	c.Assert(err, check.IsNil)
+	defer resp.Body.Close()
+	_, err = io.ReadAll(resp.Body)
+	c.Assert(err, check.IsNil)
+	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+}
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 1cd55beae4c..2ad1afc4ec0 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -17,6 +17,10 @@ package cluster_test
 import (
 	"context"
 	"fmt"
+	"github.com/tikv/pd/server/id"
+	"github.com/tikv/pd/server/schedulers"
+	"github.com/tikv/pd/tests/server/api"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -1182,3 +1186,138 @@ func (s *clusterTestSuite) TestTransferLeaderBack(c *C) {
 	c.Assert(rc.GetMetaCluster(), DeepEquals, meta)
 	c.Assert(rc.GetStoreCount(), Equals, 3)
 }
+
+func (s *clusterTestSuite) TestTransferLeaderForScheduler(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	c.Assert(err, IsNil)
+	err = tc.RunInitialServers()
+	c.Assert(err, IsNil)
+	tc.WaitLeader()
+	// start
+	leaderServer := tc.GetServer(tc.GetLeader())
+	c.Assert(leaderServer.BootstrapCluster(), IsNil)
+	rc := leaderServer.GetServer().GetRaftCluster()
+	c.Assert(rc, NotNil)
+
+	storesNum := 2
+	grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
+	for i := 1; i <= storesNum; i++ {
+		store := &metapb.Store{
+			Id:      uint64(i),
+			Address: "127.0.0.1:" + strconv.Itoa(i),
+		}
+		resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
+		c.Assert(err, IsNil)
+		c.Assert(resp.GetHeader().GetError().GetType(), Equals, pdpb.ErrorType_OK)
+	}
+	// region heartbeat
+	id := leaderServer.GetAllocator()
+	putRegionWithLeader(c, rc, id, 1)
+
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+
+	// Add evict leader scheduler
+	api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 1,
+	})
+	api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 2,
+	})
+	// Check scheduler updated.
+	c.Assert(len(rc.GetSchedulers()), Equals, 4)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	// transfer PD leader to another PD
+	tc.ResignLeader()
+	rc.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc1 := leaderServer.GetServer().GetRaftCluster()
+	rc1.Start(leaderServer.GetServer())
+	c.Assert(err, IsNil)
+	c.Assert(rc1, NotNil)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(c, rc1, id, 1)
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+	// Check scheduler updated.
+	c.Assert(len(rc.GetSchedulers()), Equals, 4)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	// transfer PD leader back to the previous PD
+	tc.ResignLeader()
+	rc1.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	rc.Start(leaderServer.GetServer())
+	c.Assert(rc, NotNil)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(c, rc, id, 1)
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+	// Check scheduler updated
+	c.Assert(len(rc.GetSchedulers()), Equals, 4)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"), IsNil)
+}
+
+func checkEvictLeaderSchedulerExist(c *C, rc *cluster.RaftCluster, exist bool) {
+	isExistScheduler := func(rc *cluster.RaftCluster, name string) bool {
+		s := rc.GetSchedulers()
+		for _, scheduler := range s {
+			if scheduler == name {
+				return true
+			}
+		}
+		return false
+	}
+
+	testutil.WaitUntil(c, func(c *C) bool {
+		return isExistScheduler(rc, schedulers.EvictLeaderName) == exist
+	})
+}
+
+func checkEvictLeaderStoreIDs(c *C, rc *cluster.RaftCluster, expected []uint64) {
+	handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
+	c.Assert(ok, IsTrue)
+	h, ok := handler.(interface {
+		EvictStoreIDs() []uint64
+	})
+	c.Assert(ok, IsTrue)
+	var evictStoreIDs []uint64
+	testutil.WaitUntil(c, func(c *C) bool {
+		evictStoreIDs = h.EvictStoreIDs()
+		return len(evictStoreIDs) == len(expected)
+	})
+}
+
+func putRegionWithLeader(c *C, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
+	for i := 0; i < 3; i++ {
+		regionID, err := id.Alloc()
+		c.Assert(err, IsNil)
+		peerID, err := id.Alloc()
+		c.Assert(err, IsNil)
+		region := &metapb.Region{
+			Id:       regionID,
+			Peers:    []*metapb.Peer{{Id: peerID, StoreId: storeID}},
+			StartKey: []byte{byte(i)},
+			EndKey:   []byte{byte(i + 1)},
+		}
+		rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
+	}
+
+	time.Sleep(50 * time.Millisecond)
+	c.Assert(rc.GetStore(storeID).GetLeaderCount(), Equals, 3)
+}
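
Illustration (not part of the patch itself): with this change, coordinator.addScheduler persists the
scheduler configuration through schedule.SaveSchedulerConfig, so a scheduler added at runtime over the
HTTP API keeps its config across a PD leader transfer, which is what TestTransferLeaderForScheduler
exercises. Below is a minimal sketch of adding an evict-leader scheduler through the same
/pd/api/v1/schedulers endpoint the new test helper posts to; the PD address and the literal scheduler
name "evict-leader-scheduler" are assumptions for the example, not values taken from this diff.

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
		"log"
		"net/http"
	)

	func main() {
		// Assumed PD client address; adjust for your deployment.
		const pdAddr = "http://127.0.0.1:2379"
		// Same endpoint the patch's MustAddScheduler helper uses.
		const schedulersPrefix = "/pd/api/v1/schedulers"

		// Payload shape mirrors the test helper: scheduler name plus extra args.
		// "evict-leader-scheduler" is assumed to be the value of schedulers.EvictLeaderName.
		payload := map[string]interface{}{
			"name":     "evict-leader-scheduler",
			"store_id": 1,
		}
		data, err := json.Marshal(payload)
		if err != nil {
			log.Fatal(err)
		}

		// POST the scheduler; with this patch the coordinator also saves its
		// config to storage, so it survives a PD leader transfer.
		resp, err := http.Post(pdAddr+schedulersPrefix, "application/json", bytes.NewBuffer(data))
		if err != nil {
			log.Fatal(err)
		}
		defer resp.Body.Close()
		fmt.Println("status:", resp.Status)
	}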