diff --git a/broker.go b/broker.go index 0a7adcc..01457dd 100644 --- a/broker.go +++ b/broker.go @@ -175,15 +175,18 @@ func (s Status) CompleteSinks() []NodeID { return s.completeSinks } -func (s Status) getError(threshold, thresholdSinks int) error { +func (s Status) getError(ctxErr error, threshold, thresholdSinks int) error { + var err error switch { case len(s.complete) < threshold: - return fmt.Errorf("event not processed by enough 'filter' and 'sink' nodes") + err = fmt.Errorf("event not processed by enough 'filter' and 'sink' nodes") case len(s.completeSinks) < thresholdSinks: - return fmt.Errorf("event not processed by enough 'sink' nodes") + err = fmt.Errorf("event not processed by enough 'sink' nodes") default: return nil } + + return errors.Join(err, ctxErr) } // Send writes an event of type t to all registered pipelines concurrently and diff --git a/broker_test.go b/broker_test.go index 38a6ab6..ee8334e 100644 --- a/broker_test.go +++ b/broker_test.go @@ -1187,3 +1187,90 @@ func TestBroker_Status_CompleteSinks(t *testing.T) { }) } } + +func TestStatus_getError(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + complete []NodeID + completeSinks []NodeID + isErrorExpected bool + expectedErrorMessage string + shouldCancelContext bool + }{ + "no-problems": { + complete: []NodeID{"1", "2"}, + completeSinks: []NodeID{"2"}, + isErrorExpected: false, + }, + "no-problems-context-cancelled": { + complete: []NodeID{"1", "2"}, + completeSinks: []NodeID{"2"}, + shouldCancelContext: true, + isErrorExpected: false, + }, + "not-enough-nodes": { + complete: []NodeID{}, + completeSinks: []NodeID{}, + isErrorExpected: true, + expectedErrorMessage: "event not processed by enough 'filter' and 'sink' nodes", + }, + "not-enough-nodes-context-cancelled": { + complete: []NodeID{}, + completeSinks: []NodeID{}, + shouldCancelContext: true, + isErrorExpected: true, + expectedErrorMessage: "event not processed by enough 'filter' and 'sink' nodes\ncontext canceled", + }, + "not-enough-sink-nodes": { + complete: []NodeID{"1"}, + completeSinks: []NodeID{}, + isErrorExpected: true, + expectedErrorMessage: "event not processed by enough 'sink' nodes", + }, + "not-enough-sink-nodes-context-cancelled": { + complete: []NodeID{"1"}, + completeSinks: []NodeID{}, + shouldCancelContext: true, + isErrorExpected: true, + expectedErrorMessage: "event not processed by enough 'sink' nodes\ncontext canceled", + }, + } + + for name, tc := range tests { + name := name + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(func() { cancel() }) + + s := Status{ + complete: tc.complete, + completeSinks: tc.completeSinks, + } + + if tc.shouldCancelContext { + // Cancel the context NOW if we want to test the output from getError. + cancel() + } + + // NOTE: threshold and thresholdSinks can be added to tests struct + // if needed for additional tests. + err := s.getError(ctx.Err(), 1, 1) + + switch { + case tc.isErrorExpected: + require.Error(t, err) + require.EqualError(t, err, tc.expectedErrorMessage) + if tc.shouldCancelContext { + errors.Is(err, context.Canceled) + } + default: + require.NoError(t, err) + } + }) + } + +} diff --git a/graph.go b/graph.go index a396781..68f56cd 100644 --- a/graph.go +++ b/graph.go @@ -59,7 +59,7 @@ func (g *graph) process(ctx context.Context, e *Event) (Status, error) { } } } - return status, status.getError(g.successThreshold, g.successThresholdSinks) + return status, status.getError(ctx.Err(), g.successThreshold, g.successThresholdSinks) } // Recursively process every node in the graph.