
Bump retries & fix context timeouts (#131)
* Bump built-in retries to 20

The default is 3 attempts, with a maximum back-off of 20s between attempts.

https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#standard-retryer
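For illustration, a minimal standalone sketch of the same retryer override (it mirrors the `aws/aws.go` change in the diff below; the `main` wrapper and hard-coded region are just for the example):

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/retry"
	"github.com/aws/aws-sdk-go-v2/config"
)

func main() {
	// Wrap the standard retryer so each API call gets up to 20 attempts
	// instead of the SDK default of 3.
	cfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion("us-east-1"), // illustrative region only
		config.WithRetryer(func() aws.Retryer {
			return retry.AddWithMaxAttempts(retry.NewStandard(), 20)
		}),
	)
	if err != nil {
		log.Fatalf("loading AWS config: %v", err)
	}
	_ = cfg // hand cfg to service clients, e.g. autoscaling.NewFromConfig(cfg)
}
```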

* NewContext should actually be new

It shouldn't be derived from the previous context: a derived context inherits the parent's deadline and can only add an equal-or-earlier one, so the "new" deadline stays pinned to the timeout set at launch and the timer is never actually renewed. Creating a brand-new context every time and throwing the old one away gives us a genuinely fresh timer.
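A minimal standalone sketch (not repo code) of the behavior described above: a context derived from the old one can never outlive the parent's deadline, while one created from `context.Background()` really does start a fresh timer. Here `itemTimeout` stands in for `r.Opts.ItemTimeout`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	itemTimeout := 2 * time.Second

	// The original per-run context, created at launch.
	parent, cancel := context.WithTimeout(context.Background(), itemTimeout)
	defer cancel()

	time.Sleep(1 * time.Second) // simulate work that used up part of the budget

	// Deriving from the parent: the child's deadline is capped by the
	// parent's, so this does NOT push the timeout out by another itemTimeout.
	derived, cancelDerived := context.WithTimeout(parent, itemTimeout)
	defer cancelDerived()
	d1, _ := derived.Deadline()

	// Starting over from context.Background(): the deadline is genuinely
	// itemTimeout from now, i.e. the timer has been renewed.
	fresh, cancelFresh := context.WithTimeout(context.Background(), itemTimeout)
	defer cancelFresh()
	d2, _ := fresh.Deadline()

	fmt.Println("derived deadline:", d1) // ~1s from now (still the launch deadline)
	fmt.Println("fresh deadline:  ", d2) // ~2s from now
}
```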

* remove double-sleep
holtwilkins authored Oct 25, 2021
1 parent 92f1b01 commit 8efbc96
Showing 12 changed files with 63 additions and 35 deletions.
9 changes: 8 additions & 1 deletion aws/aws.go
@@ -21,6 +21,8 @@ import (
"strconv"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
at "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
@@ -44,7 +46,12 @@ func GetAWSClients(ctx context.Context) (*Clients, error) {
region = "us-east-1"
}

cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion(region),
config.WithRetryer(func() aws.Retryer {
return retry.AddWithMaxAttempts(retry.NewStandard(), 20)
}),
)
if err != nil {
return nil, errors.Wrap(err, "Error opening default AWS config")
}
29 changes: 20 additions & 9 deletions bouncer/runner.go
@@ -51,6 +51,8 @@ const (

asgSeparator = ","
desiredCapSeparator = ":"

debugTimeFormat = "2006-01-02 15:04:05 MST"
)

// NewBaseRunner instantiates a BaseRunner
@@ -189,23 +191,32 @@ func (r *BaseRunner) SetDesiredCapacity(ctx context.Context, asg *ASG, desiredCa
}

// NewContext generates a context with the ItemTimeout from the parent context given
func (r *BaseRunner) NewContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, r.Opts.ItemTimeout)
}
func (r *BaseRunner) NewContext() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithTimeout(context.Background(), r.Opts.ItemTimeout)
dn, _ := ctx.Deadline()

// ResetAndSleep resets our context timer (because we just performed a mutation action), and then sleeps
func (r *BaseRunner) ResetAndSleep(ctx context.Context) (context.Context, context.CancelFunc) {
log.Debugf("Resetting timer")
l := log.WithFields(log.Fields{
"Context deadline": dn.Format(debugTimeFormat),
"Current time": getHumanCurrentTime(),
})

ctx, cancel := r.NewContext(ctx)
r.Sleep(ctx)
l.Debug("Generating fresh context")

return ctx, cancel
}

func getHumanCurrentTime() string {
return time.Now().Format(debugTimeFormat)
}

// Sleep makes us sleep for the constant time - call this when waiting for an AWS change
func (r *BaseRunner) Sleep(ctx context.Context) {
log.Debugf("Sleeping for %v", waitBetweenChecks)
l := log.WithFields(log.Fields{
"Sleep Duration": waitBetweenChecks,
"Current time": getHumanCurrentTime(),
})

l.Debug("Sleeping between checks")

select {
case <-time.After(waitBetweenChecks):
10 changes: 6 additions & 4 deletions canary/runner.go
@@ -90,10 +90,10 @@ func (r *Runner) ValidatePrereqs(ctx context.Context) error {
}

// Run has the meat of the batch job
func (r *Runner) Run(ctx context.Context) error {
func (r *Runner) Run() error {
var newDesiredCapacity int32

ctx, cancel := r.NewContext(ctx)
ctx, cancel := r.NewContext()
defer cancel()

for {
@@ -161,8 +161,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error killing instance")
}
}
ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
@@ -202,8 +203,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error setting desired capacity")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
2 changes: 1 addition & 1 deletion cmd/canary.go
@@ -79,7 +79,7 @@ var canaryCmd = &cobra.Command{
log.Fatal(err)
}

err = r.Run(ctx)
err = r.Run()
if err != nil {
log.Fatal(errors.Wrap(err, "error in run"))
}
2 changes: 1 addition & 1 deletion cmd/full.go
@@ -80,7 +80,7 @@ var fullCmd = &cobra.Command{
log.Fatal(err)
}

err = r.Run(ctx)
err = r.Run()
if err != nil {
log.Fatal(errors.Wrap(err, "error in run"))
}
2 changes: 1 addition & 1 deletion cmd/rolling.go
@@ -80,7 +80,7 @@ var rollingCmd = &cobra.Command{
log.Fatal(err)
}

err = r.Run(ctx)
err = r.Run()
if err != nil {
log.Fatal(errors.Wrap(err, "error in run"))
}
2 changes: 1 addition & 1 deletion cmd/serial.go
@@ -80,7 +80,7 @@ var serialCmd = &cobra.Command{
log.Fatal(err)
}

err = r.Run(ctx)
err = r.Run()
if err != nil {
log.Fatal(errors.Wrap(err, "error in run"))
}
2 changes: 1 addition & 1 deletion cmd/slow-canary.go
@@ -79,7 +79,7 @@ var slowCanaryCmd = &cobra.Command{
log.Fatal(err)
}

err = r.Run(ctx)
err = r.Run()
if err != nil {
log.Fatal(errors.Wrap(err, "error in run"))
}
10 changes: 6 additions & 4 deletions full/runner.go
@@ -100,10 +100,10 @@ func asgSetWrapper(asg *bouncer.ASG) *bouncer.ASGSet {
}

// Run has the meat of the batch job
func (r *Runner) Run(ctx context.Context) error {
func (r *Runner) Run() error {
var newDesiredCapacity int32

ctx, cancel := r.NewContext(ctx)
ctx, cancel := r.NewContext()
defer cancel()

start:
@@ -132,8 +132,9 @@ start:
return errors.Wrap(err, "failed to kill instance")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue start
}
@@ -150,8 +151,9 @@ start:
return errors.Wrap(err, "error setting desired capacity")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue start
}
7 changes: 4 additions & 3 deletions rolling/runner.go
@@ -79,8 +79,8 @@ func (r *Runner) ValidatePrereqs(ctx context.Context) error {
}

// Run has the meat of the batch job
func (r *Runner) Run(ctx context.Context) error {
ctx, cancel := r.NewContext(ctx)
func (r *Runner) Run() error {
ctx, cancel := r.NewContext()
defer cancel()

for {
Expand All @@ -104,8 +104,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error finding or killing best old instance")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
Expand Down
10 changes: 6 additions & 4 deletions serial/runner.go
@@ -88,8 +88,8 @@ func (r *Runner) ValidatePrereqs(ctx context.Context) error {
}

// Run has the meat of the batch job
func (r *Runner) Run(ctx context.Context) error {
ctx, cancel := r.NewContext(ctx)
func (r *Runner) Run() error {
ctx, cancel := r.NewContext()
defer cancel()

for {
Expand All @@ -116,8 +116,9 @@ func (r *Runner) Run(ctx context.Context) error {
}

if len(divergedASGs) != 0 {
ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
Expand All @@ -129,8 +130,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error finding or killing best old instance")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
13 changes: 8 additions & 5 deletions slowcanary/runner.go
@@ -90,10 +90,10 @@ func (r *Runner) ValidatePrereqs(ctx context.Context) error {
}

// Run has the meat of the batch job
func (r *Runner) Run(ctx context.Context) error {
func (r *Runner) Run() error {
var newDesiredCapacity int32

ctx, cancel := r.NewContext(ctx)
ctx, cancel := r.NewContext()
defer cancel()

for {
@@ -132,8 +132,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error setting desired capacity")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
} else if *curDesiredCapacity == *finDesiredCapacity+1 {
Expand All @@ -160,8 +161,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error killing instance")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
Expand All @@ -177,8 +179,9 @@ func (r *Runner) Run(ctx context.Context) error {
return errors.Wrap(err, "error killing instance")
}

ctx, cancel = r.ResetAndSleep(ctx)
ctx, cancel = r.NewContext()
defer cancel()
r.Sleep(ctx)

continue
}
