Skip to content

Commit

Permalink
Implement hard shutdown, fixes #76
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Feb 12, 2024
1 parent 409602f commit f728a46
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 59 deletions.
11 changes: 11 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# faktory\_worker\_go

## 1.7.0

- Implement hard shutdown timeout of 25 seconds. [#76]
Your job funcs should implement `context` package semantics.
If you use `Manager.Run()`, FWG will now gracefully shut down.
After a default delay of 25 seconds, FWG will cancel the root Context which should quickly cancel any lingering jobs running under that Manager.
If your jobs run long and do not respond to context cancellation, you risk orphaning any jobs in-progress.
They will linger on the Busy tab until the job's `reserve_for` timeout.

Please also note that `RunWithContext`, added in `1.6.0`, does not implement the shutdown delay, but the README example contains the code to implement it.

## 1.6.0

- Upgrade to Go 1.17 and Faktory 1.6.0.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test:
go test -v ./...
go test ./...

work:
go run test/main.go
Expand Down
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

![travis](https://travis-ci.org/contribsys/faktory_worker_go.svg?branch=master)

This repository provides a Faktory worker process for Go apps. This
worker process fetches background jobs from the Faktory server and processes them.
This repository provides a Faktory worker process for Go apps.
This worker process fetches background jobs from the Faktory server and processes them.

How is this different from all the other Go background worker libraries?
They all use Redis or another "dumb" datastore. This library is far
simpler because the Faktory server implements most of the data storage, retry logic,
Web UI, etc.
They all use Redis or another "dumb" datastore.
This library is far simpler because the Faktory server implements most of the data storage, retry logic, Web UI, etc.

# Installation

Expand Down Expand Up @@ -55,6 +54,8 @@ func main() {

// use up to N goroutines to execute jobs
mgr.Concurrency = 20
// wait up to 25 seconds to let jobs in progress finish
mgr.ShutdownTimeout = 25 * time.Second

// pull jobs from these queues, in this order of precedence
mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")
Expand Down Expand Up @@ -99,6 +100,8 @@ func main() {

// use up to N goroutines to execute jobs
mgr.Concurrency = 20
// wait up to 25 seconds to let jobs in progress finish
mgr.ShutdownTimeout = 25 * time.Second

// pull jobs from these queues, in this order of precedence
mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")
Expand All @@ -116,6 +119,7 @@ func main() {
stopSignals := []os.Signal{
syscall.SIGTERM,
syscall.SIGINT,
// TODO Implement the TSTP signal to call mgr.Quiet()
}
stop := make(chan os.Signal, len(stopSignals))
for _, s := range stopSignals {
Expand All @@ -127,17 +131,18 @@ func main() {
case <-ctx.Done():
return
case <-stop:
cancel()
break
}
}

_ = time.AfterFunc(mgr.ShutdownTimeout, cancel)
_ = mgr.Terminate(true) // never returns
}()

<-ctx.Done()
}
```

See `test/main.go` for a working example.

# Testing

`faktory_worker_go` provides helpers that allow you to configure tests to execute jobs inline if you prefer. In this example, the application has defined its own wrapper function for `client.Push`.
Expand Down Expand Up @@ -225,7 +230,7 @@ See [the wiki](https://github.com/contribsys/faktory/wiki) for details.

# Author

Mike Perham, @getajobmike, @contribsys
Mike Perham, https://ruby.social/@getajobmike

# License

Expand Down
16 changes: 7 additions & 9 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ var (
// because execution should be orthogonal to
// most of the Job payload contents.
//
// func myJob(ctx context.Context, args ...interface{}) error {
// helper := worker.HelperFor(ctx)
// jid := helper.Jid()
//
// helper.With(func(cl *faktory.Client) error {
// cl.Push("anotherJob", 4, "arg")
// })
// func myJob(ctx context.Context, args ...interface{}) error {
// helper := worker.HelperFor(ctx)
// jid := helper.Jid()
//
// helper.With(func(cl *faktory.Client) error {
// cl.Push("anotherJob", 4, "arg")
// })
type Helper interface {
Jid() string
JobType() string
Expand Down Expand Up @@ -127,8 +126,7 @@ func HelperFor(ctx context.Context) Helper {
return nil
}

func jobContext(pool *faktory.Pool, job *faktory.Job) context.Context {
ctx := context.Background()
func jobContext(ctx context.Context, pool *faktory.Pool, job *faktory.Job) context.Context {
ctx = context.WithValue(ctx, poolKey, pool)
ctx = context.WithValue(ctx, jobKey, job)
return ctx
Expand Down
13 changes: 9 additions & 4 deletions context_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package faktory_worker

import (
"context"
"errors"
"regexp"
"testing"
"time"

faktory "github.com/contribsys/faktory/client"
"github.com/stretchr/testify/assert"
Expand All @@ -21,15 +23,16 @@ func TestSimpleContext(t *testing.T) {
//cl, err := faktory.Open()
//assert.NoError(t, err)
//cl.Push(job)

ctx := jobContext(pool, job)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ctx = jobContext(ctx, pool, job)
help := HelperFor(ctx)
assert.Equal(t, help.Jid(), job.Jid)
assert.Empty(t, help.Bid())
assert.Equal(t, "something", help.JobType())

_, ok := ctx.Deadline()
assert.False(t, ok)
assert.True(t, ok)

//assert.NoError(t, ctx.TrackProgress(45, "Working....", nil))

Expand All @@ -51,7 +54,9 @@ func TestBatchContext(t *testing.T) {
job.SetCustom("track", 1)
job.SetCustom("bid", "nosuchbatch")

ctx := jobContext(pool, job)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ctx = jobContext(ctx, pool, job)
help := HelperFor(ctx)
assert.Equal(t, help.Jid(), job.Jid)
assert.Equal(t, "nosuchbatch", help.Bid())
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/contribsys/faktory_worker_go

go 1.20
go 1.21

toolchain go1.21.4

Expand Down
44 changes: 31 additions & 13 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
"time"

faktory "github.com/contribsys/faktory/client"
)
Expand All @@ -16,11 +17,12 @@ import (
type Manager struct {
mut sync.Mutex

Concurrency int
Logger Logger
ProcessWID string
Labels []string
Pool *faktory.Pool
Concurrency int
Logger Logger
ProcessWID string
Labels []string
Pool *faktory.Pool
ShutdownTimeout time.Duration

queues []string
middleware []MiddlewareFunc
Expand All @@ -31,6 +33,7 @@ type Manager struct {
shutdownWaiter *sync.WaitGroup
jobHandlers map[string]Handler
eventHandlers map[lifecycleEventType][]LifecycleEventHandler
cancelFunc context.CancelFunc

// This only needs to be computed once. Store it here to keep things fast.
weightedPriorityQueuesEnabled bool
Expand All @@ -57,10 +60,10 @@ func (mgr *Manager) isRegistered(name string) bool {
}

// dispatch immediately executes a job, including all middleware.
func (mgr *Manager) dispatch(job *faktory.Job) error {
func (mgr *Manager) dispatch(ctx context.Context, job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
return dispatch(jobContext(ctx, mgr.Pool, job), mgr.middleware, job, perform)
}

// InlineDispatch is designed for testing. It immediately executes a job, including all middleware,
Expand All @@ -75,7 +78,7 @@ func (mgr *Manager) InlineDispatch(job *faktory.Job) error {
return fmt.Errorf("couldn't set up worker process for inline dispatch - %w", err)
}

err = mgr.dispatch(job)
err = mgr.dispatch(context.Background(), job)
if err != nil {
return fmt.Errorf("job was dispatched inline but failed. Job type %s, with args %+v - %w", job.Type, job.Args, err)
}
Expand Down Expand Up @@ -117,8 +120,14 @@ func (mgr *Manager) Terminate(reallydie bool) {
mgr.Logger.Info("Shutting down...")
mgr.state = "terminate"
close(mgr.done)

if mgr.cancelFunc != nil {
// cancel any jobs which are lingering
time.AfterFunc(mgr.ShutdownTimeout, mgr.cancelFunc)
}
mgr.fireEvent(Shutdown)
mgr.shutdownWaiter.Wait()
mgr.shutdownWaiter.Wait() // can't pass this point until all jobs are done

mgr.Pool.Close()
mgr.Logger.Info("Goodbye")
if reallydie {
Expand All @@ -134,6 +143,12 @@ func NewManager() *Manager {
Labels: []string{"golang-" + Version},
Pool: nil,

// best practice is to give jobs 25 seconds to finish their work
// and then use the last 5 seconds to force any lingering jobs to
// stop by closing their Context. Many cloud services default to a
// hard 30 second timeout before KILLing the process.
ShutdownTimeout: 25 * time.Second,

state: "",
queues: []string{"default"},
done: make(chan interface{}),
Expand Down Expand Up @@ -182,7 +197,7 @@ func (mgr *Manager) setUpWorkerProcess() error {
// If the context is present then OS signals will be ignored; the context must be canceled for the method to return
// after running.
func (mgr *Manager) RunWithContext(ctx context.Context) error {
err := mgr.boot()
err := mgr.boot(ctx)
if err != nil {
return err
}
Expand All @@ -191,7 +206,7 @@ func (mgr *Manager) RunWithContext(ctx context.Context) error {
return nil
}

func (mgr *Manager) boot() error {
func (mgr *Manager) boot(ctx context.Context) error {
err := mgr.setUpWorkerProcess()
if err != nil {
return err
Expand All @@ -203,7 +218,7 @@ func (mgr *Manager) boot() error {
mgr.Logger.Infof("faktory_worker_go %s PID %d now ready to process jobs", Version, os.Getpid())
mgr.Logger.Debugf("Using Faktory Client API %s", faktory.Version)
for i := 0; i < mgr.Concurrency; i++ {
go process(mgr, i)
go process(ctx, mgr, i)
}

return nil
Expand All @@ -212,7 +227,10 @@ func (mgr *Manager) boot() error {
// Run starts processing jobs.
// This method does not return unless an error is encountered while starting.
func (mgr *Manager) Run() error {
err := mgr.boot()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
mgr.cancelFunc = cancel
err := mgr.boot(ctx)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func (mgr *Manager) Use(middleware ...MiddlewareFunc) {
mgr.middleware = append(mgr.middleware, middleware...)
}

func dispatch(chain []MiddlewareFunc, ctx context.Context, job *faktory.Job, perform Handler) error {
func dispatch(ctx context.Context, chain []MiddlewareFunc, job *faktory.Job, perform Handler) error {
if len(chain) == 0 {
return perform(ctx, job)
}

link := chain[0]
rest := chain[1:]
return link(ctx, job, func(ctx context.Context) error {
return dispatch(rest, ctx, job, perform)
return dispatch(ctx, rest, job, perform)
})
}
7 changes: 5 additions & 2 deletions middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package faktory_worker
import (
"context"
"testing"
"time"

faktory "github.com/contribsys/faktory/client"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -34,12 +35,14 @@ func TestMiddleware(t *testing.T) {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
job := faktory.NewJob("blah", 1, 2)
ctx := jobContext(mgr.Pool, job)
ctx = jobContext(ctx, mgr.Pool, job)
assert.Nil(t, ctx.Value(EXAMPLE))
assert.EqualValues(t, 0, counter)

err = dispatch(mgr.middleware, ctx, job, blahFunc)
err = dispatch(ctx, mgr.middleware, job, blahFunc)

assert.NoError(t, err)
assert.EqualValues(t, 1, counter)
Expand Down
9 changes: 5 additions & 4 deletions runner.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package faktory_worker

import (
"context"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -77,7 +78,7 @@ func heartbeat(mgr *Manager) {
}
}

func process(mgr *Manager, idx int) {
func process(ctx context.Context, mgr *Manager, idx int) {
mgr.shutdownWaiter.Add(1)
defer mgr.shutdownWaiter.Done()

Expand All @@ -97,7 +98,7 @@ func process(mgr *Manager, idx int) {
default:
}

err := processOne(mgr)
err := processOne(ctx, mgr)
if err != nil {
mgr.Logger.Debug(err)
if _, ok := err.(*NoHandlerError); !ok {
Expand All @@ -123,7 +124,7 @@ func process(mgr *Manager, idx int) {
}
}

func processOne(mgr *Manager) error {
func processOne(ctx context.Context, mgr *Manager) error {
var job *faktory.Job

// explicit scopes to limit variable visibility
Expand Down Expand Up @@ -155,7 +156,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := mgr.dispatch(job)
joberr := mgr.dispatch(ctx, job)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down
Loading

0 comments on commit f728a46

Please sign in to comment.