diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index ab956d79c9e..9054983128b 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "reflect" + "strconv" "strings" "sync/atomic" "time" @@ -61,6 +62,8 @@ var ( ExceedStoreLimit CancelReasonType = "exceed store limit" // ExceedWaitLimit is the cancel reason when the operator exceeds the waiting queue limit. ExceedWaitLimit CancelReasonType = "exceed wait limit" + // RelatedMergeRegion is the cancel reason when the operator is cancelled by related merge region. + RelatedMergeRegion CancelReasonType = "related merge region" // Unknown is the cancel reason when the operator is cancelled by an unknown reason. Unknown CancelReasonType = "unknown" ) @@ -115,6 +118,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region // Sync some attribute with the given timeout. func (o *Operator) Sync(other *Operator) { o.timeout = other.timeout + o.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(other.RegionID(), 10) + other.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(o.RegionID(), 10) } func (o *Operator) String() string { diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index d6b94d2996f..5581ecdd2d4 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -569,11 +569,27 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool { oc.updateCounts(oc.operators) operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() oc.ack(op) + if op.Kind()&OpMerge != 0 { + oc.removeRelatedMergeOperator(op) + } return true } return false } +func (oc *Controller) removeRelatedMergeOperator(op *Operator) { + relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedMergeRegion)], 10, 64) + if relatedOp := oc.operators[relatedID]; relatedOp != nil && relatedOp.Status() != CANCELED { + log.Info("operator canceled related merge region", + zap.Uint64("region-id", relatedOp.RegionID()), + zap.String("additional-info", relatedOp.GetAdditionalInfo()), + zap.Duration("takes", relatedOp.RunningTime())) + oc.removeOperatorLocked(relatedOp) + relatedOp.Cancel(RelatedMergeRegion) + oc.buryOperator(relatedOp) + } +} + func (oc *Controller) buryOperator(op *Operator) { st := op.Status() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 1a5b64906bf..6271b17d462 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2700,6 +2700,47 @@ func TestCheckerIsBusy(t *testing.T) { checkRegionAndOperator(re, tc, co, num, 0) } +func TestMergeRegionCancelOneOperator(t *testing.T) { + re := require.New(t) + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + source := core.NewRegionInfo( + &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + }, + nil, + ) + target := core.NewRegionInfo( + &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("t"), + }, + nil, + ) + re.NoError(tc.putRegion(source)) + re.NoError(tc.putRegion(target)) + + // Cancel source region. + ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel source operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) + + // Cancel target region. + ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel target operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) +} + func TestReplica(t *testing.T) { re := require.New(t)