Skip to content

Commit

Permalink
allow getError to include information about the state of the context …
Browse files Browse the repository at this point in the history
…in the error message
  • Loading branch information
peteski22 committed Aug 11, 2024
1 parent 7a72cf9 commit 341edad
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 4 deletions.
9 changes: 6 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,18 @@ func (s Status) CompleteSinks() []NodeID {
return s.completeSinks
}

func (s Status) getError(threshold, thresholdSinks int) error {
func (s Status) getError(ctxErr error, threshold, thresholdSinks int) error {
var err error
switch {
case len(s.complete) < threshold:
return fmt.Errorf("event not processed by enough 'filter' and 'sink' nodes")
err = fmt.Errorf("event not processed by enough 'filter' and 'sink' nodes")
case len(s.completeSinks) < thresholdSinks:
return fmt.Errorf("event not processed by enough 'sink' nodes")
err = fmt.Errorf("event not processed by enough 'sink' nodes")
default:
return nil
}

return errors.Join(err, ctxErr)

Check failure on line 189 in broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: errors.Join
}

// Send writes an event of type t to all registered pipelines concurrently and
Expand Down
87 changes: 87 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1187,3 +1187,90 @@ func TestBroker_Status_CompleteSinks(t *testing.T) {
})
}
}

func TestStatus_getError(t *testing.T) {
t.Parallel()

tests := map[string]struct {
complete []NodeID
completeSinks []NodeID
isErrorExpected bool
expectedErrorMessage string
shouldCancelContext bool
}{
"no-problems": {
complete: []NodeID{"1", "2"},
completeSinks: []NodeID{"2"},
isErrorExpected: false,
},
"no-problems-context-cancelled": {
complete: []NodeID{"1", "2"},
completeSinks: []NodeID{"2"},
shouldCancelContext: true,
isErrorExpected: false,
},
"not-enough-nodes": {
complete: []NodeID{},
completeSinks: []NodeID{},
isErrorExpected: true,
expectedErrorMessage: "event not processed by enough 'filter' and 'sink' nodes",
},
"not-enough-nodes-context-cancelled": {
complete: []NodeID{},
completeSinks: []NodeID{},
shouldCancelContext: true,
isErrorExpected: true,
expectedErrorMessage: "event not processed by enough 'filter' and 'sink' nodes\ncontext canceled",
},
"not-enough-sink-nodes": {
complete: []NodeID{"1"},
completeSinks: []NodeID{},
isErrorExpected: true,
expectedErrorMessage: "event not processed by enough 'sink' nodes",
},
"not-enough-sink-nodes-context-cancelled": {
complete: []NodeID{"1"},
completeSinks: []NodeID{},
shouldCancelContext: true,
isErrorExpected: true,
expectedErrorMessage: "event not processed by enough 'sink' nodes\ncontext canceled",
},
}

for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() { cancel() })

s := Status{
complete: tc.complete,
completeSinks: tc.completeSinks,
}

if tc.shouldCancelContext {
// Cancel the context NOW if we want to test the output from getError.
cancel()
}

// NOTE: threshold and thresholdSinks can be added to tests struct
// if needed for additional tests.
err := s.getError(ctx.Err(), 1, 1)

switch {
case tc.isErrorExpected:
require.Error(t, err)
require.EqualError(t, err, tc.expectedErrorMessage)
if tc.shouldCancelContext {
errors.Is(err, context.Canceled)
}
default:
require.NoError(t, err)
}
})
}

}
2 changes: 1 addition & 1 deletion graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (g *graph) process(ctx context.Context, e *Event) (Status, error) {
}
}
}
return status, status.getError(g.successThreshold, g.successThresholdSinks)
return status, status.getError(ctx.Err(), g.successThreshold, g.successThresholdSinks)
}

// Recursively process every node in the graph.
Expand Down

0 comments on commit 341edad

Please sign in to comment.