Skip to content

Commit

Permalink
feat: support cancellable sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
WqyJh committed Dec 28, 2023
1 parent 2655cd6 commit c3ad74e
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 5 deletions.
33 changes: 31 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,25 @@ func (c *Client) weakFetchBatch(ctx context.Context, keys []string, expire time.
go func(i int) {
defer wg.Done()
r, err := c.luaGet(ctx, keys[i], owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[0] == nil && r[1].(string) != locked {
debugf("batch weak: empty result for %s locked by other, so sleep %s", keys[i], c.Options.LockSleep.String())
time.Sleep(c.Options.LockSleep)
select {
case <-ctx.Done():
ch <- pair{idx: i, err: ctx.Err()}
return

Check warning on line 202 in batch.go

View check run for this annotation

Codecov / codecov/patch

batch.go#L200-L202

Added lines #L200 - L202 were not covered by tests
case <-ticker.C:
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, keys[i], owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C

Check warning on line 211 in batch.go

View check run for this annotation

Codecov / codecov/patch

batch.go#L211

Added line #L211 was not covered by tests
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
ch <- pair{idx: i, data: "", err: err}
Expand Down Expand Up @@ -302,10 +317,24 @@ func (c *Client) strongFetchBatch(ctx context.Context, keys []string, expire tim
go func(i int) {
defer wg.Done()
r, err := c.luaGet(ctx, keys[i], owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[1] != nil && r[1] != locked { // locked by other
debugf("batch: locked by other, so sleep %s", c.Options.LockSleep)
time.Sleep(c.Options.LockSleep)
select {
case <-ctx.Done():
ch <- pair{idx: i, err: ctx.Err()}

Check warning on line 326 in batch.go

View check run for this annotation

Codecov / codecov/patch

batch.go#L325-L326

Added lines #L325 - L326 were not covered by tests
case <-ticker.C:
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, keys[i], owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C

Check warning on line 335 in batch.go

View check run for this annotation

Codecov / codecov/patch

batch.go#L335

Added line #L335 was not covered by tests
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
ch <- pair{idx: i, data: "", err: err}
Expand Down
35 changes: 33 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rockscache
import (
"context"
"fmt"
"log"
"math"
"math/rand"
"time"
Expand Down Expand Up @@ -193,10 +194,25 @@ func (c *Client) weakFetch(ctx context.Context, key string, expire time.Duration
debugf("weakFetch: key=%s", key)
owner := shortuuid.New()
r, err := c.luaGet(ctx, key, owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[0] == nil && r[1].(string) != locked {
debugf("empty result for %s locked by other, so sleep %s", key, c.Options.LockSleep.String())
time.Sleep(c.Options.LockSleep)
select {
case <-ctx.Done():
return "", ctx.Err()

Check warning on line 203 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L202-L203

Added lines #L202 - L203 were not covered by tests
case <-ticker.C:
log.Printf("ticker")
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, key, owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C

Check warning on line 213 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L213

Added line #L213 was not covered by tests
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
return "", err
Expand All @@ -217,10 +233,25 @@ func (c *Client) strongFetch(ctx context.Context, key string, expire time.Durati
debugf("strongFetch: key=%s", key)
owner := shortuuid.New()
r, err := c.luaGet(ctx, key, owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[1] != nil && r[1] != locked { // locked by other
debugf("locked by other, so sleep %s", c.Options.LockSleep)
time.Sleep(c.Options.LockSleep)
select {
case <-ctx.Done():
return "", ctx.Err()

Check warning on line 242 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L241-L242

Added lines #L241 - L242 were not covered by tests
case <-ticker.C:
log.Printf("ticker")
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, key, owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C

Check warning on line 252 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L252

Added line #L252 was not covered by tests
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
github.com/lithammer/shortuuid v3.0.0+incompatible
github.com/redis/go-redis/v9 v9.0.3
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.1.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down

0 comments on commit c3ad74e

Please sign in to comment.