Skip to content

Commit

Permalink
operator: Merge operators canceled together (tikv#6673)
Browse files Browse the repository at this point in the history
close tikv#6607

cancel two merge operators together so that they won't occupy the merge-schedule-limit.

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Dec 1, 2023
1 parent 66b496a commit 5c43f8d
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -61,6 +62,8 @@ var (
ExceedStoreLimit CancelReasonType = "exceed store limit"
// ExceedWaitLimit is the cancel reason when the operator exceeds the waiting queue limit.
ExceedWaitLimit CancelReasonType = "exceed wait limit"
// RelatedMergeRegion is the cancel reason when the operator is cancelled by related merge region.
RelatedMergeRegion CancelReasonType = "related merge region"
// Unknown is the cancel reason when the operator is cancelled by an unknown reason.
Unknown CancelReasonType = "unknown"
)
Expand Down Expand Up @@ -115,6 +118,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
// Sync some attribute with the given timeout.
func (o *Operator) Sync(other *Operator) {
o.timeout = other.timeout
o.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(other.RegionID(), 10)
other.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(o.RegionID(), 10)
}

func (o *Operator) String() string {
Expand Down
16 changes: 16 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,11 +569,27 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool {
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
oc.ack(op)
if op.Kind()&OpMerge != 0 {
oc.removeRelatedMergeOperator(op)
}
return true
}
return false
}

func (oc *Controller) removeRelatedMergeOperator(op *Operator) {
relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedMergeRegion)], 10, 64)
if relatedOp := oc.operators[relatedID]; relatedOp != nil && relatedOp.Status() != CANCELED {
log.Info("operator canceled related merge region",
zap.Uint64("region-id", relatedOp.RegionID()),
zap.String("additional-info", relatedOp.GetAdditionalInfo()),
zap.Duration("takes", relatedOp.RunningTime()))
oc.removeOperatorLocked(relatedOp)
relatedOp.Cancel(RelatedMergeRegion)
oc.buryOperator(relatedOp)
}
}

func (oc *Controller) buryOperator(op *Operator) {
st := op.Status()

Expand Down
41 changes: 41 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2700,6 +2700,47 @@ func TestCheckerIsBusy(t *testing.T) {
checkRegionAndOperator(re, tc, co, num, 0)
}

func TestMergeRegionCancelOneOperator(t *testing.T) {
re := require.New(t)
tc, co, cleanup := prepare(nil, nil, nil, re)
defer cleanup()

source := core.NewRegionInfo(
&metapb.Region{
Id: 1,
StartKey: []byte(""),
EndKey: []byte("a"),
},
nil,
)
target := core.NewRegionInfo(
&metapb.Region{
Id: 2,
StartKey: []byte("a"),
EndKey: []byte("t"),
},
nil,
)
re.NoError(tc.putRegion(source))
re.NoError(tc.putRegion(target))

// Cancel source region.
ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge)
re.NoError(err)
re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...))
// Cancel source operator.
co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID()))
re.Len(co.GetOperatorController().GetOperators(), 0)

// Cancel target region.
ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge)
re.NoError(err)
re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...))
// Cancel target operator.
co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID()))
re.Len(co.GetOperatorController().GetOperators(), 0)
}

func TestReplica(t *testing.T) {
re := require.New(t)

Expand Down

0 comments on commit 5c43f8d

Please sign in to comment.