Skip to content

Commit

Permalink
Merge pull request #22 from pfnet-research/exit-on-empty-wait-duration
Browse files Browse the repository at this point in the history
support grace period for exit on empty
  • Loading branch information
everpeace authored Aug 14, 2020
2 parents 6d048aa + 73d5b35 commit bfb8681
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 12 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ worker:
exitOnSuspend: true
# If true, worker exits when the queue was empty
exitOnEmpty: false
# If exitOnEmpty is true, worker waits for exit in the grace period
exitOnEmptyGracePeriod: 10s
# If the value was positive, worker will exit
# after processing the number of tasks
numTasks: 1000
Expand Down
3 changes: 3 additions & 0 deletions cmd/start_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func init() {
flag.Bool("exit-on-empty", cmdOpts.Worker.ExitOnEmpty, "if set, worker exits when queue is empty")
viperBindPFlag("Worker.ExitOnEmpty", strconv.FormatBool(cmdOpts.Worker.ExitOnEmpty), flag.Lookup("exit-on-empty"))

flag.Duration("exit-on-empty-grace-period", cmdOpts.Worker.ExitOnEmptyGracePeriod, "if exit-on-empty is true, worker waits for exit in the grace period")
viperBindPFlag("Worker.ExitOnEmptyGracePeriod", cmdOpts.Worker.ExitOnEmptyGracePeriod.String(), flag.Lookup("exit-on-empty-grace-period"))

flag.Int("num-tasks", cmdOpts.Worker.NumTasks, "if set positive value, worker exits after processing the number of tasks")
viperBindPFlag("Worker.NumTasks", strconv.Itoa(cmdOpts.Worker.NumTasks), flag.Lookup("num-tasks"))

Expand Down
17 changes: 9 additions & 8 deletions pkg/apis/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ type Worker struct {
}

type WorkerSpec struct {
Name string `json:"name" yaml:"name" default:"-" validate:"required,min=1,max=256"`
Concurrency int `json:"concurrency" yaml:"concurrency" default:"1" validate:"required,min=1"`
TaskHandler TaskHandlerSpec `json:"taskHandler" yaml:"taskHandler" validate:"required"`
HeartBeat HeartBeatSpec `json:"heartBeat" yaml:"heartBeat" validate:"required"`
ExitOnSuspend bool `json:"exitOnSuspend" yaml:"exitOnSuspend" default:"true"`
ExitOnEmpty bool `json:"exitOnEmpty" yaml:"exitOnEmpty" default:"false"`
NumTasks int `json:"numTasks" yaml:"numTasks" default:"100"`
WorkDir string `json:"workDir" yaml:"workDir" default:"/tmp" validate:"isWorkDirValid,required"`
Name string `json:"name" yaml:"name" default:"-" validate:"required,min=1,max=256"`
Concurrency int `json:"concurrency" yaml:"concurrency" default:"1" validate:"required,min=1"`
TaskHandler TaskHandlerSpec `json:"taskHandler" yaml:"taskHandler" validate:"required"`
HeartBeat HeartBeatSpec `json:"heartBeat" yaml:"heartBeat" validate:"required"`
ExitOnSuspend bool `json:"exitOnSuspend" yaml:"exitOnSuspend" default:"true"`
ExitOnEmpty bool `json:"exitOnEmpty" yaml:"exitOnEmpty" default:"false"`
ExitOnEmptyGracePeriod time.Duration `json:"exitOnEmptyGracePeriod" yaml:"exitOnEmptyGracePeriod" default:"10s"`
NumTasks int `json:"numTasks" yaml:"numTasks" default:"100"`
WorkDir string `json:"workDir" yaml:"workDir" default:"/tmp" validate:"isWorkDirValid,required"`
}

type TaskHandlerSpec struct {
Expand Down
18 changes: 14 additions & 4 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (w *Worker) Start() error {
}

func (w *Worker) startProcessTasks() error {
var queueEmptyDetectedAt *time.Time
L:
for i := w.config.NumTasks; i != 0; i-- {
select {
Expand Down Expand Up @@ -159,12 +160,21 @@ L:
w.smphr.Release(1)
continue
case backend.TaskQueueEmptyError:
if w.config.ExitOnEmpty {
w.logger.Info().Bool("exitOnEmpty", w.config.ExitOnEmpty).Msg("Queue is empty. Stopping worker")
if queueEmptyDetectedAt == nil {
now := time.Now()
queueEmptyDetectedAt = &now
}
logger := w.logger.With().
Bool("exitOnEmpty", w.config.ExitOnEmpty).
Dur("exitOnEmptyGracePeriod", w.config.ExitOnEmptyGracePeriod).
Time("detectedQueueEmptyAt", *queueEmptyDetectedAt).Logger()
shouldExitNow := !time.Now().Before(queueEmptyDetectedAt.Add(w.config.ExitOnEmptyGracePeriod))
if w.config.ExitOnEmpty && shouldExitNow {
logger.Info().Msg("Queue is empty. Stopping worker")
w.smphr.Release(1)
return nil
}
w.logger.Debug().Bool("exitOnEmpty", w.config.ExitOnEmpty).Msg("Queue is empty. retrying in 5 seconds.")
logger.Info().Msg("Queue is empty. retrying in 5 seconds.")
util.SleepContext(w.ctx, 5*time.Second)
w.smphr.Release(1)
continue
Expand All @@ -176,7 +186,7 @@ L:
}
}
w.logger.Debug().Interface("task", taskFetched).Msg("Task fetched")

queueEmptyDetectedAt = nil
w.wg.Add(1)
go func() {
defer w.smphr.Release(1)
Expand Down

0 comments on commit bfb8681

Please sign in to comment.