dynamolock/v2: introduce individual timeouts to automatic heartbeats
While investigating #446, the issue reporters uncovered that heartbeats
were still using context.Background(). When the public interface was
refactored to use contexts (with the upgrade to AWS Go SDK v2), this
particular part of the code was left untouched.

In the same issue, the reporter claims that SessionMonitors are flaky.
I could not reproduce the problem in my own environment, but as a
policy I assumed the report was accurate.

If this problem does happen, one likely source of the misbehavior is
that heartbeats take too long to execute, holding the lock longer than
they should and derailing the check inside the SessionMonitor code. By
the time that check finally came around, the lock would already be
lost and the session monitor callback would never be called.

This commit attempts to address this by introducing an individual
timeout for each SendHeartbeat call. While iterating on this change, I
could see that the heartbeat loop over all locks could take very long
to run. In the past it was possible to get away with that because
every lock would eventually get its turn; with this change, the loop
runs on a time budget and each heartbeat has to complete as quickly as
possible within c.heartbeatPeriod.
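
For illustration only, the essence of the per-call timeout is the
standard context.WithTimeout pattern. This is a standalone sketch, not
the library's code; sendWithBudget and slowSend are made-up names:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithBudget gives a single heartbeat attempt its own deadline so
// one slow call cannot eat the whole period and starve the other locks.
func sendWithBudget(rootCtx context.Context, budget time.Duration, send func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(rootCtx, budget)
	defer cancel()
	return send(ctx)
}

func main() {
	// slowSend pretends the server is unreachable and only gives up
	// when the per-call context expires.
	slowSend := func(ctx context.Context) error {
		select {
		case <-time.After(5 * time.Second):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	err := sendWithBudget(context.Background(), time.Second, slowSend)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // prints: true
}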

This commit also adds test cases that attempt to replicate the network
connectivity issue described in #446 (see TestSessionMonitorMissedCall).
ucirello committed Mar 30, 2024
1 parent bc78634 commit 277066f
Showing 8 changed files with 275 additions and 40 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/go.yml
@@ -13,10 +13,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout code
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
       - name: Setup Go
-        uses: actions/setup-go@v4
+        uses: actions/setup-go@v5
         with:
           go-version: "stable"
 
@@ -30,3 +30,8 @@ jobs:
         working-directory: v2/
         run: |
           make test
+      - name: Linters
+        working-directory: v2/
+        run: |
+          make linters
4 changes: 2 additions & 2 deletions v2/Makefile
@@ -30,8 +30,8 @@ local-dynamodb:
 	wget -O local-dynamodb/latest.zip https://s3.us-west-2.amazonaws.com/dynamodb-local/dynamodb_local_latest.zip
 	(cd local-dynamodb; unzip latest.zip)
 
-test: linters local-dynamodb
+test: local-dynamodb
 	GOEXPERIMENT=loopvar go test -v
 
 test-race:
-	GOEXPERIMENT=loopvar go test -race -count=1000
+	GOEXPERIMENT=loopvar go test -race -count=1000
68 changes: 52 additions & 16 deletions v2/client.go
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"log"
"runtime"
"sync"
"time"

@@ -276,6 +277,12 @@ func WithAdditionalAttributes(attr map[string]types.AttributeValue) AcquireLockO
 // Consider an example which uses this mechanism for leader election. One
 // way to make use of this SessionMonitor is to register a callback that
 // kills the instance in case the leader's lock enters the danger zone.
+//
+// The SessionMonitor will not trigger if by the time of its evaluation, the
+// lock is already expired. Therefore, you have to tune the lease, the
+// heartbeat, and the safe time to reduce the likelihood that the lock will be
+// lost at the same time in which the session monitor would be evaluated. A good
+// rule of thumb is to have safeTime to be leaseDuration-(3*heartbeatPeriod).
 func WithSessionMonitor(safeTime time.Duration, callback func()) AcquireLockOption {
 	return func(opt *acquireLockOptions) {
 		opt.sessionMonitor = &sessionMonitor{
@@ -673,22 +680,50 @@ func randString() string {
 	}
 	return base32Encoder.EncodeToString(randomBytes)
 }
 
-func (c *Client) heartbeat(ctx context.Context) {
-	c.logger.Println(ctx, "starting heartbeats")
+func (c *Client) heartbeat(rootCtx context.Context) {
+	c.logger.Println(rootCtx, "heartbeats starting")
+	defer c.logger.Println(rootCtx, "heartbeats done")
 	tick := time.NewTicker(c.heartbeatPeriod)
 	defer tick.Stop()
-	for range tick.C {
+	for {
+		select {
+		case <-rootCtx.Done():
+			c.logger.Println(rootCtx, "client closed, stopping heartbeat")
+			return
+		case t := <-tick.C:
+			c.logger.Println(rootCtx, "heartbeat at:", t)
+		}
+		var (
+			wg        sync.WaitGroup
+			maxProcs  = runtime.GOMAXPROCS(0)
+			lockItems = make(chan *Lock, maxProcs)
+		)
+		c.logger.Println(rootCtx, "heartbeat concurrency level:", maxProcs)
+		for i := 0; i < maxProcs; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for lockItem := range lockItems {
+					c.heartbeatLock(rootCtx, lockItem)
+				}
+			}()
+		}
 		c.locks.Range(func(_ string, lockItem *Lock) bool {
-			if err := c.SendHeartbeat(lockItem); err != nil {
-				c.logger.Println(ctx, "error sending heartbeat to", lockItem.partitionKey, ":", err)
-			}
+			lockItems <- lockItem
 			return true
 		})
-		if ctx.Err() != nil {
-			c.logger.Println(ctx, "client closed, stopping heartbeat")
-			return
-		}
+		close(lockItems)
+		c.logger.Println(rootCtx, "all heartbeats are dispatched")
+		wg.Wait()
+		c.logger.Println(rootCtx, "all heartbeats are processed")
 	}
 }
 
+func (c *Client) heartbeatLock(rootCtx context.Context, lockItem *Lock) {
+	ctx, cancel := context.WithTimeout(rootCtx, c.heartbeatPeriod)
+	defer cancel()
+	if err := c.SendHeartbeatWithContext(ctx, lockItem); err != nil {
+		c.logger.Println(ctx, "error sending heartbeat to", lockItem.partitionKey, ":", err)
+	}
+}

@@ -1030,18 +1065,19 @@ func (c *Client) removeKillSessionMonitor(monitorName string) {
 	cancel()
 }
 
-func (c *Client) lockSessionMonitorChecker(ctx context.Context,
-	monitorName string, lock *Lock) {
+func (c *Client) lockSessionMonitorChecker(ctx context.Context, monitorName string, lock *Lock) {
 	go func() {
 		defer c.sessionMonitorCancellations.Delete(monitorName)
 		for {
 			lock.semaphore.Lock()
-			timeUntilDangerZone, err := lock.timeUntilDangerZoneEntered()
+			isExpired := lock.isExpired()
+			timeUntilDangerZone := lock.timeUntilDangerZoneEntered()
 			lock.semaphore.Unlock()
-			if err != nil {
-				c.logger.Println(ctx, "cannot run session monitor because", err)
+			if isExpired {
+				c.logger.Println(ctx, "lock expired", timeUntilDangerZone)
 				return
 			}
+			c.logger.Println(ctx, "lockSessionMonitorChecker", "monitorName:", monitorName, "timeUntilDangerZone:", timeUntilDangerZone, time.Now().Add(timeUntilDangerZone))
 			if timeUntilDangerZone <= 0 {
 				go lock.sessionMonitor.callback()
 				return
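
A hedged usage sketch of the rule of thumb from the new
WithSessionMonitor documentation above. The table name, lock key,
durations, and callback are illustrative, and the module import path
cirello.io/dynamolock/v2 is assumed; the option names mirror the test
code in this commit:

package main

import (
	"context"
	"log"
	"time"

	dynamolock "cirello.io/dynamolock/v2"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	svc := dynamodb.NewFromConfig(cfg)

	leaseDuration := 15 * time.Second
	heartbeatPeriod := 3 * time.Second
	// Rule of thumb from the doc comment: safeTime = leaseDuration - 3*heartbeatPeriod.
	safeTime := leaseDuration - 3*heartbeatPeriod

	c, err := dynamolock.New(svc,
		"locks", // assumed table name
		dynamolock.WithLeaseDuration(leaseDuration),
		dynamolock.WithHeartbeatPeriod(heartbeatPeriod),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	lock, err := c.AcquireLock("leader", // assumed lock key
		dynamolock.WithSessionMonitor(safeTime, func() {
			// The lock entered the danger zone: step down before it is lost.
			log.Println("lock is about to expire, stepping down")
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.ReleaseLock(lock)
	// ... leader-only work happens here while the lock is held ...
}
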
4 changes: 3 additions & 1 deletion v2/client_heartbeat.go
@@ -105,7 +105,9 @@ func (c *Client) SendHeartbeatWithContext(ctx context.Context, lockItem *Lock, o
 	}
 	targetRVN := c.generateRecordVersionNumber()
 	err := c.sendHeartbeat(ctx, sho, currentRVN, targetRVN)
-	if err != nil {
+	if errors.Is(err, ctx.Err()) {
+		return ctx.Err()
+	} else if err != nil {
 		err = c.retryHeartbeat(ctx, err, sho, currentRVN, targetRVN)
 		err = parseDynamoDBError(err, "already acquired lock, stopping heartbeats")
 		if errors.As(err, new(*LockNotGrantedError)) {
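
The early return above distinguishes a heartbeat that was cut short by
its own per-call deadline from a genuine DynamoDB failure. A minimal
standalone sketch of that errors.Is(err, ctx.Err()) check; doCall is a
made-up stand-in for the network call:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doCall stands in for a network call that honors its context.
func doCall(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	err := doCall(ctx)
	if errors.Is(err, ctx.Err()) {
		// Our own deadline cut the call short; not a server-side rejection.
		fmt.Println("per-call budget exhausted:", err)
		return
	}
	if err != nil {
		fmt.Println("real failure:", err)
	}
}
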
120 changes: 120 additions & 0 deletions v2/client_session_monitor_test.go
@@ -17,7 +17,11 @@ limitations under the License.
package dynamolock_test

import (
"bytes"
"context"
"errors"
"fmt"
"log"
"sync"
"testing"
"time"
@@ -190,3 +194,119 @@ func TestSessionMonitorFullCycle(t *testing.T) {
t.Error("lockedItem should be already expired:", ok, err)
}
}

func TestSessionMonitorMissedCall(t *testing.T) {
t.Parallel()

cases := []struct {
leaseDuration time.Duration
heartbeatPeriod time.Duration
}{
{6 * time.Second, 1 * time.Second},
{15 * time.Second, 1 * time.Second},
{15 * time.Second, 3 * time.Second},
{20 * time.Second, 5 * time.Second},
}
for _, tt := range cases {
tt := tt
safeZone := tt.leaseDuration - (3 * tt.heartbeatPeriod)
t.Run(fmt.Sprintf("%s/%s/%s", tt.leaseDuration, tt.heartbeatPeriod, safeZone), func(t *testing.T) {
t.Parallel()
lockName := randStr()
t.Log("lockName:", lockName)
cfg, proxyCloser := proxyConfig(t)
svc := dynamodb.NewFromConfig(cfg)
logger := &bufferedLogger{}
c, err := dynamolock.New(svc,
"locks",
dynamolock.WithLeaseDuration(tt.leaseDuration),
dynamolock.WithOwnerName("TestSessionMonitorMissedCall#1"),
dynamolock.WithHeartbeatPeriod(tt.heartbeatPeriod),
dynamolock.WithPartitionKeyName("key"),
dynamolock.WithLogger(logger),
)
if err != nil {
t.Fatal(err)
}

t.Log("ensuring table exists")
_, _ = c.CreateTable("locks",
dynamolock.WithProvisionedThroughput(&types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
}),
dynamolock.WithCustomPartitionKeyName("key"),
)

sessionMonitorWasTriggered := make(chan struct{})

data := []byte("some content a")
lockedItem, err := c.AcquireLock(lockName,
dynamolock.WithData(data),
dynamolock.ReplaceData(),
dynamolock.WithSessionMonitor(safeZone, func() {
close(sessionMonitorWasTriggered)
}),
)
if err != nil {
t.Fatal(err)
}
t.Log("lock acquired, closing proxy")
proxyCloser()
t.Log("proxy closed")

t.Log("waiting", tt.leaseDuration)
select {
case <-time.After(tt.leaseDuration):
t.Error("session monitor was not triggered")
case <-sessionMonitorWasTriggered:
t.Log("session monitor was triggered")
}
t.Log("isExpired", lockedItem.IsExpired())

t.Log(logger.String())
})
}
}

type bufferedLogger struct {
mu sync.Mutex
buf bytes.Buffer
logger *log.Logger
}

func (bl *bufferedLogger) String() string {
bl.mu.Lock()
defer bl.mu.Unlock()
return bl.buf.String()
}

func (bl *bufferedLogger) Println(a ...any) {
bl.mu.Lock()
defer bl.mu.Unlock()
if bl.logger == nil {
bl.logger = log.New(&bl.buf, "", 0)
}
bl.logger.Println(a...)
}

type bufferedContextLogger struct {
mu sync.Mutex
buf bytes.Buffer
logger *log.Logger
}

func (bl *bufferedContextLogger) String() string {
bl.mu.Lock()
defer bl.mu.Unlock()
return bl.buf.String()
}

func (bl *bufferedContextLogger) Println(_ context.Context, a ...any) {
bl.mu.Lock()
defer bl.mu.Unlock()
if bl.logger == nil {
bl.logger = log.New(&bl.buf, "", 0)
}
bl.logger.Println(a...)
}
4 changes: 2 additions & 2 deletions v2/client_sort_key_test.go
@@ -296,7 +296,7 @@ func TestSortKeyReadLockContentAfterDeleteOnRelease(t *testing.T) {
 		sortKeyTable,
 		dynamolock.WithLeaseDuration(3*time.Second),
 		dynamolock.WithHeartbeatPeriod(1*time.Second),
-		dynamolock.WithOwnerName("TestReadLockContentAfterDeleteOnRelease#1"),
+		dynamolock.WithOwnerName("TestSortKeyReadLockContentAfterDeleteOnRelease#1"),
 		dynamolock.WithPartitionKeyName("key"),
 		dynamolock.WithSortKey("sortkey", "sortvalue"),
 	)
@@ -328,7 +328,7 @@ func TestSortKeyReadLockContentAfterDeleteOnRelease(t *testing.T) {
 		sortKeyTable,
 		dynamolock.WithLeaseDuration(3*time.Second),
 		dynamolock.WithHeartbeatPeriod(1*time.Second),
-		dynamolock.WithOwnerName("TestReadLockContentAfterDeleteOnRelease#2"),
+		dynamolock.WithOwnerName("TestSortKeyReadLockContentAfterDeleteOnRelease#2"),
 		dynamolock.WithSortKey("sortkey", "sortvalue"),
 	)
 	if err != nil {