Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support explicit, immediate job execution for tests #72

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,46 @@ func main() {

See `test/main.go` for a working example.

# Testing

`faktory_worker_go` provides helpers that allow you to configure tests to execute jobs inline if you prefer. In this example, the application has defined its own wrapper function for `client.Push`.

```go
import (
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
)

func Push(mgr worker.Manager, job *faktory.Job) error {
if viper.GetBool("faktory_inline") {
return syntheticPush(mgr worker.Manager, job)
}
return realPush(job)
}

func syntheticPush(mgr worker.Manager, job *faktory.Job) error {
if mgr.IsRegistered(job.Type) {
return mgr.Dispatch(job)
}

return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type)
}

func realPush(job *faktory.Job) error {
client, err := faktory.Open()
if err != nil {
return errors.Wrap(err, "failed to open Faktory client connection")
}

err = client.Push(job)
if err != nil {
return errors.Wrap(err, "failed to enqueue Faktory job")
}

return nil
}
```

# FAQ

* How do I specify the Faktory server location?
Expand Down
14 changes: 14 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ func (mgr *Manager) Register(name string, fn Perform) {
}
}

// IsRegistered checks if a given job name is registered with the manager.
func (mgr *Manager) IsRegistered(name string) bool {
_, ok := mgr.jobHandlers[name]

return ok
}

// Dispatch immediately executes a job, including all middleware on the manager.
func (mgr *Manager) Dispatch(job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
}

// Register a callback to be fired when a process lifecycle event occurs.
// These are useful for hooking into process startup or shutdown.
func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) {
Expand Down
2 changes: 1 addition & 1 deletion runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
joberr := mgr.Dispatch(job)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down
Loading