Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unit testing Perform funcs which push jobs #52

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 69 additions & 14 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
type valueKey int

const (
poolKey valueKey = 2
jobKey valueKey = 3
helperKey valueKey = 1
)

var (
Expand Down Expand Up @@ -57,6 +56,10 @@ type Helper interface {
// allows direct access to the Faktory server from the job
With(func(*faktory.Client) error) error

// Push the given job to Faktory. In test mode, this will
// save the job in an array so you can assert it.
Push(*faktory.Job) error

// Faktory Enterprise:
// this method integrates with Faktory Enterprise's Job Tracking feature.
// `reserveUntil` is optional, only needed for long jobs which have more dynamic
Expand All @@ -68,6 +71,47 @@ type Helper interface {
TrackProgress(percent int, desc string, reserveUntil *time.Time) error
}

// The test helper is a mode of operation which tries to avoid network calls.
// This allows you to unit test your job code with the Helper API without
// Faktory running.
type testHelper struct {
job *faktory.Job
pushedJobs []*faktory.Job
}

func (h *testHelper) Jid() string {
return h.job.Jid
}
func (h *testHelper) Bid() string {
if b, ok := h.job.GetCustom("_bid"); ok {
return b.(string)
}
return ""
}
func (h *testHelper) JobType() string {
return h.job.Type
}

func (h *testHelper) TrackProgress(percent int, desc string, reserveUntil *time.Time) error {
return nil
Copy link
Contributor Author

@mperham mperham Oct 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a separate type which doesn't implement much of the interface, maybe use the standard jobHelper type with a swappable Push() fn?

}
func (h *testHelper) Batch(fn func(*faktory.Batch) error) error {
return nil
}
func (h *testHelper) With(fn func(*faktory.Client) error) error {
return fmt.Errorf("With() does not work in test mode, you will need to abstract out this call")
}

func (h *testHelper) Push(job *faktory.Job) error {
h.pushedJobs = append(h.pushedJobs, job)
return nil
}

func (h *testHelper) PushedJobs() []*faktory.Job {
return h.pushedJobs
}

// This "live" helper impl requires Faktory to be running.
type jobHelper struct {
job *faktory.Job
pool *faktory.Pool
Expand All @@ -90,24 +134,29 @@ func (h *jobHelper) JobType() string {
// context of an executing job. It will panic if it cannot
// create a Helper due to missing context values.
func HelperFor(ctx context.Context) Helper {
if j := ctx.Value(jobKey); j != nil {
job := j.(*faktory.Job)
if p := ctx.Value(poolKey); p != nil {
pool := p.(*faktory.Pool)
return &jobHelper{
job: job,
pool: pool,
}
}
if j := ctx.Value(helperKey); j != nil {
helper := j.(Helper)
return helper
}
log.Panic("Invalid job context, cannot create faktory_worker_go job helper")
return nil
}

func jobContext(pool *faktory.Pool, job *faktory.Job) context.Context {
func jobContext(testing bool, pool *faktory.Pool, job *faktory.Job) context.Context {
var helper Helper
if testing {
helper = &testHelper{
job: job,
}
} else {
helper = &jobHelper{
job: job,
pool: pool,
}
}

ctx := context.Background()
ctx = context.WithValue(ctx, poolKey, pool)
ctx = context.WithValue(ctx, jobKey, job)
ctx = context.WithValue(ctx, helperKey, helper)
return ctx
}

Expand Down Expand Up @@ -146,3 +195,9 @@ func (h *jobHelper) Batch(fn func(*faktory.Batch) error) error {
func (h *jobHelper) With(fn func(*faktory.Client) error) error {
return h.pool.With(fn)
}

func (h *jobHelper) Push(job *faktory.Job) error {
return h.pool.With(func(cl *faktory.Client) error {
return cl.Push(job)
})
}
4 changes: 2 additions & 2 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestSimpleContext(t *testing.T) {
//assert.NoError(t, err)
//cl.Push(job)

ctx := jobContext(pool, job)
ctx := jobContext(false, pool, job)
help := HelperFor(ctx)
assert.Equal(t, help.Jid(), job.Jid)
assert.Empty(t, help.Bid())
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestBatchContext(t *testing.T) {
job.SetCustom("track", 1)
job.SetCustom("_bid", "nosuchbatch")

ctx := jobContext(pool, job)
ctx := jobContext(false, pool, job)
help := HelperFor(ctx)
assert.Equal(t, help.Jid(), job.Jid)
assert.Equal(t, "nosuchbatch", help.Bid())
Expand Down
2 changes: 1 addition & 1 deletion middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMiddleware(t *testing.T) {
}

job := faktory.NewJob("blah", 1, 2)
ctx := jobContext(mgr.Pool, job)
ctx := jobContext(false, mgr.Pool, job)
assert.Nil(t, ctx.Value("a"))
assert.EqualValues(t, 0, counter)

Expand Down
2 changes: 1 addition & 1 deletion runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
joberr := dispatch(mgr.middleware, jobContext(false, mgr.Pool, job), job, perform)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down
32 changes: 28 additions & 4 deletions testing.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package faktory_worker

import (
"context"
"encoding/json"

faktory "github.com/contribsys/faktory/client"
)

type PerformExecutor interface {
// This executes the Perform func with full network
// access to the helper.
Execute(*faktory.Job, Perform) error

// This stubs out the helper so your Perform func can
// use it without network access.
ExecuteWithResult(*faktory.Job, Perform) (ExecuteResult, error)
}

type testExecutor struct {
Expand All @@ -19,18 +26,35 @@ func NewTestExecutor(p *faktory.Pool) PerformExecutor {
}

func (tp *testExecutor) Execute(specjob *faktory.Job, p Perform) error {
_, err := tp.exec(specjob, p, false)
return err
}

func (tp *testExecutor) ExecuteWithResult(specjob *faktory.Job, p Perform) (ExecuteResult, error) {
ctx, err := tp.exec(specjob, p, true)
if err != nil {
return nil, err
}
return HelperFor(ctx).(ExecuteResult), nil
}

type ExecuteResult interface {
PushedJobs() []*faktory.Job
}

func (tp *testExecutor) exec(specjob *faktory.Job, p Perform, local bool) (context.Context, error) {
// perform a JSON round trip to ensure Perform gets the arguments
// exactly how a round trip to Faktory would look.
data, err := json.Marshal(specjob)
if err != nil {
return err
return nil, err
}
var job faktory.Job
err = json.Unmarshal(data, &job)
if err != nil {
return err
return nil, err
}

c := jobContext(tp.Pool, &job)
return p(c, job.Args...)
c := jobContext(local, tp.Pool, &job)
return c, p(c, job.Args...)
}
32 changes: 32 additions & 0 deletions testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ func TestTesting(t *testing.T) {

ajob := faktory.NewJob("sometype", 12, "foobar")
err = perf.Execute(ajob, func(ctx context.Context, args ...interface{}) error {
helper := HelperFor(ctx)
assert.Equal(t, 2, len(args))
assert.EqualValues(t, 12, args[0])
assert.EqualValues(t, "foobar", args[1])
assert.EqualValues(t, ajob.Jid, helper.Jid())
assert.EqualValues(t, "", helper.Bid())
assert.EqualValues(t, ajob.Type, helper.JobType())
return nil
})
assert.NoError(t, err)
Expand All @@ -42,3 +46,31 @@ func TestTesting(t *testing.T) {
})
assert.Equal(t, "Oops", err.Error())
}

func TestRecursiveTest(t *testing.T) {
pool, err := faktory.NewPool(5)
assert.NoError(t, err)
perf := NewTestExecutor(pool)

somejob := faktory.NewJob("pushtype", 12, "foobar")
res, err := perf.ExecuteWithResult(somejob, func(ctx context.Context, args ...interface{}) error {
helper := HelperFor(ctx)
helper.TrackProgress(10, "Starting", nil)

assert.NotEmpty(t, helper.Jid())
assert.Empty(t, helper.Bid())
assert.Equal(t, "pushtype", helper.JobType())

ajob := faktory.NewJob("sometype", 12, "foobar")
ajob.Queue = "whale"
helper.Push(ajob)

helper.TrackProgress(100, "Finished", nil)
return nil
})

assert.NoError(t, err)
assert.NotNil(t, res)
assert.EqualValues(t, 1, len(res.PushedJobs()))
assert.EqualValues(t, "whale", res.PushedJobs()[0].Queue)
}