Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Allow invoked pipelines to complete, mitigate blocking on status channel #113

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,3 +599,22 @@ func (p Pipeline) validate() error {

return err
}

// SetRequireAllPipelinesComplete configures a per-EventType setting which
// controls whether every pipeline invoked for that EventType must complete
// before execution is allowed to return to the caller.
// When enabled for an EventType's pipelines, even cancelling the context will
// not make Broker.Send return immediately; the Status of every invoked
// pipeline is gathered before control returns to calling code.
// If no graph exists yet for the EventType, an empty one is created so the
// setting is retained for pipelines registered later.
func (b *Broker) SetRequireAllPipelinesComplete(eventType EventType, requireAll bool) {
	b.lock.Lock()
	defer b.lock.Unlock()

	g, ok := b.graphs[eventType]
	if !ok {
		// No pipelines registered yet for this EventType; create the graph
		// now so the setting survives until registration happens.
		g = &graph{}
		b.graphs[eventType] = g
	}
	g.requireAllPipelinesComplete = requireAll
}
20 changes: 20 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1187,3 +1187,23 @@ func TestBroker_Status_CompleteSinks(t *testing.T) {
})
}
}

// TestBroker_SetRequireAllPipelinesComplete verifies that toggling the
// 'require all pipelines complete' flag for an EventType is reflected on
// the EventType's underlying graph.
func TestBroker_SetRequireAllPipelinesComplete(t *testing.T) {
	t.Parallel()

	broker, err := NewBroker()
	require.NoError(t, err)

	// Enable the setting for a fresh EventType; this should create its graph.
	broker.SetRequireAllPipelinesComplete("foo", true)

	// Grab the graph so we can inspect the flag directly.
	g := broker.graphs["foo"]
	require.NotNil(t, g)
	require.True(t, g.requireAllPipelinesComplete)

	// Disable the setting again and confirm the same graph reflects it.
	broker.SetRequireAllPipelinesComplete("foo", false)
	require.False(t, g.requireAllPipelinesComplete)
}
42 changes: 37 additions & 5 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,56 @@ type graph struct {
// successThresholdSinks specifies how many sinks must successfully process
// an event for Process to not return an error.
successThresholdSinks int

// requireAllPipelinesComplete specifies whether every pipeline triggered for
// a specific EventType should complete/return before determining Status and
// success based on thresholds.
// The default setting is false, meaning when all Pipelines complete or the
// context is 'done' then the Status and error will be returned.
// Enabling this setting allows the Status for each Pipeline to be considered
// in the return to calling code (Broker.Send).
requireAllPipelinesComplete bool
}

// Process the Event by routing it through all of the graph's nodes,
// starting with the root node.
func (g *graph) process(ctx context.Context, e *Event) (Status, error) {
statusChan := make(chan Status)
// Pre-emptively create a channel that can safely hold as many statuses as
	// we have pipelines, with the expectation that we'd usually have 3 nodes
// per pipeline (filter, formatter, sink). We do this so that we don't end up
// blocked when trying to return the status from processing of a node, but we
// may have already decided to gather Status and error to return to the caller.
statusChan := make(chan Status, g.roots.numRoots*3)

// Initialize so that by default, we aren't required to wait for all invoked
// pipelines to complete.
allPipelinesCompleted := !g.requireAllPipelinesComplete
var done bool
var wg sync.WaitGroup

go func() {
g.roots.Range(func(_ PipelineID, pipeline *registeredPipeline) bool {
select {
// Don't continue to start root nodes if our context is already done.
case <-ctx.Done():
done = true
return false
default:
}

wg.Add(1)
g.doProcess(ctx, pipeline.rootNode, e, statusChan, &wg)
return true
})
wg.Wait()
close(statusChan)
allPipelinesCompleted = true
}()
var status Status
var done bool
for !done {

// We will continue to aggregate our Status from nodes while we are not 'done'
// or waiting for pipelines to complete (when required to do so).
for !done || !allPipelinesCompleted {
select {
case <-ctx.Done():
done = true
Expand All @@ -59,6 +90,7 @@ func (g *graph) process(ctx context.Context, e *Event) (Status, error) {
}
}
}

return status, status.getError(g.successThreshold, g.successThresholdSinks)
}

Expand All @@ -72,7 +104,7 @@ func (g *graph) process(ctx context.Context, e *Event) (Status, error) {
// filter node's ID
// - the final node in a pipeline (a sink) finishes, and Status.complete contains
// the sink node's ID
func (g *graph) doProcess(ctx context.Context, node *linkedNode, e *Event, statusChan chan Status, wg *sync.WaitGroup) {
func (g *graph) doProcess(ctx context.Context, node *linkedNode, e *Event, statusChan chan<- Status, wg *sync.WaitGroup) {
defer wg.Done()

// Process the current Node
Expand All @@ -99,7 +131,7 @@ func (g *graph) doProcess(ctx context.Context, node *linkedNode, e *Event, statu
return
}

// Process any child nodes. This is depth-first.
// Process any child nodes. This is depth-first.
if len(node.next) != 0 {
// If the new Event is nil, it has been filtered out and we are done.
if e == nil {
Expand Down
40 changes: 40 additions & 0 deletions graphmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
// graphMap implements a type-safe synchronized map[PipelineID]*registeredPipeline.
type graphMap struct {
	// m holds the registered pipelines, keyed by PipelineID.
	m sync.Map

	// numRoots attempts to track the number of root nodes that are registered in
	// the associated map of pipelines. This can be useful for the graph to decide
	// how large a channel should be for receiving Status from nodes as they process.
	// Later it may require a lock/mutex in order to synchronize the Store and Delete
	// operations on the map, but for now this should be accurate enough.
	numRoots int
}

// registeredPipeline represents both linked nodes and the registration policy
Expand All @@ -31,11 +38,28 @@ func (g *graphMap) Range(f func(key PipelineID, value *registeredPipeline) bool)

// Store records the pipeline against its ID via sync.Map.Store, incrementing
// numRoots when the ID was not previously present so the count reflects
// distinct pipelines only.
// NOTE: The existence check and the store are not atomic, so something could
// start to range over the map before numRoots reflects the change.
func (g *graphMap) Store(id PipelineID, root *registeredPipeline) {
	// A direct Load is O(1), unlike scanning the whole map with Range.
	// Overwriting an existing entry must not inflate the root count.
	if _, ok := g.m.Load(id); !ok {
		g.numRoots++
	}
	g.m.Store(id, root)
}

// Delete removes the pipeline with the given ID, decrementing numRoots only
// when the ID was actually present; deleting an unknown ID is a no-op.
// Using sync.Map.LoadAndDelete makes the check-and-remove a single atomic
// map operation, though the numRoots adjustment itself is still not
// synchronized with concurrent Range calls.
func (g *graphMap) Delete(id PipelineID) {
	if _, loaded := g.m.LoadAndDelete(id); loaded {
		g.numRoots--
	}
}

Expand All @@ -58,5 +82,21 @@ func (g *graphMap) Nodes(id PipelineID) ([]NodeID, error) {
result[i] = k
i++
}

return result, nil
}

// Exists determines whether a PipelineID is already stored within the graphMap.
// A keyed sync.Map.Load is O(1), unlike scanning every entry with Range,
// and returns the same answer.
func (g *graphMap) Exists(id PipelineID) bool {
	_, ok := g.m.Load(id)
	return ok
}
88 changes: 88 additions & 0 deletions graphmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,91 @@ func TestNodes_ListNodes_RegisteredPipeline(t *testing.T) {
require.Contains(t, nodeIDs, NodeID("b"))
require.Contains(t, nodeIDs, NodeID("c"))
}

// TestGraphMap_Store checks that storing a pipeline makes it retrievable and
// that numRoots only counts distinct pipeline IDs, not overwrites.
func TestGraphMap_Store(t *testing.T) {
	t.Parallel()

	g := &graphMap{}
	findPipeline := pipelineFinder(g)

	// Pipeline we'll store under a fixed ID.
	id := PipelineID("foo")
	p := &registeredPipeline{registrationPolicy: "bar"}

	// Nothing should be present before the first Store.
	require.Equal(t, 0, g.numRoots)
	require.Nil(t, findPipeline(id))

	// First store: the pipeline is retrievable and the root count goes up.
	g.Store(id, p)
	require.Equal(t, 1, g.numRoots)
	got := findPipeline(id)
	require.NotNil(t, got)
	require.Equal(t, RegistrationPolicy("bar"), got.registrationPolicy)

	// Storing the same ID again overwrites the entry but must not bump the
	// counter, since it tracks distinct IDs.
	p.registrationPolicy = "baz"
	g.Store(id, p)
	require.Equal(t, 1, g.numRoots)
	got = findPipeline(id)
	require.NotNil(t, got)
	require.Equal(t, RegistrationPolicy("baz"), got.registrationPolicy)
}

// TestGraphMap_Delete checks that deleting a stored pipeline removes it and
// decrements numRoots, and that deleting a missing ID is a harmless no-op.
func TestGraphMap_Delete(t *testing.T) {
	t.Parallel()

	g := &graphMap{}
	findPipeline := pipelineFinder(g)

	// Pipeline we'll store under a fixed ID.
	id := PipelineID("foo")
	p := &registeredPipeline{registrationPolicy: "bar"}

	// Nothing should be present before the first Store.
	require.Equal(t, 0, g.numRoots)
	require.Nil(t, findPipeline(id))

	// Store the pipeline and confirm it landed with the counter incremented.
	g.Store(id, p)
	require.Equal(t, 1, g.numRoots)
	got := findPipeline(id)
	require.NotNil(t, got)
	require.Equal(t, RegistrationPolicy("bar"), got.registrationPolicy)

	// Deleting it should remove the entry and drop the counter back to zero.
	g.Delete(id)
	require.Equal(t, 0, g.numRoots)
	require.Nil(t, findPipeline(id))

	// A second delete of the same ID must not underflow the counter.
	g.Delete(id)
	require.Equal(t, 0, g.numRoots)
	require.Nil(t, findPipeline(id))
}

// pipelineFinder returns a helper that looks up a pipeline by ID in the
// supplied graphMap, returning nil when no matching pipeline is stored.
func pipelineFinder(g *graphMap) func(id PipelineID) *registeredPipeline {
	return func(id PipelineID) *registeredPipeline {
		var found *registeredPipeline

		g.Range(func(key PipelineID, rp *registeredPipeline) bool {
			if key != id {
				return true // keep scanning
			}
			found = rp
			return false // match located; stop iterating
		})

		return found
	}
}
Loading