diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 349cb3847..cc5679059 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -212,7 +212,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { // Try to get the minimum resolved timestamp of the cluster from PD. require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 { + for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == 0 { time.Sleep(100 * time.Millisecond) if retryCount > 5 { break diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index ac2608c90..bea8bec11 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2692,7 +2692,10 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} newStore.livenessState = atomic.LoadUint32(&s.livenessState) - newStore.unreachableSince = s.unreachableSince + if newStore.getLivenessState() != reachable { + newStore.unreachableSince = s.unreachableSince + go newStore.checkUntilHealth(c, newStore.getLivenessState(), storeReResolveInterval) + } c.storeMu.Lock() if s.addr == addr { newStore.slowScore = s.slowScore @@ -2700,6 +2703,14 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { c.storeMu.stores[newStore.storeID] = newStore c.storeMu.Unlock() s.setResolveState(deleted) + logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted", + zap.Uint64("store", s.storeID), + zap.String("old-addr", s.addr), + zap.Any("old-labels", s.labels), + zap.String("old-liveness", s.getLivenessState().String()), + zap.String("new-addr", newStore.addr), + zap.Any("new-labels", newStore.labels), + zap.String("new-liveness", newStore.getLivenessState().String())) return false, nil } s.changeResolveStateTo(needCheck, resolved) @@ -2851,6 +2862,8 @@ func (s livenessState) String() string { } } +var storeReResolveInterval = 30 * time.Second + func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { @@ -2862,7 +2875,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt // It may be already started by another thread. if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) { s.unreachableSince = time.Now() - reResolveInterval := 30 * time.Second + reResolveInterval := storeReResolveInterval if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil { if dur, err := time.ParseDuration(val.(string)); err == nil { reResolveInterval = dur @@ -2890,6 +2903,10 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol case <-c.ctx.Done(): return case <-ticker.C: + if s.getResolveState() == deleted { + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) + return + } if time.Since(lastCheckPDTime) > reResolveInterval { lastCheckPDTime = time.Now() @@ -2897,19 +2914,7 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol if err != nil { logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) } else if !valid { - if s.getResolveState() == deleted { - // if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve). - c.storeMu.RLock() - newStore := c.storeMu.stores[s.storeID] - c.storeMu.RUnlock() - logutil.BgLogger().Info("[health check] store meta changed", - zap.Uint64("storeID", s.storeID), - zap.String("oldAddr", s.addr), - zap.String("oldLabels", fmt.Sprintf("%v", s.labels)), - zap.String("newAddr", newStore.addr), - zap.String("newLabels", fmt.Sprintf("%v", newStore.labels))) - go newStore.checkUntilHealth(c, liveness, reResolveInterval) - } + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) return } } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 4334ccfb3..15a1865fe 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1982,3 +1982,45 @@ func (s *testRegionCacheSuite) TestScanRegionsWithGaps() { }) s.Equal(scanRegionRes, regions) } + +func (s *testRegionCacheSuite) TestIssue1401() { + // init region cache + s.cache.LocateKey(s.bo, []byte("a")) + + store1 := s.cache.getStoreByStoreID(s.store1) + s.Require().NotNil(store1) + s.Require().Equal(resolved, store1.getResolveState()) + // change store1 label. + labels := store1.labels + labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"}) + s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...) + + // mark the store is unreachable and need check. + atomic.StoreUint32(&store1.livenessState, uint32(unreachable)) + store1.setResolveState(needCheck) + + // setup mock liveness func + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + + // start health check loop + go store1.checkUntilHealth(s.cache, unreachable, time.Second*30) + + // mock asyncCheckAndResolveLoop worker to check and resolve store. + s.cache.checkAndResolve(nil, func(s *Store) bool { + return s.getResolveState() == needCheck + }) + + // assert that the old store should be deleted. + s.Eventually(func() bool { + return store1.getResolveState() == deleted + }, 3*time.Second, time.Second) + // assert the new store should be added and it should be resolved and reachable. + newStore1 := s.cache.getStoreByStoreID(s.store1) + s.Eventually(func() bool { + return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable + }, 3*time.Second, time.Second) + s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161")) +}