Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support inline job execution for testing #74

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func main() {
//mgr.ProcessWeightedPriorityQueues(map[string]int{"critical":3, "default":2, "bulk":1})

go func(){
// Start processing jobs in background routine, this method does not return
// Start processing jobs in background routine, this method does not return
// unless an error is returned or cancel() is called
mgr.RunWithContext(ctx)
}()

go func() {
stopSignals := []os.Signal{
syscall.SIGTERM,
syscall.SIGTERM,
syscall.SIGINT,
}
stop := make(chan os.Signal, len(stopSignals))
Expand All @@ -131,13 +131,53 @@ func main() {
}
}
}()

<-ctx.Done()
}
```

See `test/main.go` for a working example.

# Testing

`faktory_worker_go` provides helpers that allow you to configure tests to execute jobs inline if you prefer. In this example, the application has defined its own wrapper function for `client.Push`.

```go
import (
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
)

func Push(mgr worker.Manager, job *faktory.Job) error {
if viper.GetBool("faktory_inline") {
return syntheticPush(mgr worker.Manager, job)
}
return realPush(job)
}

func syntheticPush(mgr worker.Manager, job *faktory.Job) error {
if mgr.IsRegistered(job.Type) {
return mgr.Dispatch(job)
}

return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type)
}

func realPush(job *faktory.Job) error {
client, err := faktory.Open()
if err != nil {
return errors.Wrap(err, "failed to open Faktory client connection")
}

err = client.Push(job)
if err != nil {
return errors.Wrap(err, "failed to enqueue Faktory job")
}

return nil
}
```

# FAQ

* How do I specify the Faktory server location?
Expand Down
18 changes: 17 additions & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,29 @@ type Manager struct {
// Register a handler for the given jobtype. It is expected that all jobtypes
// are registered upon process startup.
//
// mgr.Register("ImportantJob", ImportantFunc)
// mgr.Register("ImportantJob", ImportantFunc)
func (mgr *Manager) Register(name string, fn Perform) {
mgr.jobHandlers[name] = func(ctx context.Context, job *faktory.Job) error {
return fn(ctx, job.Args...)
}
}

// IsRegistered checks if a given job name is registered with the manager.
//
// mgr.IsRegistered("SomeJob")
func (mgr *Manager) IsRegistered(name string) bool {
_, ok := mgr.jobHandlers[name]

return ok
}

// Dispatch immediately executes a job, including all middleware.
func (mgr *Manager) Dispatch(job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
}

// Register a callback to be fired when a process lifecycle event occurs.
// These are useful for hooking into process startup or shutdown.
func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) {
Expand Down
6 changes: 2 additions & 4 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,7 @@ func processOne(mgr *Manager) error {
}
}

perform := mgr.jobHandlers[job.Type]

if perform == nil {
if !mgr.IsRegistered(job.Type) {
je := &NoHandlerError{JobType: job.Type}
err := mgr.with(func(c *faktory.Client) error {
return c.Fail(job.Jid, je, nil)
Expand All @@ -157,7 +155,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
joberr := mgr.Dispatch(job)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down
Loading