forked from eclipse-paho/paho.mqtt.golang
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add back-off controller for sleep time of reconnection when connectio…
…n lost is detected immediately after connecting. eclipse-paho#589 Signed-off-by: Daichi Tomaru <[email protected]>
- Loading branch information
Showing
3 changed files
with
141 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
* Copyright (c) 2021 IBM Corp and others. | ||
* | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v2.0 | ||
* and Eclipse Distribution License v1.0 which accompany this distribution. | ||
* | ||
* The Eclipse Public License is available at | ||
* https://www.eclipse.org/legal/epl-2.0/ | ||
* and the Eclipse Distribution License is available at | ||
* http://www.eclipse.org/org/documents/edl-v10.php. | ||
* | ||
* Contributors: | ||
* Matt Brittan | ||
* Daichi Tomaru | ||
*/ | ||
|
||
package mqtt | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
// Controller for sleep period with backoff when some reconnection attempting or connection lost occure. | ||
// It has statuses for each situation caused retry. | ||
type backoffController struct { | ||
sync.RWMutex | ||
statusMap map[string]*backoffStatus | ||
} | ||
|
||
type backoffStatus struct { | ||
lastSleepPeriod time.Duration | ||
lastErrorTime time.Time | ||
} | ||
|
||
func newBackoffController() *backoffController { | ||
return &backoffController{ | ||
statusMap: map[string]*backoffStatus{}, | ||
} | ||
} | ||
|
||
// Calculate next sleep period from initial and max one and elapsed time since last sleeping. | ||
// Returned values are next sleep period and whether the error situation is continual. | ||
// If connection errors continuouslly occurs, its sleep period is exponentially increased. | ||
// Also if there is a lot of time between last and this error, sleep period is initialized. | ||
func (b *backoffController) getBackoffSleepTime( | ||
initSleepPeriod time.Duration, maxSleepPeriod time.Duration, situation string, processTime time.Duration, | ||
) (time.Duration, bool) { | ||
b.Lock() | ||
defer b.Unlock() | ||
|
||
status, exist := b.statusMap[situation] | ||
if !exist { | ||
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()} | ||
return initSleepPeriod, false | ||
} | ||
|
||
oldTime := status.lastErrorTime | ||
status.lastErrorTime = time.Now() | ||
|
||
// When there is a lot of time between last and this error, sleep period is initialized. | ||
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) { | ||
status.lastSleepPeriod = initSleepPeriod | ||
return initSleepPeriod, false | ||
} | ||
|
||
if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod { | ||
status.lastSleepPeriod = nextSleepPeriod | ||
} else { | ||
status.lastSleepPeriod = maxSleepPeriod | ||
} | ||
|
||
return status.lastSleepPeriod, true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Copyright (c) 2021 IBM Corp and others. | ||
* | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v2.0 | ||
* and Eclipse Distribution License v1.0 which accompany this distribution. | ||
* | ||
* The Eclipse Public License is available at | ||
* https://www.eclipse.org/legal/epl-2.0/ | ||
* and the Eclipse Distribution License is available at | ||
* http://www.eclipse.org/org/documents/edl-v10.php. | ||
* | ||
* Contributors: | ||
* Matt Brittan | ||
* Daichi Tomaru | ||
*/ | ||
|
||
package mqtt | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestGetBackoffSleepTime(t *testing.T) { | ||
// Test for adding new situation | ||
controller := newBackoffController() | ||
if s, c := controller.getBackoffSleepTime(1 * time.Second, 5 * time.Second, "not-exist", 1 * time.Second); !((s == 1 * time.Second) && !c) { | ||
t.Errorf("When new situation is added, period should be initSleepPeriod and naturally it shouldn't be continual error. s:%d c%t", s, c) | ||
} | ||
|
||
// Test for the continual error in the same situation and suppression of sleep period by maxSleepPeriod | ||
controller.getBackoffSleepTime(10 * time.Second, 30 * time.Second, "multi", 1 * time.Second) | ||
if s, c := controller.getBackoffSleepTime(10 * time.Second, 30 * time.Second, "multi", 1 * time.Second); !((s == 20 * time.Second) && c) { | ||
t.Errorf("When same situation is called again, period should be increased and it should be regarded as a continual error. s:%d c%t", s, c) | ||
} | ||
if s, c := controller.getBackoffSleepTime(10 * time.Second, 30 * time.Second, "multi", 1 * time.Second); !((s == 30 * time.Second) && c) { | ||
t.Errorf("A same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30. So the next period should be 30. s:%d c%t", s, c) | ||
} | ||
|
||
// Test for initialization by elapsed time. | ||
controller.getBackoffSleepTime(1 * time.Second, 128 * time.Second, "elapsed", 1 * time.Second) | ||
controller.getBackoffSleepTime(1 * time.Second, 128 * time.Second, "elapsed", 1 * time.Second) | ||
time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second) | ||
if s, c := controller.getBackoffSleepTime(1 * time.Second, 128 * time.Second, "elapsed", 1 * time.Second); !((s == 1 * time.Second) && !c) { | ||
t.Errorf("Initialization should be triggered by elapsed time. s:%d c%t", s, c) | ||
} | ||
|
||
// Test when initial and max period is same. | ||
controller.getBackoffSleepTime(1 * time.Second, 1 * time.Second, "same", 1 * time.Second) | ||
if s, c := controller.getBackoffSleepTime(1 * time.Second, 1 * time.Second, "same", 1 * time.Second); !((s == 1 * time.Second) && c) { | ||
t.Errorf("Sleep time should be always 1. s:%d c%t", s, c) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters