From a1dfffe935e8b3b37d6d13941cee1ef4bee31c1c Mon Sep 17 00:00:00 2001 From: Adam Steel Date: Fri, 19 Jan 2024 11:02:44 -0700 Subject: [PATCH] Support inline job execution for testing --- README.md | 48 ++++++++++++++++++++++++++++++++++++++++++++---- manager.go | 18 +++++++++++++++++- runner.go | 6 ++---- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 524d83f..aa3109a 100644 --- a/README.md +++ b/README.md @@ -107,14 +107,14 @@ func main() { //mgr.ProcessWeightedPriorityQueues(map[string]int{"critical":3, "default":2, "bulk":1}) go func(){ - // Start processing jobs in background routine, this method does not return + // Start processing jobs in background routine, this method does not return // unless an error is returned or cancel() is called mgr.RunWithContext(ctx) }() - + go func() { stopSignals := []os.Signal{ - syscall.SIGTERM, + syscall.SIGTERM, syscall.SIGINT, } stop := make(chan os.Signal, len(stopSignals)) @@ -131,13 +131,53 @@ func main() { } } }() - + <-ctx.Done() } ``` See `test/main.go` for a working example. +# Testing + +`faktory_worker_go` provides helpers that allow you to configure tests to execute jobs inline if you prefer. In this example, the application has defined its own wrapper function for `client.Push`. + +```go +import ( + faktory "github.com/contribsys/faktory/client" + worker "github.com/contribsys/faktory_worker_go" +) + +func Push(mgr worker.Manager, job *faktory.Job) error { + if viper.GetBool("faktory_inline") { + return syntheticPush(mgr worker.Manager, job) + } + return realPush(job) +} + +func syntheticPush(mgr worker.Manager, job *faktory.Job) error { + if mgr.IsRegistered(job.Type) { + return mgr.Dispatch(job) + } + + return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type) +} + +func realPush(job *faktory.Job) error { + client, err := faktory.Open() + if err != nil { + return errors.Wrap(err, "failed to open Faktory client connection") + } + + err = client.Push(job) + if err != nil { + return errors.Wrap(err, "failed to enqueue Faktory job") + } + + return nil +} +``` + # FAQ * How do I specify the Faktory server location? diff --git a/manager.go b/manager.go index 7ba1a20..6337b3f 100644 --- a/manager.go +++ b/manager.go @@ -41,13 +41,29 @@ type Manager struct { // Register a handler for the given jobtype. It is expected that all jobtypes // are registered upon process startup. // -// mgr.Register("ImportantJob", ImportantFunc) +// mgr.Register("ImportantJob", ImportantFunc) func (mgr *Manager) Register(name string, fn Perform) { mgr.jobHandlers[name] = func(ctx context.Context, job *faktory.Job) error { return fn(ctx, job.Args...) } } +// IsRegistered checks if a given job name is registered with the manager. +// +// mgr.IsRegistered("SomeJob") +func (mgr *Manager) IsRegistered(name string) bool { + _, ok := mgr.jobHandlers[name] + + return ok +} + +// Dispatch immediately executes a job, including all middleware. +func (mgr *Manager) Dispatch(job *faktory.Job) error { + perform := mgr.jobHandlers[job.Type] + + return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform) +} + // Register a callback to be fired when a process lifecycle event occurs. // These are useful for hooking into process startup or shutdown. func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) { diff --git a/runner.go b/runner.go index 458c9de..f552324 100644 --- a/runner.go +++ b/runner.go @@ -144,9 +144,7 @@ func processOne(mgr *Manager) error { } } - perform := mgr.jobHandlers[job.Type] - - if perform == nil { + if !mgr.IsRegistered(job.Type) { je := &NoHandlerError{JobType: job.Type} err := mgr.with(func(c *faktory.Client) error { return c.Fail(job.Jid, je, nil) @@ -157,7 +155,7 @@ func processOne(mgr *Manager) error { return je } - joberr := dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform) + joberr := mgr.Dispatch(job) if joberr != nil { // job errors are normal and expected, we don't return early from them mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)