Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#6874
Browse files Browse the repository at this point in the history
ref tikv#4399

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
disksing authored and ti-chi-bot committed Feb 18, 2024
1 parent 2a2b949 commit 5cc7fae
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 51 deletions.
106 changes: 85 additions & 21 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
<<<<<<< HEAD:server/replication/replication_mode.go

Check failure on line 32 in server/replication/replication_mode.go

View workflow job for this annotation

GitHub Actions / statics

missing import path
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/slice"
=======
sche "github.com/tikv/pd/pkg/schedule/core"
>>>>>>> d65d309b1 (dr-autosync: move state replicate to different goroutine (#6874)):pkg/replication/replication_mode.go
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
Expand Down Expand Up @@ -71,11 +75,19 @@ type ModeManager struct {
initTime time.Time

syncutil.RWMutex
<<<<<<< HEAD:server/replication/replication_mode.go
config config.ReplicationModeConfig
storage endpoint.ReplicationStatusStorage
cluster schedule.Cluster
fileReplicater FileReplicater
replicatedMembers []uint64
=======
config config.ReplicationModeConfig
storage endpoint.ReplicationStatusStorage
cluster sche.ClusterInformer
fileReplicater FileReplicater
replicateState sync.Map
>>>>>>> d65d309b1 (dr-autosync: move state replicate to different goroutine (#6874)):pkg/replication/replication_mode.go

drAutoSync drAutoSyncStatus
// intermediate states of the recovery process
Expand Down Expand Up @@ -241,7 +253,6 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error {
return err
}
dr := drAutoSyncStatus{State: drStateAsyncWait, StateID: id, AvailableStores: availableStores}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -264,7 +275,6 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
return err
}
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -288,7 +298,6 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
}
now := time.Now()
dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now}
m.drPersistStatusWithLock(dr)
if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -308,7 +317,6 @@ func (m *ModeManager) drSwitchToSync() error {
return err
}
dr := drAutoSyncStatus{State: drStateSync, StateID: id}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -318,6 +326,7 @@ func (m *ModeManager) drSwitchToSync() error {
return nil
}

<<<<<<< HEAD:server/replication/replication_mode.go
func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
defer cancel()
Expand Down Expand Up @@ -362,15 +371,18 @@ func (m *ModeManager) drPersistStatus() {
m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID})
}

=======
>>>>>>> d65d309b1 (dr-autosync: move state replicate to different goroutine (#6874)):pkg/replication/replication_mode.go
func (m *ModeManager) drGetState() string {
m.RLock()
defer m.RUnlock()
return m.drAutoSync.State
}

const (
idleTimeout = time.Minute
tickInterval = 500 * time.Millisecond
idleTimeout = time.Minute
tickInterval = 500 * time.Millisecond
replicateStateInterval = time.Second * 5
)

// Run starts the background job.
Expand All @@ -381,17 +393,46 @@ func (m *ModeManager) Run(ctx context.Context) {
case <-ctx.Done():
return
}
<<<<<<< HEAD:server/replication/replication_mode.go
for {
select {
case <-time.After(tickInterval):
case <-ctx.Done():
return
=======

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
for {
select {
case <-time.After(tickInterval):
case <-ctx.Done():
return
}
m.tickUpdateState()
>>>>>>> d65d309b1 (dr-autosync: move state replicate to different goroutine (#6874)):pkg/replication/replication_mode.go
}
m.tickDR()
}
}()

go func() {
defer wg.Done()
for {
select {
case <-time.After(replicateStateInterval):
case <-ctx.Done():
return
}
m.tickReplicateStatus()
}
}()

wg.Wait()
}

func (m *ModeManager) tickDR() {
func (m *ModeManager) tickUpdateState() {
if m.getModeName() != modeDRAutoSync {
return
}
Expand Down Expand Up @@ -484,8 +525,42 @@ func (m *ModeManager) tickDR() {
}
}
}
}

func (m *ModeManager) tickReplicateStatus() {
if m.getModeName() != modeDRAutoSync {
return
}

m.RLock()
state := drAutoSyncStatus{
State: m.drAutoSync.State,
StateID: m.drAutoSync.StateID,
AvailableStores: m.drAutoSync.AvailableStores,
RecoverStartTime: m.drAutoSync.RecoverStartTime,
}
m.RUnlock()

data, _ := json.Marshal(state)

m.checkReplicateFile()
members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}
for _, member := range members {
stateID, ok := m.replicateState.Load(member.GetMemberId())
if !ok || stateID.(uint64) != state.StateID {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data)
if err != nil {
log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", state.State), errs.ZapError(err))
} else {
m.replicateState.Store(member.GetMemberId(), state.StateID)
}
cancel()
}
}
}

const (
Expand Down Expand Up @@ -557,17 +632,6 @@ func (m *ModeManager) drCheckStoreStateUpdated(stores []uint64) bool {
return true
}

func (m *ModeManager) checkReplicateFile() {
members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}
if m.drCheckNeedPersistStatus(members) {
m.drPersistStatus()
}
}

var (
regionScanBatchSize = 1024
regionMinSampleSize = 512
Expand Down
Loading

0 comments on commit 5cc7fae

Please sign in to comment.