Skip to content

Commit

Permalink
graph: don't try to process more root nodes if context is done (#114)
Browse files Browse the repository at this point in the history
* don't start root nodes if context is done

* add test for change to prevent root nodes being invoked when context is already cancelled

* parallelize test amongst other tests

* Update GHA, update Go version, go mod tidy
  • Loading branch information
Peter Wilson authored Aug 12, 2024
1 parent 0b0d547 commit befca3d
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 0 deletions.
9 changes: 9 additions & 0 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ func (g *graph) process(ctx context.Context, e *Event) (Status, error) {
var wg sync.WaitGroup
go func() {
g.roots.Range(func(_ PipelineID, pipeline *registeredPipeline) bool {
select {
// Don't continue to start root nodes if our context is already done.
// We would just process the node and then drop the status, and no
// other linked nodes would be processed.
case <-ctx.Done():
return false
default:
}

wg.Add(1)
g.doProcess(ctx, pipeline.rootNode, e, statusChan, &wg)
return true
Expand Down
117 changes: 117 additions & 0 deletions graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
"context"
"io/ioutil"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type reopenNode struct {
Expand All @@ -17,6 +21,9 @@ type reopenNode struct {

var _ Node = &reopenNode{}

// Ensure that testActionNode implements the interface for a Node.
var _ Node = (*testActionNode)(nil)

func (r *reopenNode) Process(ctx context.Context, e *Event) (*Event, error) {
return e, nil
}
Expand Down Expand Up @@ -389,3 +396,113 @@ func TestSendBlocking(t *testing.T) {
// satisfy the go leak detector in TestMain.
time.Sleep(700 * time.Millisecond)
}

func TestGraph_process_StopRootRangeOnContextCancelled(t *testing.T) {
t.Parallel()

seen := atomic.Bool{}
ctx, cancel := context.WithCancel(context.Background())
l := &sync.RWMutex{}

// We will configure a formatter node in each pipeline to run this func, the
// first one to run it will cancel the context which is shared across all the
// nodes. If the func is invoked again, the test will fail.
action := func(ctx context.Context, e *Event) (*Event, error) {
l.Lock()
defer l.Unlock()

if !seen.Load() {
seen.Store(true)
cancel()
} else {
t.Fatal("root node invoked with cancelled context")
}

return e, nil
}

broker, err := NewBroker()
require.NoError(t, err)
require.NotNil(t, broker)

// Register nodes and pipeline for pipeline1.
// The formatter node performs the func above, the sink node does nothing.
formatterID1 := NodeID("formatter1")
formatterNode1 := &testActionNode{action: action, nodeType: NodeTypeFormatter}
err = broker.RegisterNode(formatterID1, formatterNode1)
require.NoError(t, err)
sinkID1 := NodeID("sink1")
sinkNode1 := &testActionNode{nodeType: NodeTypeSink}
err = broker.RegisterNode(sinkID1, sinkNode1)
require.NoError(t, err)
err = broker.RegisterPipeline(Pipeline{
PipelineID: "pipeline1",
EventType: "foo",
NodeIDs: []NodeID{formatterID1, sinkID1},
})
require.NoError(t, err)

// Register nodes and pipeline for pipeline2.
// The formatter node performs the func above, the sink node does nothing.
// (We don't expect these nodes or pipeline to ever be invoked).
formatterID2 := NodeID("formatter2")
formatterNode2 := &testActionNode{action: action, nodeType: NodeTypeFormatter}
err = broker.RegisterNode(formatterID2, formatterNode2)
require.NoError(t, err)
sinkID2 := NodeID("sink2")
sinkNode2 := &testActionNode{nodeType: NodeTypeSink}
err = broker.RegisterNode(sinkID2, sinkNode2)
require.NoError(t, err)
err = broker.RegisterPipeline(Pipeline{
PipelineID: "pipeline2",
EventType: "foo",
NodeIDs: []NodeID{formatterID2, sinkID2},
})
require.NoError(t, err)

// Send an event via the broker to trigger processing of nodes.
e := &Event{
Type: "foo",
CreatedAt: time.Now(),
Formatted: make(map[string][]byte),
Payload: nil,
}

status, err := broker.Send(ctx, "foo", e)
require.NoError(t, err)
require.Len(t, status.Warnings, 0, "nodes reported errors")
}

// testActionNode is a Node which can be configured to perform the given action
// when Process is invoked on the node. It is flexible and allows the NodeType to
// be reported however the creator likes.
type testActionNode struct {
// action to perform when Process is called on the Node.
// NOTE: if no action is specified the node will behave like a successful
// sink node and return nil, nil.
action func(ctx context.Context, e *Event) (*Event, error)

// NodeType for this node.
// NOTE: this should be configured otherwise NodeTypeSink will be returned.
nodeType NodeType
}

func (s *testActionNode) Type() NodeType {
if s.nodeType > 0 {
return s.nodeType
}

return NodeTypeSink
}

func (s *testActionNode) Reopen() error {
return nil
}

func (s *testActionNode) Process(ctx context.Context, e *Event) (*Event, error) {
if s.action != nil {
return s.action(ctx, e)
}

return nil, nil
}

0 comments on commit befca3d

Please sign in to comment.