Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add maintenance lock to sequencers #1709

Merged
merged 10 commits into from
Jun 21, 2023
115 changes: 69 additions & 46 deletions arbnode/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ import (
// MaintenanceRunner periodically checks whether the configured time of day
// has been passed and, if so, runs maintenance (database compaction) on dbs.
type MaintenanceRunner struct {
	stopwaiter.StopWaiter

	// config is called on every check to obtain the current configuration.
	config MaintenanceConfigFetcher
	// seqCoordinator is nil when the node runs without a sequencer
	// coordinator; maintenance then runs without a lock or handoff.
	seqCoordinator *SeqCoordinator
	// dbs are the databases compacted during maintenance.
	dbs []ethdb.Database
	// lastMaintenance is set to the construction time and then to the time
	// of each maintenance run; it is the "before" instant when checking
	// whether the configured time of day has been passed.
	lastMaintenance time.Time

	// lock ensures that at any given time only a single node is in
	// maintenance mode. It is only created when seqCoordinator is non-nil.
	lock *SimpleRedisLock
}

type MaintenanceConfig struct {
TimeOfDay string `koanf:"time-of-day" reload:"hot"`
TimeOfDay string `koanf:"time-of-day" reload:"hot"`
Lock SimpleRedisLockConfig `koanf:"lock" reload:"hot"`

// Generated: the minutes since start of UTC day to compact at
minutesAfterMidnight int
Expand Down Expand Up @@ -65,6 +70,7 @@ func (c *MaintenanceConfig) Validate() error {

// MaintenanceConfigAddOptions registers the maintenance options on f, each
// name being prefixed with prefix: the "time-of-day" string option (defaulting
// to DefaultMaintenanceConfig.TimeOfDay) and the options of the nested "lock"
// section.
func MaintenanceConfigAddOptions(prefix string, f *flag.FlagSet) {
	timeOfDayKey := prefix + ".time-of-day"
	lockPrefix := prefix + ".lock"

	f.String(timeOfDayKey, DefaultMaintenanceConfig.TimeOfDay, "UTC 24-hour time of day to run maintenance (currently only db compaction) at (e.g. 15:00)")
	RedisLockConfigAddOptions(lockPrefix, f)
}

var DefaultMaintenanceConfig = MaintenanceConfig{
Expand All @@ -76,21 +82,32 @@ var DefaultMaintenanceConfig = MaintenanceConfig{
// MaintenanceConfigFetcher returns the MaintenanceConfig to use. The runner
// calls it once at construction and again on every maintenance check, so each
// check sees whatever configuration the fetcher returns at that moment.
type MaintenanceConfigFetcher func() *MaintenanceConfig

// NewMaintenanceRunner validates the configuration returned by config and
// builds a MaintenanceRunner for dbs, with lastMaintenance set to the current
// UTC time.
//
// When seqCoordinator is non-nil, a SimpleRedisLock is created on the
// coordinator's client so that only one node performs maintenance at a time;
// when it is nil, no lock is created.
//
// It returns an error if the configuration is invalid or the lock cannot be
// created.
func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCoordinator, dbs []ethdb.Database) (*MaintenanceRunner, error) {
	if err := config().Validate(); err != nil {
		return nil, fmt.Errorf("validating config: %w", err)
	}
	res := &MaintenanceRunner{
		config:          config,
		seqCoordinator:  seqCoordinator,
		dbs:             dbs,
		lastMaintenance: time.Now().UTC(),
	}
	if seqCoordinator == nil {
		return res, nil
	}

	// Fetch the lock settings through config on every call rather than from a
	// snapshot taken here: the Lock section is tagged reload:"hot", and a
	// captured snapshot would never observe a reloaded value.
	lockConfig := func() *SimpleRedisLockConfig { return &config().Lock }
	alwaysReady := func() bool { return true } // always ready to lock
	rl, err := NewSimpleRedisLock(seqCoordinator.Client, lockConfig, alwaysReady)
	if err != nil {
		return nil, fmt.Errorf("creating new simple redis lock: %w", err)
	}
	res.lock = rl
	return res, nil
}

func (c *MaintenanceRunner) Start(ctxIn context.Context) {
c.StopWaiter.Start(ctxIn, c)
c.CallIteratively(c.maybeRunMaintenance)
func (mr *MaintenanceRunner) Start(ctxIn context.Context) {
mr.StopWaiter.Start(ctxIn, mr)
mr.CallIteratively(mr.maybeRunMaintenance)
}

func wentPastTimeOfDay(before time.Time, after time.Time, timeOfDay int) bool {
Expand All @@ -112,47 +129,53 @@ func wentPastTimeOfDay(before time.Time, after time.Time, timeOfDay int) bool {
return prevMinutes < dbCompactionMinutes && newMinutes >= dbCompactionMinutes
}

func (c *MaintenanceRunner) maybeRunMaintenance(ctx context.Context) time.Duration {
config := c.config()
func (mr *MaintenanceRunner) maybeRunMaintenance(ctx context.Context) time.Duration {
config := mr.config()
if !config.enabled {
return time.Minute
}

now := time.Now().UTC()
if wentPastTimeOfDay(c.lastCheck, now, config.minutesAfterMidnight) {
log.Info("attempting to release sequencer lockout to run database compaction", "targetTime", config.TimeOfDay)
if c.seqCoordinator == nil {
c.runMaintenance()
} else {
// We want to switch sequencers before running maintenance
success := c.seqCoordinator.AvoidLockout(ctx)
defer c.seqCoordinator.SeekLockout(ctx) // needs called even if c.Zombify returns false
if success {
// We've unset the wants lockout key, now wait for the handoff
success = c.seqCoordinator.TryToHandoffChosenOne(ctx)
if success {
c.runMaintenance()
}
}
}

if !wentPastTimeOfDay(mr.lastMaintenance, now, config.minutesAfterMidnight) {
return time.Minute
}
c.lastCheck = now

if mr.seqCoordinator == nil {
mr.lastMaintenance = now
mr.runMaintenance()
return time.Minute
}

if !mr.lock.AttemptLock(ctx) {
return time.Minute
anodar marked this conversation as resolved.
Show resolved Hide resolved
}
defer mr.lock.Release(ctx)

log.Info("Attempting avoiding lockout and handing off", "targetTime", config.TimeOfDay)
// Avoid lockout for the sequencer and try to handoff.
if mr.seqCoordinator.AvoidLockout(ctx) && mr.seqCoordinator.TryToHandoffChosenOne(ctx) {
mr.lastMaintenance = now
mr.runMaintenance()
}
defer mr.seqCoordinator.SeekLockout(ctx) // needs called even if c.Zombify returns false

return time.Minute
}

func (c *MaintenanceRunner) runMaintenance() {
log.Info("compacting databases (this may take a while...)")
results := make(chan error, len(c.dbs))
for _, db := range c.dbs {
func (mr *MaintenanceRunner) runMaintenance() {
log.Info("Compacting databases (this may take a while...)")
results := make(chan error, len(mr.dbs))
for _, db := range mr.dbs {
db := db
go func() {
results <- db.Compact(nil, nil)
}()
}
for range c.dbs {
err := <-results
if err != nil {
log.Warn("failed to compact database", "err", err)
for range mr.dbs {
if err := <-results; err != nil {
log.Warn("Failed to compact database", "err", err)
}
}
log.Info("done compacting databases")
log.Info("Done compacting databases")
}
49 changes: 24 additions & 25 deletions arbnode/maintenance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,38 @@
package arbnode

import (
"fmt"
"testing"
"time"
)

// TestWentPastTimeOfDay checks wentPastTimeOfDay over windows around midnight
// UTC. Each case validates a MaintenanceConfig built from timeOfDay to obtain
// minutesAfterMidnight, then compares the result for (before, after) with
// want. Cases that omit want expect false.
func TestWentPastTimeOfDay(t *testing.T) {
	elevenPM := time.Date(2000, 1, 1, 23, 0, 0, 0, time.UTC)
	midnight := time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)
	oneAM := time.Date(2000, 1, 2, 1, 0, 0, 0, time.UTC)

	for _, tc := range []struct {
		before, after time.Time
		timeOfDay     string
		want          bool
	}{
		// A zero-length window expects false.
		{before: elevenPM, after: elevenPM, timeOfDay: "23:00"},
		{before: midnight, after: midnight, timeOfDay: "00:00"},
		{before: oneAM, after: oneAM, timeOfDay: "1:00"},
		// Windows that cross midnight.
		{before: elevenPM, after: midnight, timeOfDay: "23:30", want: true},
		{before: elevenPM, after: midnight, timeOfDay: "00:00", want: true},
		{before: elevenPM, after: oneAM, timeOfDay: "00:00", want: true},
		{before: elevenPM, after: oneAM, timeOfDay: "01:00", want: true},
		{before: elevenPM, after: oneAM, timeOfDay: "02:00"},
		{before: elevenPM, after: oneAM, timeOfDay: "12:00"},
		// Windows within a single day; a target equal to before expects false.
		{before: midnight, after: oneAM, timeOfDay: "00:00"},
		{before: midnight, after: oneAM, timeOfDay: "00:30", want: true},
		{before: midnight, after: oneAM, timeOfDay: "01:00", want: true},
	} {
		name := fmt.Sprintf("%s_to_%s_at_%s", tc.before.Format("15:04"), tc.after.Format("15:04"), tc.timeOfDay)
		t.Run(name, func(t *testing.T) {
			config := MaintenanceConfig{TimeOfDay: tc.timeOfDay}
			Require(t, config.Validate(), "Failed to validate sample config")

			// %t prints the boolean values; %T would print only their type.
			if got := wentPastTimeOfDay(tc.before, tc.after, config.minutesAfterMidnight); got != tc.want {
				t.Errorf("wentPastTimeOfDay(%v, %v, %q) = %t, want %t", tc.before, tc.after, tc.timeOfDay, got, tc.want)
			}
		})
	}
}
Loading