Skip to content

Commit

Permalink
test via InlineDispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Steel authored and mperham committed Jan 23, 2024
1 parent 3b31160 commit 80ee3f2
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
16 changes: 3 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,12 @@ func Push(mgr worker.Manager, job *faktory.Job) error {
}

func syntheticPush(mgr worker.Manager, job *faktory.Job) error {
if !mgr.IsRegistered(job.Type) {
return fmt.Errorf("failed to execute job type %s inline, job not registered", job.Type)
}

// Manually setting the worker defaults is a threadsafe alternative to mgr.Run/mgr.RunWithContext, which can trigger data races
err := mgr.SetUpWorkerProcess()
err := mgr.InlineDispatch(job)
if err != nil {
return fmt.Errorf("couldn't set up Faktory worker process in synthetic push: %w", err)
return errors.Wrap(err, "syntheticPush failed")
}

err := mgr.Dispatch(job)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("job was immediately executed but failed. Job type %s, with args %+v", job.Type, job.Args))
}

return nil
return nil
}

func realPush(job *faktory.Job) error {
Expand Down
34 changes: 27 additions & 7 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,42 @@ func (mgr *Manager) Register(name string, fn Perform) {
}
}

// IsRegistered checks if a given job name is registered with the manager.
// isRegistered checks if a given job name is registered with the manager.
//
// mgr.IsRegistered("SomeJob")
func (mgr *Manager) IsRegistered(name string) bool {
// mgr.isRegistered("SomeJob")
func (mgr *Manager) isRegistered(name string) bool {
_, ok := mgr.jobHandlers[name]

return ok
}

// Dispatch immediately executes a job, including all middleware.
func (mgr *Manager) Dispatch(job *faktory.Job) error {
// dispatch immediately executes a job, including all middleware.
func (mgr *Manager) dispatch(job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
}

// InlineDispatch is designed for testing. It immediate executes a job, including all middleware,
// as well as performs manager setup if needed.
func (mgr *Manager) InlineDispatch(job *faktory.Job) error {
if !mgr.isRegistered(job.Type) {
return fmt.Errorf("failed to dispatch inline for job type %s; job not registered", job.Type)
}

err := mgr.setUpWorkerProcess()
if err != nil {
return fmt.Errorf("couldn't set up worker process for inline dispatch - %w", err)
}

err = mgr.dispatch(job)
if err != nil {
return fmt.Errorf("job was dispatched inline but failed. Job type %s, with args %+v - %w", job.Type, job.Args, err)
}

return nil
}

// Register a callback to be fired when a process lifecycle event occurs.
// These are useful for hooking into process startup or shutdown.
func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) {
Expand Down Expand Up @@ -130,7 +150,7 @@ func NewManager() *Manager {
}
}

func (mgr *Manager) SetUpWorkerProcess() error {
func (mgr *Manager) setUpWorkerProcess() error {
mgr.mut.Lock()
defer mgr.mut.Unlock()

Expand Down Expand Up @@ -174,7 +194,7 @@ func (mgr *Manager) RunWithContext(ctx context.Context) error {
}

func (mgr *Manager) boot() error {
err := mgr.SetUpWorkerProcess()
err := mgr.setUpWorkerProcess()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestManagerSetup(t *testing.T) {
}

mgr := NewManager()
err = mgr.SetUpWorkerProcess()
err = mgr.setUpWorkerProcess()
assert.NoError(t, err)

startupCalled := false
Expand Down
4 changes: 2 additions & 2 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func processOne(mgr *Manager) error {
}
}

if !mgr.IsRegistered(job.Type) {
if !mgr.isRegistered(job.Type) {
je := &NoHandlerError{JobType: job.Type}
err := mgr.with(func(c *faktory.Client) error {
return c.Fail(job.Jid, je, nil)
Expand All @@ -155,7 +155,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := mgr.Dispatch(job)
joberr := mgr.dispatch(job)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down
2 changes: 1 addition & 1 deletion runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestLiveServer(t *testing.T) {
mgr := NewManager()
mgr.ProcessStrictPriorityQueues("fwgtest")
mgr.Concurrency = 1
err := mgr.SetUpWorkerProcess()
err := mgr.setUpWorkerProcess()
assert.NoError(t, err)

mgr.Register("aworker", func(ctx context.Context, args ...interface{}) error {
Expand Down

0 comments on commit 80ee3f2

Please sign in to comment.