Skip to content

Commit

Permalink
Merge pull request #625 from tomatod/connect_retry_backoff - Prevent …
Browse files Browse the repository at this point in the history
…reconnect loops

Add back-off controller for sleep time of reconnection  when connection lost is detected immediately after connecting. #589
This issue could be caused by an invalid publish request (which leads to the broker dropping the connection immediately).
  • Loading branch information
MattBrittan authored Jan 8, 2023
2 parents 4b066a0 + d174b9a commit e3fa503
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 10 deletions.
104 changes: 104 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"sync"
"time"
)

// Controller for sleep with backoff when the client attempts reconnection
// It has statuses for each situations cause reconnection.
type backoffController struct {
sync.RWMutex
statusMap map[string]*backoffStatus
}

type backoffStatus struct {
lastSleepPeriod time.Duration
lastErrorTime time.Time
}

func newBackoffController() *backoffController {
return &backoffController{
statusMap: map[string]*backoffStatus{},
}
}

// Calculate next sleep period from the specified parameters.
// Returned values are next sleep period and whether the error situation is continual.
// If connection errors continuouslly occurs, its sleep period is exponentially increased.
// Also if there is a lot of time between last and this error, sleep period is initialized.
func (b *backoffController) getBackoffSleepTime(
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
// Decide first sleep time if the situation is not continual.
var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
if skip {
status.lastSleepPeriod = 0
return 0, false
}
status.lastSleepPeriod = init
return init, false
}

// Prioritize maxSleep.
if initSleepPeriod > maxSleepPeriod {
initSleepPeriod = maxSleepPeriod
}
b.Lock()
defer b.Unlock()

status, exist := b.statusMap[situation]
if !exist {
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
}

oldTime := status.lastErrorTime
status.lastErrorTime = time.Now()

// When there is a lot of time between last and this error, sleep period is initialized.
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
return firstProcess(status, initSleepPeriod, skipFirst)
}

if status.lastSleepPeriod == 0 {
status.lastSleepPeriod = initSleepPeriod
return initSleepPeriod, true
}

if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
status.lastSleepPeriod = nextSleepPeriod
} else {
status.lastSleepPeriod = maxSleepPeriod
}

return status.lastSleepPeriod, true
}

// Execute sleep the time returned from getBackoffSleepTime.
func (b *backoffController) sleepWithBackoff(
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
if sleep != 0 {
time.Sleep(sleep)
}
return sleep, isFirst
}
68 changes: 68 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"testing"
"time"
)

func TestGetBackoffSleepTime(t *testing.T) {
// Test for adding new situation
controller := newBackoffController()
if s, c := controller.getBackoffSleepTime("not-exist", 1 * time.Second, 5 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
t.Errorf("When new situation is added, period should be initSleepPeriod and naturally it shouldn't be continual error. s:%d c%t", s, c)
}

// Test for the continual error in the same situation and suppression of sleep period by maxSleepPeriod
controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 20 * time.Second) && c) {
t.Errorf("When same situation is called again, period should be increased and it should be regarded as a continual error. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 30 * time.Second) && c) {
t.Errorf("A same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30. So the next period should be 30. s:%d c%t", s, c)
}

// Test for initialization by elapsed time.
controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second)
if s, c := controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
t.Errorf("Initialization should be triggered by elapsed time. s:%d c%t", s, c)
}

// Test when initial and max period is same.
controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
t.Errorf("Sleep time should be always 2. s:%d c%t", s, c)
}

// Test when initial period > max period.
controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
t.Errorf("Sleep time should be 2. s:%d c%t", s, c)
}

// Test when first sleep is skipped.
if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 0) && !c) {
t.Errorf("Sleep time should be 0 because of skip. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 3 * time.Second) && c) {
t.Errorf("Sleep time should be 3. s:%d c%t", s, c)
}
}
22 changes: 12 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type client struct {
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)

backoff *backoffController
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
Expand Down Expand Up @@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client {
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
c.backoff = newBackoffController()
return c
}

Expand Down Expand Up @@ -302,10 +305,16 @@ func (c *client) Connect() Token {
func (c *client) reconnect(connectionUp connCompletedFn) {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = 1 * time.Second
initSleep = 1 * time.Second
conn net.Conn
)

// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
// Sleep time is exponentially increased as the same situation continues
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
}

for {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
Expand All @@ -315,15 +324,8 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
if err == nil {
break
}
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
time.Sleep(sleep)
if sleep < c.options.MaxReconnectInterval {
sleep *= 2
}

if sleep > c.options.MaxReconnectInterval {
sleep = c.options.MaxReconnectInterval
}
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)

if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
if err := connectionUp(false); err != nil { // Should always return an error
Expand Down

0 comments on commit e3fa503

Please sign in to comment.