fix issue that store's liveness may be incorrectly marked as unreachable when the store restarts with labels changed (#1402) (#1417)

* fix issue that store's liveness may be incorrectly marked as unreachable when the store restarts with labels changed (#1407)

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>

* fix

Signed-off-by: crazycs520 <[email protected]>

* fix ci

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>
crazycs520 authored Aug 7, 2024
1 parent 2cd3a74 commit 73815ad
Showing 3 changed files with 63 additions and 16 deletions.
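In short, the fix changes `reResolve`: when a store comes back with a different address or labels, the replacement `Store` entry now inherits the old entry's liveness state, and if that state is not reachable it immediately gets its own `checkUntilHealth` loop, while the old entry's loop simply exits once the old entry is marked deleted. The sketch below is a minimal, self-contained illustration of that recovery path; the types and function names are simplified stand-ins, not the real client-go API.

```go
// Minimal sketch (simplified stand-in types, not the real client-go API) of the
// recovery path this commit adds: a re-resolved store with changed labels
// inherits the old liveness state and, when that state is not reachable,
// starts its own health-check loop so it can become reachable again.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type livenessState = uint32

const (
	reachable   livenessState = 0
	unreachable livenessState = 1
)

type store struct {
	id               uint64
	addr             string
	labels           map[string]string
	liveness         uint32 // read/written atomically
	unreachableSince time.Time
}

func (s *store) getLiveness() livenessState { return atomic.LoadUint32(&s.liveness) }

// checkUntilHealthy keeps probing until the store is reachable, then records it.
func (s *store) checkUntilHealthy(probe func(*store) livenessState, interval time.Duration, done chan<- struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if probe(s) == reachable {
			atomic.StoreUint32(&s.liveness, reachable)
			close(done)
			return
		}
	}
}

// reResolveWithNewLabels mirrors the fixed logic: build the replacement entry,
// copy the liveness state over, and only when it is not reachable keep
// unreachableSince and start a health-check loop for the new entry.
func reResolveWithNewLabels(old *store, newLabels map[string]string, probe func(*store) livenessState, done chan<- struct{}) *store {
	newStore := &store{id: old.id, addr: old.addr, labels: newLabels}
	atomic.StoreUint32(&newStore.liveness, atomic.LoadUint32(&old.liveness))
	if newStore.getLiveness() != reachable {
		newStore.unreachableSince = old.unreachableSince
		go newStore.checkUntilHealthy(probe, 10*time.Millisecond, done)
	}
	return newStore
}

func main() {
	old := &store{id: 1, addr: "127.0.0.1:20160", labels: map[string]string{"zone": "z1"}}
	atomic.StoreUint32(&old.liveness, unreachable)
	old.unreachableSince = time.Now()

	done := make(chan struct{})
	// The probe reports reachable right away; the real client would contact the store.
	replacement := reResolveWithNewLabels(old, map[string]string{"zone": "z1", "host": "h1"}, func(*store) livenessState { return reachable }, done)
	<-done
	fmt.Println("replacement store reachable:", replacement.getLiveness() == reachable) // true
}
```

The design point mirrored here is that responsibility for recovering liveness moves to the moment the replacement entry is created, rather than being handed off later from the old entry's health-check loop.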
integration_tests/pd_api_test.go (2 changes: 1 addition & 1 deletion)
@@ -212,7 +212,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
     // Try to get the minimum resolved timestamp of the cluster from PD.
     require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
     retryCount = 0
-    for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
+    for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == 0 {
         time.Sleep(100 * time.Millisecond)
         if retryCount > 5 {
             break
internal/locate/region_cache.go (35 changes: 20 additions & 15 deletions)
@@ -2692,14 +2692,25 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
     if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
         newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
         newStore.livenessState = atomic.LoadUint32(&s.livenessState)
-        newStore.unreachableSince = s.unreachableSince
+        if newStore.getLivenessState() != reachable {
+            newStore.unreachableSince = s.unreachableSince
+            go newStore.checkUntilHealth(c, newStore.getLivenessState(), storeReResolveInterval)
+        }
         c.storeMu.Lock()
         if s.addr == addr {
             newStore.slowScore = s.slowScore
         }
         c.storeMu.stores[newStore.storeID] = newStore
         c.storeMu.Unlock()
         s.setResolveState(deleted)
+        logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted",
+            zap.Uint64("store", s.storeID),
+            zap.String("old-addr", s.addr),
+            zap.Any("old-labels", s.labels),
+            zap.String("old-liveness", s.getLivenessState().String()),
+            zap.String("new-addr", newStore.addr),
+            zap.Any("new-labels", newStore.labels),
+            zap.String("new-liveness", newStore.getLivenessState().String()))
         return false, nil
     }
     s.changeResolveStateTo(needCheck, resolved)
@@ -2851,6 +2862,8 @@ func (s livenessState) String() string {
     }
 }
 
+var storeReResolveInterval = 30 * time.Second
+
 func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
     // This mechanism doesn't support non-TiKV stores currently.
     if s.storeType != tikvrpc.TiKV {
@@ -2862,7 +2875,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt
     // It may be already started by another thread.
     if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
         s.unreachableSince = time.Now()
-        reResolveInterval := 30 * time.Second
+        reResolveInterval := storeReResolveInterval
         if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
             if dur, err := time.ParseDuration(val.(string)); err == nil {
                 reResolveInterval = dur
@@ -2890,26 +2903,18 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol
         case <-c.ctx.Done():
             return
         case <-ticker.C:
+            if s.getResolveState() == deleted {
+                logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
+                return
+            }
             if time.Since(lastCheckPDTime) > reResolveInterval {
                 lastCheckPDTime = time.Now()
 
                 valid, err := s.reResolve(c)
                 if err != nil {
                     logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
                 } else if !valid {
-                    if s.getResolveState() == deleted {
-                        // if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
-                        c.storeMu.RLock()
-                        newStore := c.storeMu.stores[s.storeID]
-                        c.storeMu.RUnlock()
-                        logutil.BgLogger().Info("[health check] store meta changed",
-                            zap.Uint64("storeID", s.storeID),
-                            zap.String("oldAddr", s.addr),
-                            zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
-                            zap.String("newAddr", newStore.addr),
-                            zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
-                        go newStore.checkUntilHealth(c, liveness, reResolveInterval)
-                    }
+                    logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
                     return
                 }
             }
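One supporting detail in the hunks above: the 30-second re-resolve interval that used to be a literal inside `startHealthCheckLoopIfNeeded` is hoisted into the package-level `storeReResolveInterval` variable, so the new code path in `reResolve` can pass the same value to `checkUntilHealth`. Below is a tiny, hypothetical illustration of that pattern; the names are invented for the example, not taken from client-go.

```go
// Hypothetical sketch of hoisting a hard-coded interval into a package-level
// variable so multiple call sites share one default and tests can shorten it.
package cache

import "time"

// Shared default used by every caller; a test may temporarily assign a few
// milliseconds here to make polling loops finish quickly.
var reResolveInterval = 30 * time.Second

// dueForReResolve reports whether enough time has passed since the last
// re-resolve attempt to try again.
func dueForReResolve(last time.Time) bool {
	return time.Since(last) > reResolveInterval
}
```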
internal/locate/region_cache_test.go (42 changes: 42 additions & 0 deletions)
@@ -1982,3 +1982,45 @@ func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
     })
     s.Equal(scanRegionRes, regions)
 }
+
+func (s *testRegionCacheSuite) TestIssue1401() {
+    // init region cache
+    s.cache.LocateKey(s.bo, []byte("a"))
+
+    store1 := s.cache.getStoreByStoreID(s.store1)
+    s.Require().NotNil(store1)
+    s.Require().Equal(resolved, store1.getResolveState())
+    // change store1 label.
+    labels := store1.labels
+    labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"})
+    s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...)
+
+    // mark the store is unreachable and need check.
+    atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
+    store1.setResolveState(needCheck)
+
+    // setup mock liveness func
+    tf := func(s *Store, bo *retry.Backoffer) livenessState {
+        return reachable
+    }
+    s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
+
+    // start health check loop
+    go store1.checkUntilHealth(s.cache, unreachable, time.Second*30)
+
+    // mock asyncCheckAndResolveLoop worker to check and resolve store.
+    s.cache.checkAndResolve(nil, func(s *Store) bool {
+        return s.getResolveState() == needCheck
+    })
+
+    // assert that the old store should be deleted.
+    s.Eventually(func() bool {
+        return store1.getResolveState() == deleted
+    }, 3*time.Second, time.Second)
+    // assert the new store should be added and it should be resolved and reachable.
+    newStore1 := s.cache.getStoreByStoreID(s.store1)
+    s.Eventually(func() bool {
+        return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
+    }, 3*time.Second, time.Second)
+    s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
+}
