Skip to content

Commit

Permalink
operator: make additional information thread safe (#8104) (#8111)
Browse files Browse the repository at this point in the history
close #8079

Signed-off-by: husharp <[email protected]>

Co-authored-by: husharp <[email protected]>
  • Loading branch information
ti-chi-bot and HuSharp authored Apr 22, 2024
1 parent 3ec92bd commit 4e13bfc
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 48 deletions.
4 changes: 2 additions & 2 deletions pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind
brief += fmt.Sprintf(" and keys %v", hexKeys)
}
op := NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, region.GetApproximateSize(), step)
op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(logutil.RedactBytes(region.GetStartKey()))
op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(logutil.RedactBytes(region.GetEndKey()))
op.SetAdditionalInfo("region-start-key", core.HexRegionKeyStr(logutil.RedactBytes(region.GetStartKey())))
op.SetAdditionalInfo("region-end-key", core.HexRegionKeyStr(logutil.RedactBytes(region.GetEndKey())))
return op, nil
}

Expand Down
46 changes: 19 additions & 27 deletions pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package operator

import (
"encoding/json"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -83,7 +82,7 @@ type Operator struct {
level constant.PriorityLevel
Counters []prometheus.Counter
FinishedCounters []prometheus.Counter
AdditionalInfos map[string]string
additionalInfos opAdditionalInfo
ApproximateSize int64
timeout time.Duration
influence *OpInfluence
Expand All @@ -100,16 +99,18 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
maxDuration += v.Timeout(approximateSize).Seconds()
}
return &Operator{
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
stepsTime: make([]int64, len(steps)),
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
stepsTime: make([]int64, len(steps)),
status: NewOpStatusTracker(),
level: level,
additionalInfos: opAdditionalInfo{
value: make(map[string]string),
},
ApproximateSize: approximateSize,
timeout: time.Duration(maxDuration) * time.Second,
}
Expand All @@ -118,8 +119,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
// Sync some attribute with the given timeout.
func (o *Operator) Sync(other *Operator) {
o.timeout = other.timeout
o.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(other.RegionID(), 10)
other.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(o.RegionID(), 10)
o.SetAdditionalInfo(string(RelatedMergeRegion), strconv.FormatUint(other.RegionID(), 10))
other.SetAdditionalInfo(string(RelatedMergeRegion), strconv.FormatUint(o.RegionID(), 10))
}

func (o *Operator) String() string {
Expand Down Expand Up @@ -297,8 +298,10 @@ func (o *Operator) CheckSuccess() bool {

// Cancel marks the operator canceled.
func (o *Operator) Cancel(reason ...CancelReasonType) bool {
if _, ok := o.AdditionalInfos[cancelReason]; !ok && len(reason) != 0 {
o.AdditionalInfos[cancelReason] = string(reason[0])
o.additionalInfos.Lock()
defer o.additionalInfos.Unlock()
if _, ok := o.additionalInfos.value[cancelReason]; !ok && len(reason) != 0 {
o.additionalInfos.value[cancelReason] = string(reason[0])
}
return o.status.To(CANCELED)
}
Expand Down Expand Up @@ -507,17 +510,6 @@ func (o *Operator) Record(finishTime time.Time) *OpRecord {
return record
}

// GetAdditionalInfo returns additional info with string
func (o *Operator) GetAdditionalInfo() string {
if len(o.AdditionalInfos) != 0 {
additionalInfo, err := json.Marshal(o.AdditionalInfos)
if err == nil {
return string(additionalInfo)
}
}
return ""
}

// IsLeaveJointStateOperator returns true if the desc is OpDescLeaveJointState.
func (o *Operator) IsLeaveJointStateOperator() bool {
return strings.EqualFold(o.desc, OpDescLeaveJointState)
Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (oc *Controller) addOperatorInner(op *Operator) bool {
log.Info("add operator",
zap.Uint64("region-id", regionID),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))

// If there is an old operator, replace it. The priority should be checked
// already.
Expand Down Expand Up @@ -657,7 +657,7 @@ func (oc *Controller) removeOperatorInner(op *Operator) bool {
}

func (oc *Controller) removeRelatedMergeOperator(op *Operator) {
relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedMergeRegion)], 10, 64)
relatedID, _ := strconv.ParseUint(op.GetAdditionalInfo(string(RelatedMergeRegion)), 10, 64)
relatedOpi, ok := oc.operators.Load(relatedID)
if !ok {
return
Expand All @@ -666,7 +666,7 @@ func (oc *Controller) removeRelatedMergeOperator(op *Operator) {
if relatedOp != nil && relatedOp.Status() != CANCELED {
log.Info("operator canceled related merge region",
zap.Uint64("region-id", relatedOp.RegionID()),
zap.String("additional-info", relatedOp.GetAdditionalInfo()),
zap.String("additional-info", relatedOp.LogAdditionalInfo()),
zap.Duration("takes", relatedOp.RunningTime()))
oc.removeOperatorInner(relatedOp)
relatedOp.Cancel(RelatedMergeRegion)
Expand Down Expand Up @@ -695,7 +695,7 @@ func (oc *Controller) buryOperator(op *Operator) {
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
for _, counter := range op.FinishedCounters {
Expand All @@ -706,7 +706,7 @@ func (oc *Controller) buryOperator(op *Operator) {
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "replace").Inc()
case EXPIRED:
log.Info("operator expired",
Expand All @@ -719,14 +719,14 @@ func (oc *Controller) buryOperator(op *Operator) {
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
case CANCELED:
log.Info("operator canceled",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()),
zap.String("additional-info", op.LogAdditionalInfo()),
)
operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/schedule/operator/status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package operator

import (
"encoding/json"
"time"

"github.com/tikv/pd/pkg/utils/syncutil"
Expand Down Expand Up @@ -136,3 +137,35 @@ func (trk *OpStatusTracker) String() string {
defer trk.rw.RUnlock()
return OpStatusToString(trk.current)
}

type opAdditionalInfo struct {
syncutil.RWMutex
value map[string]string
}

// SetAdditionalInfo sets additional info with key and value.
func (o *Operator) SetAdditionalInfo(key string, value string) {
o.additionalInfos.Lock()
defer o.additionalInfos.Unlock()
o.additionalInfos.value[key] = value
}

// GetAdditionalInfo returns additional info with key.
func (o *Operator) GetAdditionalInfo(key string) string {
o.additionalInfos.RLock()
defer o.additionalInfos.RUnlock()
return o.additionalInfos.value[key]
}

// LogAdditionalInfo returns additional info with string
func (o *Operator) LogAdditionalInfo() string {
o.additionalInfos.RLock()
defer o.additionalInfos.RUnlock()
if len(o.additionalInfos.value) != 0 {
additionalInfo, err := json.Marshal(o.additionalInfos.value)
if err == nil {
return string(additionalInfo)
}
}
return ""
}
25 changes: 25 additions & 0 deletions pkg/schedule/operator/status_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package operator

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -178,3 +180,26 @@ func checkReachTime(re *require.Assertions, trk *OpStatusTracker, reached ...OpS
re.True(trk.ReachTimeOf(st).IsZero())
}
}

func TestAdditionalInfoConcurrent(t *testing.T) {
op := NewOperator("test", "test", 0, nil, OpAdmin, 0)

var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i)
value := fmt.Sprintf("value%d", i)
op.SetAdditionalInfo(key, value)
if op.GetAdditionalInfo(key) != value {
t.Errorf("unexpected value for key %s", key)
}
}(i)
}
wg.Wait()

if logInfo := op.LogAdditionalInfo(); logInfo == "" {
t.Error("LogAdditionalInfo returned an empty string")
}
}
4 changes: 2 additions & 2 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
if op != nil {
scatterSuccessCounter.Inc()
r.Put(targetPeers, targetLeader, group)
op.AdditionalInfos["group"] = group
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
op.SetAdditionalInfo("group", group)
op.SetAdditionalInfo("leader-picked-count", strconv.FormatUint(leaderStorePickedCount, 10))
op.SetPriorityLevel(constant.High)
}
return op, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/scatter/region_scatterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
re.NoError(err)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.AdditionalInfos["group"])
re.Equal(group, op.GetAdditionalInfo("group"))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
op.FinishedCounters = append(op.FinishedCounters,
balanceDirectionCounter.WithLabelValues(l.GetName(), solver.SourceMetricLabel(), solver.TargetMetricLabel()),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64))
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
op.FinishedCounters = append(op.FinishedCounters,
balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64))
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan
b.counter.WithLabelValues("move-witness", solver.SourceMetricLabel()+"-out"),
b.counter.WithLabelValues("move-witness", solver.TargetMetricLabel()+"-in"),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64))
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1661,8 +1661,8 @@ func (bs *balanceSolver) splitBucketsByLoad(region *core.RegionInfo, bucketStats
}
op := bs.splitBucketsOperator(region, [][]byte{splitKey})
if op != nil {
op.AdditionalInfos["accLoads"] = strconv.FormatUint(acc-stats[splitIdx-1].Loads[dim], 10)
op.AdditionalInfos["totalLoads"] = strconv.FormatUint(totalLoads, 10)
op.SetAdditionalInfo("accLoads", strconv.FormatUint(acc-stats[splitIdx-1].Loads[dim], 10))
op.SetAdditionalInfo("totalLoads", strconv.FormatUint(totalLoads, 10))
}
return op
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op
return nil
}
splitBucketNewOperatorCounter.Inc()
op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10)
op.SetAdditionalInfo("hot-degree", strconv.FormatInt(int64(splitBucket.HotDegree), 10))
return []*operator.Operator{op}
}
return nil
Expand Down

0 comments on commit 4e13bfc

Please sign in to comment.