From f3a948acb564799b80cb1d827f6f75f9a63e1e91 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 3 Jan 2024 16:38:02 +0800 Subject: [PATCH 1/5] resource_mananger: deep clone resource group (#7623) (#7624) close tikv/pd#7206 resource_mananger: deep clone resource group Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: ShuNing Co-authored-by: nolouch --- pkg/mcs/resource_manager/server/manager.go | 4 +-- .../resource_manager/server/resource_group.go | 34 ++++++++++++------- .../server/resource_group_test.go | 2 +- .../resource_manager/server/token_bukets.go | 26 ++++++++++++-- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index d50b9c32b8b..215db2e676c 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -278,7 +278,7 @@ func (m *Manager) GetResourceGroup(name string) *ResourceGroup { m.RLock() defer m.RUnlock() if group, ok := m.groups[name]; ok { - return group.Copy() + return group.Clone() } return nil } @@ -298,7 +298,7 @@ func (m *Manager) GetResourceGroupList() []*ResourceGroup { m.RLock() res := make([]*ResourceGroup, 0, len(m.groups)) for _, group := range m.groups { - res = append(res, group.Copy()) + res = append(res, group.Clone()) } m.RUnlock() sort.Slice(res, func(i, j int) bool { diff --git a/pkg/mcs/resource_manager/server/resource_group.go b/pkg/mcs/resource_manager/server/resource_group.go index 9fbc4a09123..439838cb57e 100644 --- a/pkg/mcs/resource_manager/server/resource_group.go +++ b/pkg/mcs/resource_manager/server/resource_group.go @@ -42,6 +42,20 @@ type RequestUnitSettings struct { RU *GroupTokenBucket `json:"r_u,omitempty"` } +// Clone returns a deep copy of the RequestUnitSettings. +func (rus *RequestUnitSettings) Clone() *RequestUnitSettings { + if rus == nil { + return nil + } + var ru *GroupTokenBucket + if rus.RU != nil { + ru = rus.RU.Clone() + } + return &RequestUnitSettings{ + RU: ru, + } +} + // NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket. func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings { return &RequestUnitSettings{ @@ -58,21 +72,17 @@ func (rg *ResourceGroup) String() string { return string(res) } -// Copy copies the resource group. -func (rg *ResourceGroup) Copy() *ResourceGroup { - // TODO: use a better way to copy +// Clone copies the resource group. 
+func (rg *ResourceGroup) Clone() *ResourceGroup { rg.RLock() defer rg.RUnlock() - res, err := json.Marshal(rg) - if err != nil { - panic(err) - } - var newRG ResourceGroup - err = json.Unmarshal(res, &newRG) - if err != nil { - panic(err) + newRG := &ResourceGroup{ + Name: rg.Name, + Mode: rg.Mode, + Priority: rg.Priority, + RUSettings: rg.RUSettings.Clone(), } - return &newRG + return newRG } func (rg *ResourceGroup) getRUToken() float64 { diff --git a/pkg/mcs/resource_manager/server/resource_group_test.go b/pkg/mcs/resource_manager/server/resource_group_test.go index c0be8851b97..26bf48bff30 100644 --- a/pkg/mcs/resource_manager/server/resource_group_test.go +++ b/pkg/mcs/resource_manager/server/resource_group_test.go @@ -29,7 +29,7 @@ func TestPatchResourceGroup(t *testing.T) { re.NoError(err) err = rg.PatchSettings(patch) re.NoError(err) - res, err := json.Marshal(rg.Copy()) + res, err := json.Marshal(rg.Clone()) re.NoError(err) re.Equal(ca.expectJSONString, string(res)) } diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 47608203847..88e1be02942 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -49,6 +49,22 @@ type GroupTokenBucket struct { GroupTokenBucketState `json:"state,omitempty"` } +// Clone returns the deep copy of GroupTokenBucket +func (gtb *GroupTokenBucket) Clone() *GroupTokenBucket { + if gtb == nil { + return nil + } + var settings *rmpb.TokenLimitSettings + if gtb.Settings != nil { + settings = proto.Clone(gtb.Settings).(*rmpb.TokenLimitSettings) + } + stateClone := *gtb.GroupTokenBucketState.Clone() + return &GroupTokenBucket{ + Settings: settings, + GroupTokenBucketState: stateClone, + } +} + func (gtb *GroupTokenBucket) setState(state *GroupTokenBucketState) { gtb.Tokens = state.Tokens gtb.LastUpdate = state.LastUpdate @@ -85,10 +101,14 @@ type GroupTokenBucketState struct { // Clone returns the copy of GroupTokenBucketState func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState { - tokenSlots := make(map[uint64]*TokenSlot) - for id, tokens := range gts.tokenSlots { - tokenSlots[id] = tokens + var tokenSlots map[uint64]*TokenSlot + if gts.tokenSlots != nil { + tokenSlots = make(map[uint64]*TokenSlot) + for id, tokens := range gts.tokenSlots { + tokenSlots[id] = tokens + } } + var lastUpdate *time.Time if gts.LastUpdate != nil { newLastUpdate := *gts.LastUpdate From 79aac73c27467e2ee1694a0eefe4b354f6bd17c9 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 4 Jan 2024 18:32:03 +0800 Subject: [PATCH 2/5] resource_group: don't accumulate tokens when burstlimit less than 0 (#7626) (#7657) ref tikv/pd#7206 Signed-off-by: Cabinfever_B Co-authored-by: Cabinfever_B --- .../server/token_buckets_test.go | 31 ++++++++++++++++++- .../resource_manager/server/token_bukets.go | 22 +++++++------ 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go index a7d3b9e3bad..1e14d9eca6a 100644 --- a/pkg/mcs/resource_manager/server/token_buckets_test.go +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -48,11 +48,40 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { }, } tb.patch(tbSetting) - + time.Sleep(10 * time.Millisecond) time2 := time.Now() tb.request(time2, 0, 0, clientUniqueID) re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7) 
re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) + + tbSetting = &rmpb.TokenBucket{ + Tokens: 0, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: -1, + }, + } + tb = NewGroupTokenBucket(tbSetting) + tb.request(time2, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7) + time3 := time.Now() + tb.request(time3, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7) + + tbSetting = &rmpb.TokenBucket{ + Tokens: 200000, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: -1, + }, + } + tb = NewGroupTokenBucket(tbSetting) + tb.request(time3, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7) + time.Sleep(10 * time.Millisecond) + time4 := time.Now() + tb.request(time4, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7) } func TestGroupTokenBucketRequest(t *testing.T) { diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 88e1be02942..1de00350bca 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -292,7 +292,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) { if gtb.Settings.FillRate == 0 { gtb.Settings.FillRate = defaultRefillRate } - if gtb.Tokens < defaultInitialTokens { + if gtb.Tokens < defaultInitialTokens && gtb.Settings.BurstLimit > 0 { gtb.Tokens = defaultInitialTokens } // init slot @@ -311,21 +311,23 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien var elapseTokens float64 if !gtb.Initialized { gtb.init(now, clientUniqueID) - } else if delta := now.Sub(*gtb.LastUpdate); delta > 0 { - elapseTokens = float64(gtb.Settings.GetFillRate())*delta.Seconds() + gtb.lastBurstTokens - gtb.lastBurstTokens = 0 - gtb.Tokens += elapseTokens - gtb.LastUpdate = &now + } else if burst := float64(burstLimit); burst > 0 { + if delta := now.Sub(*gtb.LastUpdate); delta > 0 { + elapseTokens = float64(gtb.Settings.GetFillRate())*delta.Seconds() + gtb.lastBurstTokens + gtb.lastBurstTokens = 0 + gtb.Tokens += elapseTokens + } + if gtb.Tokens > burst { + elapseTokens -= gtb.Tokens - burst + gtb.Tokens = burst + } } + gtb.LastUpdate = &now // Reloan when setting changed if gtb.settingChanged && gtb.Tokens <= 0 { elapseTokens = 0 gtb.resetLoan() } - if burst := float64(burstLimit); burst > 0 && gtb.Tokens > burst { - elapseTokens -= gtb.Tokens - burst - gtb.Tokens = burst - } // Balance each slots. 
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken, elapseTokens) } From 6ffd9ca74c3dea5a66714c8ff74e247649a43386 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 10 Jan 2024 14:52:54 +0800 Subject: [PATCH 3/5] memory: support cgroup with systemd (#7627) (#7665) close tikv/pd#7628 Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: bufferflies <1045931706@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- cmd/pd-server/main.go | 3 ++- pkg/memory/meminfo.go | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 7f42c5b4adc..ab3cfdbb383 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/errs" resource_manager "github.com/tikv/pd/pkg/mcs/resource_manager/server" tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/swaggerserver" "github.com/tikv/pd/pkg/utils/configutil" @@ -185,7 +186,7 @@ func start(cmd *cobra.Command, args []string, services ...string) { } // Flushing any buffered log entries defer log.Sync() - + memory.InitMemoryHook() if len(services) != 0 { versioninfo.Log(server.APIServiceMode) } else { diff --git a/pkg/memory/meminfo.go b/pkg/memory/meminfo.go index 0981ddacdfb..cacd12b8909 100644 --- a/pkg/memory/meminfo.go +++ b/pkg/memory/meminfo.go @@ -52,9 +52,13 @@ func MemTotalNormal() (uint64, error) { if time.Since(t) < 60*time.Second { return total, nil } + return totalMem() +} + +func totalMem() (uint64, error) { v, err := mem.VirtualMemory() if err != nil { - return v.Total, err + return 0, err } memLimit.set(v.Total, time.Now()) return v.Total, nil @@ -182,6 +186,38 @@ func init() { mustNil(err) } +// InitMemoryHook initializes the memory hook. +// It is to solve the problem that tidb cannot read cgroup in the systemd. +// so if we are not in the container, we compare the cgroup memory limit and the physical memory, +// the cgroup memory limit is smaller, we use the cgroup memory hook. 
+// ref https://github.com/pingcap/tidb/pull/48096/ +func InitMemoryHook() { + if cgroup.InContainer() { + log.Info("use cgroup memory hook because pd is in the container") + return + } + cgroupValue, err := cgroup.GetMemoryLimit() + if err != nil { + return + } + physicalValue, err := totalMem() + if err != nil { + return + } + if physicalValue > cgroupValue && cgroupValue != 0 { + MemTotal = MemTotalCGroup + MemUsed = MemUsedCGroup + sysutil.RegisterGetMemoryCapacity(MemTotalCGroup) + log.Info("use cgroup memory hook", zap.Int64("cgroupMemorySize", int64(cgroupValue)), zap.Int64("physicalMemorySize", int64(physicalValue))) + } else { + log.Info("use physical memory hook", zap.Int64("cgroupMemorySize", int64(cgroupValue)), zap.Int64("physicalMemorySize", int64(physicalValue))) + } + _, err = MemTotal() + mustNil(err) + _, err = MemUsed() + mustNil(err) +} + // InstanceMemUsed returns the memory usage of this process func InstanceMemUsed() (uint64, error) { used, t := serverMemUsage.get() From 2a2b949819e4449495951d4f61906cf1e262cfab Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 16 Jan 2024 15:49:46 +0800 Subject: [PATCH 4/5] scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674) (#7687) close tikv/pd#7672 Signed-off-by: ti-chi-bot Signed-off-by: Cabinfever_B Co-authored-by: Yongbo Jiang Co-authored-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/schedulers/evict_leader.go | 2 +- pkg/schedule/schedulers/grant_leader.go | 2 +- server/api/router.go | 9 +++++---- server/api/scheduler.go | 2 +- server/api/server_test.go | 5 ++++- 5 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index e6c493640f2..e4160725130 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -372,7 +372,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index ede85904c07..28bb59ceeaa 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -274,7 +274,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { diff --git a/server/api/router.go b/server/api/router.go index 9c587f01264..2e5cd6371f5 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -85,10 +85,10 @@ func getFunctionName(f interface{}) string { // @BasePath /pd/api/v1 func createRouter(prefix string, svr *server.Server) *mux.Router { serviceMiddle := newServiceMiddlewareBuilder(svr) - registerPrefix := func(router *mux.Router, prefixPath string, + registerPrefix := func(router *mux.Router, prefixPath, name string, handleFunc func(http.ResponseWriter, *http.Request), opts ...createRouteOption) { 
routeCreateFunc(router.PathPrefix(prefixPath), serviceMiddle.createHandler(handleFunc), - getFunctionName(handleFunc), opts...) + name, opts...) } registerFunc := func(router *mux.Router, path string, handleFunc func(http.ResponseWriter, *http.Request), opts ...createRouteOption) { @@ -148,7 +148,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/schedulers/diagnostic/{name}", diagnosticHandler.GetDiagnosticResult, setMethods(http.MethodGet), setAuditBackend(prometheus)) schedulerConfigHandler := newSchedulerConfigHandler(svr, rd) - registerPrefix(apiRouter, "/scheduler-config", schedulerConfigHandler.GetSchedulerConfig, setAuditBackend(prometheus)) + registerPrefix(apiRouter, "/scheduler-config", "HandleSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodPost, http.MethodDelete, http.MethodPut, http.MethodPatch), setAuditBackend(localLog, prometheus)) + registerPrefix(apiRouter, "/scheduler-config", "GetSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodGet), setAuditBackend(prometheus)) clusterHandler := newClusterHandler(svr, rd) registerFunc(apiRouter, "/cluster", clusterHandler.GetCluster, setMethods(http.MethodGet), setAuditBackend(prometheus)) @@ -365,7 +366,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { // API to set or unset failpoints failpoint.Inject("enableFailpointAPI", func() { // this function will be named to "func2". It may be used in test - registerPrefix(apiRouter, "/fail", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + registerPrefix(apiRouter, "/fail", "FailPoint", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // The HTTP handler of failpoint requires the full path to be the failpoint path. 
r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix+apiPrefix+"/fail") new(failpoint.HttpHandler).ServeHTTP(w, r) diff --git a/server/api/scheduler.go b/server/api/scheduler.go index cf1c82c658b..789d5791507 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -372,7 +372,7 @@ func newSchedulerConfigHandler(svr *server.Server, rd *render.Render) *scheduler } } -func (h *schedulerConfigHandler) GetSchedulerConfig(w http.ResponseWriter, r *http.Request) { +func (h *schedulerConfigHandler) HandleSchedulerConfig(w http.ResponseWriter, r *http.Request) { handler := h.svr.GetHandler() sh, err := handler.GetSchedulerConfigHandler() if err == nil && sh != nil { diff --git a/server/api/server_test.go b/server/api/server_test.go index 2e89ad797c3..a9e505c3ba3 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -173,7 +173,10 @@ func (suite *serviceTestSuite) TestServiceLabels() { accessPaths = suite.svr.GetServiceLabels("GetSchedulerConfig") suite.Len(accessPaths, 1) suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) - suite.Equal("", accessPaths[0].Method) + suite.Equal("GET", accessPaths[0].Method) + accessPaths = suite.svr.GetServiceLabels("HandleSchedulerConfig") + suite.Len(accessPaths, 4) + suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) accessPaths = suite.svr.GetServiceLabels("ResignLeader") suite.Len(accessPaths, 1) From e00f31efc1412b0f3ed6e10860f4b51bb0c937a7 Mon Sep 17 00:00:00 2001 From: disksing Date: Sun, 18 Feb 2024 17:34:55 +0800 Subject: [PATCH 5/5] replication: cherry-pick improvements for dr-autosync (#7789) close tikv/pd#7221 improve state replication and failover process for dr-autosync Signed-off-by: disksing Signed-off-by: ti-chi-bot Co-authored-by: Ti Chi Robot Co-authored-by: disksing --- server/replication/replication_mode.go | 261 +++++++++-------- server/replication/replication_mode_test.go | 306 ++++++++++++++++---- 2 files changed, 402 insertions(+), 165 deletions(-) diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index f1e8b5a9c8a..87205a0c149 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -30,7 +30,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" - "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -63,7 +63,7 @@ type FileReplicater interface { // DrStatusFile is the file name that stores the dr status. const DrStatusFile = "DR_STATE" -const persistFileTimeout = time.Second * 10 +const persistFileTimeout = time.Second * 3 // ModeManager is used to control how raft logs are synchronized between // different tikv nodes. 
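The hunks that follow replace the old push-on-switch replication (drPersistStatusWithLock plus a replicatedMembers slice) with a periodic tickReplicateStatus that remembers, per member, the last state ID it managed to deliver and only re-sends the DR status file to members that are behind. A minimal, self-contained sketch of that pattern is below; member, fileReplicater, and fakeReplicater are simplified stand-ins for PD's pdpb.Member and FileReplicater types, not the real signatures.

// Sketch only: per-member state replication keyed by state ID.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

// member and fileReplicater stand in for pdpb.Member and the FileReplicater
// interface used by the replication mode manager.
type member struct{ id uint64 }

type fileReplicater interface {
	GetMembers() ([]member, error)
	ReplicateFileToMember(ctx context.Context, m member, name string, data []byte) error
}

type drStatus struct {
	State   string `json:"state"`
	StateID uint64 `json:"state_id"`
}

type stateSyncer struct {
	rep        fileReplicater
	replicated sync.Map // member id -> last state ID delivered to that member
}

// tick pushes the current status only to members that have not acknowledged
// this state ID yet; a member that failed last time is retried on the next
// tick instead of blocking the state switch itself.
func (s *stateSyncer) tick(cur drStatus) {
	data, _ := json.Marshal(cur)
	members, err := s.rep.GetMembers()
	if err != nil {
		return
	}
	for _, m := range members {
		if id, ok := s.replicated.Load(m.id); ok && id.(uint64) == cur.StateID {
			continue // already has the latest state file
		}
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		if err := s.rep.ReplicateFileToMember(ctx, m, "DR_STATE", data); err == nil {
			s.replicated.Store(m.id, cur.StateID)
		}
		cancel()
	}
}

// fakeReplicater records the last payload per member so the demo can print it.
type fakeReplicater struct{ last map[uint64][]byte }

func (f *fakeReplicater) GetMembers() ([]member, error) { return []member{{id: 1}, {id: 2}}, nil }

func (f *fakeReplicater) ReplicateFileToMember(_ context.Context, m member, _ string, data []byte) error {
	f.last[m.id] = data
	return nil
}

func main() {
	f := &fakeReplicater{last: map[uint64][]byte{}}
	s := &stateSyncer{rep: f}
	s.tick(drStatus{State: "sync", StateID: 1})
	s.tick(drStatus{State: "async_wait", StateID: 2})
	fmt.Println(string(f.last[1]))
	fmt.Println(string(f.last[2]))
}
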
@@ -71,11 +71,11 @@ type ModeManager struct { initTime time.Time syncutil.RWMutex - config config.ReplicationModeConfig - storage endpoint.ReplicationStatusStorage - cluster schedule.Cluster - fileReplicater FileReplicater - replicatedMembers []uint64 + config config.ReplicationModeConfig + storage endpoint.ReplicationStatusStorage + cluster schedule.Cluster + fileReplicater FileReplicater + replicateState sync.Map drAutoSync drAutoSyncStatus // intermediate states of the recovery process @@ -241,7 +241,6 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error { return err } dr := drAutoSyncStatus{State: drStateAsyncWait, StateID: id, AvailableStores: availableStores} - m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -264,7 +263,6 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { return err } dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores} - m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -288,7 +286,6 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { } now := time.Now() dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now} - m.drPersistStatusWithLock(dr) if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -308,7 +305,6 @@ func (m *ModeManager) drSwitchToSync() error { return err } dr := drAutoSyncStatus{State: drStateSync, StateID: id} - m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -318,50 +314,6 @@ func (m *ModeManager) drSwitchToSync() error { return nil } -func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) { - ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout) - defer cancel() - - members, err := m.fileReplicater.GetMembers() - if err != nil { - log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) - return - } - - data, _ := json.Marshal(status) - - m.replicatedMembers = m.replicatedMembers[:0] - for _, member := range members { - if err := m.fileReplicater.ReplicateFileToMember(ctx, member, DrStatusFile, data); err != nil { - log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err)) - // Throw away the error to make it possible to switch to async when - // primary and dr DC are disconnected. This will result in the - // inability to accurately determine whether data is fully - // synchronized when using dr DC to disaster recovery. - // Since the member will not be in `replicatedMembers` list, PD will - // try to replicate state file later. 
- } else { - m.replicatedMembers = append(m.replicatedMembers, member.GetMemberId()) - } - } -} - -func (m *ModeManager) drCheckNeedPersistStatus(members []*pdpb.Member) bool { - m.RLock() - defer m.RUnlock() - return slice.AnyOf(members, func(i int) bool { // if there is any member in the new list - return slice.NoneOf(m.replicatedMembers, func(j int) bool { // not replicated - return m.replicatedMembers[j] == members[i].GetMemberId() - }) - }) -} - -func (m *ModeManager) drPersistStatus() { - m.Lock() - defer m.Unlock() - m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID}) -} - func (m *ModeManager) drGetState() string { m.RLock() defer m.RUnlock() @@ -369,8 +321,9 @@ func (m *ModeManager) drGetState() string { } const ( - idleTimeout = time.Minute - tickInterval = 500 * time.Millisecond + idleTimeout = time.Minute + tickInterval = 500 * time.Millisecond + replicateStateInterval = time.Second * 5 ) // Run starts the background job. @@ -381,47 +334,103 @@ func (m *ModeManager) Run(ctx context.Context) { case <-ctx.Done(): return } - for { - select { - case <-time.After(tickInterval): - case <-ctx.Done(): - return + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for { + select { + case <-time.After(tickInterval): + case <-ctx.Done(): + return + } + m.tickUpdateState() + } + }() + + go func() { + defer wg.Done() + for { + select { + case <-time.After(replicateStateInterval): + case <-ctx.Done(): + return + } + m.tickReplicateStatus() } - m.tickDR() + }() + + wg.Wait() +} + +func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int { + if rule.Role == placement.Learner { + return 0 + } + var up, down int + for _, s := range upStores { + if placement.MatchLabelConstraints(s, rule.LabelConstraints) { + up++ + } + } + for _, s := range downStores { + if placement.MatchLabelConstraints(s, rule.LabelConstraints) { + down++ + } + } + minimalUp := rule.Count - down + if minimalUp < 0 { + minimalUp = 0 + } + if minimalUp > up { + minimalUp = up } + return minimalUp } -func (m *ModeManager) tickDR() { +func (m *ModeManager) tickUpdateState() { if m.getModeName() != modeDRAutoSync { return } drTickCounter.Inc() - totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas - stores := m.checkStoreStatus() + stores, storeIDs := m.checkStoreStatus() - // canSync is true when every region has at least 1 replica in each DC. - canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers && - len(stores[primaryUp]) > 0 && len(stores[drUp]) > 0 + var primaryHasVoter, drHasVoter bool + var totalVoter, totalUpVoter int + for _, r := range m.cluster.GetRuleManager().GetAllRules() { + if len(r.StartKey) > 0 || len(r.EndKey) > 0 { + // All rules should be global rules. If not, skip it. + continue + } + if r.Role != placement.Learner { + totalVoter += r.Count + } + minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown]) + minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown]) + primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0 + drHasVoter = drHasVoter || minimalUpDr > 0 + upVoters := minimalUpPrimary + minimalUpDr + if upVoters > r.Count { + upVoters = r.Count + } + totalUpVoter += upVoters + } + // canSync is true when every region has at least 1 voter replica in each DC. // hasMajority is true when every region has majority peer online. 
- var upPeers int - if len(stores[primaryDown]) < totalPrimaryPeers { - upPeers += totalPrimaryPeers - len(stores[primaryDown]) - } - if len(stores[drDown]) < totalDrPeers { - upPeers += totalDrPeers - len(stores[drDown]) - } - hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers + canSync := primaryHasVoter && drHasVoter + hasMajority := totalUpVoter*2 > totalVoter log.Debug("replication store status", - zap.Uint64s("up-primary", stores[primaryUp]), - zap.Uint64s("up-dr", stores[drUp]), - zap.Uint64s("down-primary", stores[primaryDown]), - zap.Uint64s("down-dr", stores[drDown]), + zap.Uint64s("up-primary", storeIDs[primaryUp]), + zap.Uint64s("up-dr", storeIDs[drUp]), + zap.Uint64s("down-primary", storeIDs[primaryDown]), + zap.Uint64s("down-dr", storeIDs[drDown]), zap.Bool("can-sync", canSync), - zap.Int("up-peers", upPeers), zap.Bool("has-majority", hasMajority), ) @@ -447,31 +456,31 @@ func (m *ModeManager) tickDR() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority { - m.drSwitchToAsyncWait(stores[primaryUp]) + m.drSwitchToAsyncWait(storeIDs[primaryUp]) } case drStateAsyncWait: if canSync { m.drSwitchToSync() break } - if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) { - m.drSwitchToAsyncWait(stores[primaryUp]) + if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) { + m.drSwitchToAsyncWait(storeIDs[primaryUp]) break } - if m.drCheckStoreStateUpdated(stores[primaryUp]) { - m.drSwitchToAsync(stores[primaryUp]) + if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: if canSync { m.drSwitchToSyncRecover() break } - if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) { - m.drSwitchToAsync(stores[primaryUp]) + if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(stores[primaryUp]) + m.drSwitchToAsync(storeIDs[primaryUp]) } else { m.updateProgress() progress := m.estimateProgress() @@ -484,8 +493,42 @@ func (m *ModeManager) tickDR() { } } } +} + +func (m *ModeManager) tickReplicateStatus() { + if m.getModeName() != modeDRAutoSync { + return + } - m.checkReplicateFile() + m.RLock() + state := drAutoSyncStatus{ + State: m.drAutoSync.State, + StateID: m.drAutoSync.StateID, + AvailableStores: m.drAutoSync.AvailableStores, + RecoverStartTime: m.drAutoSync.RecoverStartTime, + } + m.RUnlock() + + data, _ := json.Marshal(state) + + members, err := m.fileReplicater.GetMembers() + if err != nil { + log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) + return + } + for _, member := range members { + stateID, ok := m.replicateState.Load(member.GetMemberId()) + if !ok || stateID.(uint64) != state.StateID { + ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout) + err := m.fileReplicater.ReplicateFileToMember(ctx, member, DrStatusFile, data) + if err != nil { + log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", state.State), errs.ZapError(err)) + } else { + m.replicateState.Store(member.GetMemberId(), state.StateID) + } + cancel() + } + } } const ( @@ -496,39 +539,40 @@ const ( 
storeStatusTypeCount ) -func (m *ModeManager) checkStoreStatus() [][]uint64 { +func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) { m.RLock() defer m.RUnlock() - stores := make([][]uint64, storeStatusTypeCount) + stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount) for _, s := range m.cluster.GetStores() { if s.IsRemoved() { continue } - // learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store. - if s.GetRegionCount() == s.GetLearnerCount() { - continue - } down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) if labelValue == m.config.DRAutoSync.Primary { if down { - stores[primaryDown] = append(stores[primaryDown], s.GetID()) + stores[primaryDown] = append(stores[primaryDown], s) + storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID()) } else { - stores[primaryUp] = append(stores[primaryUp], s.GetID()) + stores[primaryUp] = append(stores[primaryUp], s) + storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID()) } } if labelValue == m.config.DRAutoSync.DR { if down { - stores[drDown] = append(stores[drDown], s.GetID()) + stores[drDown] = append(stores[drDown], s) + storeIDs[drDown] = append(storeIDs[drDown], s.GetID()) } else { - stores[drUp] = append(stores[drUp], s.GetID()) + stores[drUp] = append(stores[drUp], s) + storeIDs[drUp] = append(storeIDs[drUp], s.GetID()) } } } for i := range stores { - sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] }) + sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() }) + sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] }) } - return stores + return stores, storeIDs } // UpdateStoreDRStatus saves the dr-autosync status of a store. 
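With checkStoreStatus now returning the store objects as well as their IDs, the availability decision earlier in this patch (tickUpdateState with minimalUpVoters) is driven by placement rules instead of the removed PrimaryReplicas/DRReplicas counts. The arithmetic is compact; the sketch below reproduces it outside PD with hypothetical one-label rules — rule, store, and matches are stand-ins for placement.Rule, core.StoreInfo, and placement.MatchLabelConstraints, not the real API.

// Sketch only: rule-based canSync / hasMajority arithmetic.
package main

import "fmt"

// rule is a pared-down placement rule: voters selected by one label pair.
type rule struct {
	labelKey, labelValue string
	count                int
	learner              bool
}

type store struct{ labels map[string]string }

func matches(r rule, s store) bool { return s.labels[r.labelKey] == r.labelValue }

// minimalUpVoters mirrors the patch: of the stores a rule selects, `down` of
// them are unavailable, so at least count-down voters must survive, clamped
// to [0, up]. Learner rules contribute no voters.
func minimalUpVoters(r rule, up, down []store) int {
	if r.learner {
		return 0
	}
	var u, d int
	for _, s := range up {
		if matches(r, s) {
			u++
		}
	}
	for _, s := range down {
		if matches(r, s) {
			d++
		}
	}
	m := r.count - d
	if m < 0 {
		m = 0
	}
	if m > u {
		m = u
	}
	return m
}

func main() {
	rules := []rule{
		{labelKey: "zone", labelValue: "zone1", count: 2},
		{labelKey: "zone", labelValue: "zone2", count: 1},
	}
	z1 := store{labels: map[string]string{"zone": "zone1"}}
	z2 := store{labels: map[string]string{"zone": "zone2"}}
	primaryUp, primaryDown := []store{z1, z1}, []store{}
	drUp, drDown := []store{}, []store{z2} // the whole DR zone is down

	var totalVoter, totalUpVoter int
	primaryHasVoter, drHasVoter := false, false
	for _, r := range rules {
		if !r.learner {
			totalVoter += r.count
		}
		p := minimalUpVoters(r, primaryUp, primaryDown)
		d := minimalUpVoters(r, drUp, drDown)
		primaryHasVoter = primaryHasVoter || p > 0
		drHasVoter = drHasVoter || d > 0
		upVoters := p + d
		if upVoters > r.count {
			upVoters = r.count
		}
		totalUpVoter += upVoters
	}
	canSync := primaryHasVoter && drHasVoter   // false: no voter left in zone2
	hasMajority := totalUpVoter*2 > totalVoter // true: 2 of 3 voters still up
	fmt.Println(canSync, hasMajority)          // sync -> async_wait in the state machine
}
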
@@ -557,17 +601,6 @@ func (m *ModeManager) drCheckStoreStateUpdated(stores []uint64) bool { return true } -func (m *ModeManager) checkReplicateFile() { - members, err := m.fileReplicater.GetMembers() - if err != nil { - log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) - return - } - if m.drCheckNeedPersistStatus(members) { - m.drPersistStatus() - } -} - var ( regionScanBatchSize = 1024 regionMinSampleSize = 512 diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 09456893eb0..e01fb7a0b9a 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/mock/mockconfig" + "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" @@ -167,14 +168,17 @@ func TestStateSwitch(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 4, - DRReplicas: 2, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) replicator := newMockReplicator([]uint64{1}) rep, err := NewReplicationModeManager(conf, store, cluster, replicator) re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 4}, + {key: "zone", value: "zone2", role: placement.Voter, count: 2}, + }), true) cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) @@ -185,6 +189,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateSync, rep.drGetState()) stateID := rep.drAutoSync.StateID re.NotEqual(uint64(0), stateID) + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) assertStateIDUpdate := func() { re.NotEqual(stateID, rep.drAutoSync.StateID) @@ -198,9 +203,10 @@ func TestStateSwitch(t *testing.T) { } // only one zone, sync -> async_wait -> async - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) @@ -209,112 +215,119 @@ func TestStateSwitch(t *testing.T) { re.True(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) syncStoreStatus(1, 2, 3, 4) - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) // add new store in dr zone. 
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) cluster.AddLabersStoreWithLearnerCount(6, 1, 1, map[string]string{"zone": "zone2"}) // async -> sync - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) rep.drSwitchToSync() re.Equal(drStateSync, rep.drGetState()) assertStateIDUpdate() // sync -> async_wait - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) setStoreState(cluster, "down", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) setStoreState(cluster, "down", "down", "up", "up", "up", "up") setStoreState(cluster, "down", "down", "down", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) // cannot guarantee majority, keep sync. setStoreState(cluster, "up", "up", "up", "up", "up", "down") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) - // once the voter node down, even learner node up, swith to async state. - setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + // once zone2 down, swith to async state. + setStoreState(cluster, "up", "up", "up", "up", "down", "down") + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.drSwitchToSync() replicator.errors[2] = errors.New("fail to replicate") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() delete(replicator.errors, 1) // async_wait -> sync setStoreState(cluster, "up", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) // async_wait -> async_wait - setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + setStoreState(cluster, "up", "up", "up", "up", "down", "down") + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "down", "up", "up", "up", "down", "up") - rep.tickDR() + setStoreState(cluster, "down", "up", "up", "up", "down", "down") + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "up", "down", "up", "up", "down", "up") - rep.tickDR() + setStoreState(cluster, "up", "down", "up", "up", "down", "down") + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async_wait -> async - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) syncStoreStatus(1, 3) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) syncStoreStatus(4) - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async -> async - setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + setStoreState(cluster, "up", "up", "up", "up", "down", "down") + rep.tickUpdateState() // store 2 won't be available before it syncs status. 
+ rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) syncStoreStatus(1, 2, 3, 4) - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) // async -> sync_recover setStoreState(cluster, "up", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) setStoreState(cluster, "down", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() // sync_recover -> async - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) - setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + setStoreState(cluster, "up", "up", "up", "up", "down", "down") + rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. rep.drSwitchToSyncRecover() assertStateIDUpdate() - setStoreState(cluster, "down", "down", "up", "up", "down", "up") - rep.tickDR() + setStoreState(cluster, "down", "down", "up", "up", "down", "down") + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) // sync_recover -> sync @@ -328,7 +341,7 @@ func TestStateSwitch(t *testing.T) { State: pb.RegionReplicationState_SIMPLE_MAJORITY, })) cluster.PutRegion(region) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) region = region.Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ @@ -336,14 +349,14 @@ func TestStateSwitch(t *testing.T) { StateId: rep.drAutoSync.StateID - 1, // mismatch state id })) cluster.PutRegion(region) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) region = region.Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ State: pb.RegionReplicationState_INTEGRITY_OVER_LABEL, StateId: rep.drAutoSync.StateID, })) cluster.PutRegion(region) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) assertStateIDUpdate() } @@ -357,37 +370,44 @@ func TestReplicateState(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) replicator := newMockReplicator([]uint64{1}) rep, err := NewReplicationModeManager(conf, store, cluster, replicator) re.NoError(err) + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) stateID := rep.drAutoSync.StateID // replicate after initialized + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) // repliate state to new member replicator.memberIDs = append(replicator.memberIDs, 2, 3) - rep.checkReplicateFile() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, 
stateID), replicator.lastData[3]) // inject error replicator.errors[2] = errors.New("failed to persist") - rep.tickDR() // switch async_wait since there is only one zone + rep.tickUpdateState() // switch async_wait since there is only one zone newStateID := rep.drAutoSync.StateID - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[1]) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1]) re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[3]) + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3]) // clear error, replicate to node 2 next time delete(replicator.errors, 2) - rep.checkReplicateFile() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[2]) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2]) } func TestAsynctimeout(t *testing.T) { @@ -399,11 +419,14 @@ func TestAsynctimeout(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) var replicator mockFileReplicator rep, err := NewReplicationModeManager(conf, store, cluster, &replicator) re.NoError(err) @@ -413,7 +436,7 @@ func TestAsynctimeout(t *testing.T) { cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone2"}) setStoreState(cluster, "up", "up", "down") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) } @@ -442,11 +465,14 @@ func TestRecoverProgress(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) cluster.AddLabelsStore(1, 1, map[string]string{}) rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) re.NoError(err) @@ -504,11 +530,14 @@ func TestRecoverProgressWithSplitAndMerge(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) cluster.AddLabelsStore(1, 1, map[string]string{}) rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) re.NoError(err) @@ -560,6 +589,157 @@ func TestRecoverProgressWithSplitAndMerge(t 
*testing.T) { re.Equal(float32(1.0), rep.estimateProgress()) } +func TestComplexPlacementRules(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 1}, + {key: "logic", value: "logic2", role: placement.Voter, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 1}, + {key: "logic", value: "logic4", role: placement.Voter, count: 1}, + {key: "logic", value: "logic5", role: placement.Voter, count: 1}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone1", "logic": "logic3"}) + cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone1", "logic": "logic3"}) + cluster.AddLabelsStore(7, 1, map[string]string{"zone": "zone2", "logic": "logic4"}) + cluster.AddLabelsStore(8, 1, map[string]string{"zone": "zone2", "logic": "logic4"}) + cluster.AddLabelsStore(9, 1, map[string]string{"zone": "zone2", "logic": "logic5"}) + cluster.AddLabelsStore(10, 1, map[string]string{"zone": "zone2", "logic": "logic5"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // down logic3 + logic5, can remain sync + setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up", "up", "down", "down") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // down 1 tikv from logic4 + 1 tikv from logic5, cannot sync + setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + + // reset to sync + setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // lost majority, down 1 tikv from logic2 + 1 tikv from logic3 + 1tikv from logic5, remain sync state + setStoreState(cluster, "up", "up", "up", "down", "up", "down", "up", "up", "up", "down") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) +} + +func TestComplexPlacementRules2(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: 
time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 2}, + {key: "logic", value: "logic2", role: placement.Voter, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 2}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + cluster.AddLabelsStore(7, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // down 1 from logic3, can remain sync + setStoreState(cluster, "up", "up", "up", "up", "up", "down", "up") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // down 1 from logic1, 1 from logic2, can remain sync + setStoreState(cluster, "up", "down", "up", "down", "up", "up", "up") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // down another from logic3, cannot sync + setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) +} + +func TestComplexPlacementRules3(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 2}, + {key: "logic", value: "logic2", role: placement.Learner, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 1}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // zone2 down, switch state, available stores should contain logic2 (learner) + setStoreState(cluster, "up", "up", "up", "up", "down") + 
rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) +} + func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { var regions []*core.RegionInfo for i := 1; i <= n; i++ { @@ -579,3 +759,27 @@ func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.Reg } return regions } + +type ruleConfig struct { + key string + value string + role placement.PeerRoleType + count int +} + +func genPlacementRuleConfig(rules []ruleConfig) []placement.GroupBundle { + group := placement.GroupBundle{ + ID: "group1", + } + for i, r := range rules { + group.Rules = append(group.Rules, &placement.Rule{ + ID: fmt.Sprintf("rule%d", i), + Role: r.role, + LabelConstraints: []placement.LabelConstraint{ + {Key: r.key, Op: placement.In, Values: []string{r.value}}, + }, + Count: r.count, + }) + } + return []placement.GroupBundle{group} +}
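
One more note, on the second patch in the series (token_bukets.go): a BurstLimit below zero (the tests use -1, treated as unlimited) must not let elapsed time keep piling tokens into the bucket; refilling and capping only apply when the burst limit is positive, which is why the new tests expect the token count to stay put across requests. The sketch below shows just that update rule with a throwaway bucket type — it is not the GroupTokenBucket API.

// Sketch only: refill applies solely when the burst limit is positive.
package main

import (
	"fmt"
	"time"
)

// bucket is a throwaway token bucket: fillRate tokens per second, capped at
// burst when burst > 0; burst <= 0 means "unlimited", i.e. no accumulation.
type bucket struct {
	tokens     float64
	fillRate   float64
	burst      float64
	lastUpdate time.Time
}

// update advances the bucket to now. Tokens are only accumulated (and capped)
// when the burst limit is positive; otherwise the balance is left untouched.
func (b *bucket) update(now time.Time) {
	if b.burst > 0 {
		if delta := now.Sub(b.lastUpdate); delta > 0 {
			b.tokens += b.fillRate * delta.Seconds()
		}
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
	}
	b.lastUpdate = now
}

func main() {
	start := time.Now()
	limited := &bucket{tokens: 0, fillRate: 2000, burst: 10000, lastUpdate: start}
	unlimited := &bucket{tokens: 200000, fillRate: 2000, burst: -1, lastUpdate: start}

	later := start.Add(10 * time.Second)
	limited.update(later)
	unlimited.update(later)
	// limited refills to its cap; unlimited keeps exactly what it had.
	fmt.Printf("limited=%.0f unlimited=%.0f\n", limited.tokens, unlimited.tokens)
}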