Skip to content

Commit

Permalink
refactor ignore lock impl
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 20, 2024
1 parent 195b249 commit 2705d84
Show file tree
Hide file tree
Showing 13 changed files with 33 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, typ string,
_ = c.pool.Invoke(func() {
defer wg.Done()
var message []*bytes.Buffer
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
err := c.withWorkloadLocked(ctx, ID, false, func(ctx context.Context, workload *types.Workload) error {
var err error
switch typ {
case cluster.WorkloadStop:
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func TestCreateWorkloadIngorePullTxn(t *testing.T) {
engine.AssertExpectations(t)
}

func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
func newCreateWorkloadCluster(_ *testing.T) (*Calcium, []*types.Node) {
c := NewTestCluster()

engine := &enginemocks.API{}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
if err := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs { //nolint:scopelint
msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} //nolint:scopelint
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
if err := c.withWorkloadLocked(ctx, workloadID, false, func(ctx context.Context, workload *types.Workload) error {
return utils.Txn(
ctx,
// if
Expand Down
18 changes: 10 additions & 8 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.Distrib
}
}

func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, f func(context.Context, *types.Workload) error) error {
return c.withWorkloadsLocked(ctx, []string{ID}, func(ctx context.Context, workloads map[string]*types.Workload) error {
func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, ignoreLock bool, f func(context.Context, *types.Workload) error) error {
return c.withWorkloadsLocked(ctx, ignoreLock, []string{ID}, func(ctx context.Context, workloads map[string]*types.Workload) error {
if c, ok := workloads[ID]; ok {
return f(ctx, c)
}
return types.ErrWorkloadNotExists
})
}

func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(context.Context, map[string]*types.Workload) error) error {
func (c *Calcium) withWorkloadsLocked(ctx context.Context, ignoreLock bool, IDs []string, f func(context.Context, map[string]*types.Workload) error) error {
workloads := map[string]*types.Workload{}
locks := map[string]lock.DistributedLock{}
logger := log.WithFunc("calcium.withWorkloadsLocked")
Expand All @@ -83,12 +83,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(
}
var lock lock.DistributedLock
for _, workload := range cs {
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.WorkloadLock, workload.ID), c.config.LockTimeout)
if err != nil {
return err
if !ignoreLock {
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.WorkloadLock, workload.ID), c.config.LockTimeout)
if err != nil {
return err
}
logger.Debugf(ctx, "Workload %s locked", workload.ID)
locks[workload.ID] = lock
}
logger.Debugf(ctx, "Workload %s locked", workload.ID)
locks[workload.ID] = lock
workloads[workload.ID] = workload
}
return f(ctx, workloads)
Expand Down
12 changes: 6 additions & 6 deletions cluster/calcium/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func TestWithWorkloadsLocked(t *testing.T) {
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.Background(), types.ErrMockError).Once()
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{{}}, nil).Once()
err := c.withWorkloadsLocked(ctx, []string{"c1", "c2"}, func(ctx context.Context, workloads map[string]*types.Workload) error { return nil })
err := c.withWorkloadsLocked(ctx, false, []string{"c1", "c2"}, func(ctx context.Context, workloads map[string]*types.Workload) error { return nil })
assert.Error(t, err)
// success
lock.On("Lock", mock.Anything).Return(ctx, nil)
// failed by getworkload
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrMockError).Once()
err = c.withWorkloadsLocked(ctx, []string{"c1", "c2"}, func(ctx context.Context, workloads map[string]*types.Workload) error { return nil })
err = c.withWorkloadsLocked(ctx, false, []string{"c1", "c2"}, func(ctx context.Context, workloads map[string]*types.Workload) error { return nil })
assert.Error(t, err)
engine := &enginemocks.API{}
workload := &types.Workload{
Expand All @@ -74,7 +74,7 @@ func TestWithWorkloadsLocked(t *testing.T) {
}
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
// success
err = c.withWorkloadsLocked(ctx, []string{"c1", "c1"}, func(ctx context.Context, workloads map[string]*types.Workload) error {
err = c.withWorkloadsLocked(ctx, false, []string{"c1", "c1"}, func(ctx context.Context, workloads map[string]*types.Workload) error {
assert.Len(t, workloads, 1)
return nil
})
Expand All @@ -92,13 +92,13 @@ func TestWithWorkloadLocked(t *testing.T) {
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.Background(), types.ErrMockError).Once()
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{{}}, nil).Once()
err := c.withWorkloadLocked(ctx, "c1", func(ctx context.Context, workload *types.Workload) error { return nil })
err := c.withWorkloadLocked(ctx, "c1", false, func(ctx context.Context, workload *types.Workload) error { return nil })
assert.Error(t, err)
// success
lock.On("Lock", mock.Anything).Return(ctx, nil)
// failed by getworkload
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrMockError).Once()
err = c.withWorkloadLocked(ctx, "c1", func(ctx context.Context, workload *types.Workload) error { return nil })
err = c.withWorkloadLocked(ctx, "c1", false, func(ctx context.Context, workload *types.Workload) error { return nil })
assert.Error(t, err)
engine := &enginemocks.API{}
workload := &types.Workload{
Expand All @@ -107,7 +107,7 @@ func TestWithWorkloadLocked(t *testing.T) {
}
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
// success
err = c.withWorkloadLocked(ctx, "c1", func(ctx context.Context, workload *types.Workload) error {
err = c.withWorkloadLocked(ctx, "c1", false, func(ctx context.Context, workload *types.Workload) error {
assert.Equal(t, workload.ID, "c1")
return nil
})
Expand Down
16 changes: 3 additions & 13 deletions cluster/calcium/raw_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,10 @@ func (c *Calcium) RawEngine(ctx context.Context, opts *types.RawEngineOptions) (
wg.Add(1)
_ = c.pool.Invoke(func() {
defer wg.Done()
if opts.IgnoreLock {
var workload *types.Workload
if workload, err = c.store.GetWorkload(ctx, ID); err != nil {
return
}
if err = c.withWorkloadLocked(ctx, ID, opts.IgnoreLock, func(ctx context.Context, workload *types.Workload) error {
msg, err = workload.RawEngine(ctx, opts)
} else {
err = c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
msg, err = workload.RawEngine(ctx, opts)
return err
})
}

if err == nil {
return err
}); err == nil {
logger.Infof(ctx, "Workload %s", ID)
logger.Infof(ctx, "%+v", msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
// copy origin workload
originWorkload := *workload
return c.withNodePodLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
return c.withWorkloadLocked(ctx, opts.ID, false, func(ctx context.Context, workload *types.Workload) error {
err := c.doReallocOnNode(ctx, node, workload, originWorkload, opts)
logger.Error(ctx, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool)
if err := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs {
ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
if err := c.withWorkloadLocked(ctx, workloadID, false, func(ctx context.Context, workload *types.Workload) error {
return utils.Txn(
ctx,
// if
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *Calcium) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOption
var createMessage *types.CreateWorkloadMessage
removeMessage := &types.RemoveWorkloadMessage{WorkloadID: ID}
var err error
if err = c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
if err = c.withWorkloadLocked(ctx, ID, false, func(ctx context.Context, workload *types.Workload) error {
if opts.Podname != "" && workload.Podname != opts.Podname {
logger.Warnf(ctx, "Skip not in pod workload %s", workload.ID)
return errors.Wrapf(types.ErrWorkloadIgnored, "workload %s not in pod %s", workload.ID, opts.Podname)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
_ = c.pool.Invoke(func(ID string) func() {
return func() {
defer wg.Done()
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
if err := c.withWorkloadLocked(ctx, ID, false, func(ctx context.Context, workload *types.Workload) error {
for _, file := range opts.Files {
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, file)
logger.Error(ctx, err)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/sendlarge.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *Calcium) newWorkloadSender(ctx context.Context, ID string, resp chan *t
utils.SentryGo(func(ID, name string, size int64, content io.Reader, uid, gid int, mode int64) func() {
return func() {
defer wg.Done()
if err := sender.calcium.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
if err := sender.calcium.withWorkloadLocked(ctx, ID, false, func(ctx context.Context, workload *types.Workload) error {
err := errors.WithStack(workload.Engine.VirtualizationCopyChunkTo(ctx, ID, name, size, content, uid, gid, mode))
resp <- &types.SendMessage{ID: ID, Path: name, Error: err}
return nil
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
golang.org/x/net v0.19.0
golang.org/x/sync v0.4.0
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.33.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

Expand Down Expand Up @@ -79,7 +79,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/gomodule/redigo v1.7.1-0.20190724094224-574c33c3df38/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
Expand Down Expand Up @@ -744,8 +744,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down

0 comments on commit 2705d84

Please sign in to comment.