Skip to content

Commit

Permalink
Pipelines with Cycles: e2e testing, and pipeline validation (#920)
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 committed Aug 8, 2023
1 parent 17c171f commit 2d6112b
Show file tree
Hide file tree
Showing 8 changed files with 618 additions and 14 deletions.
45 changes: 45 additions & 0 deletions examples/10-cycle-to-prev.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: cycle-to-prev
spec:
vertices:
- name: in
source:
http: {}
- name: cat
udf:
builtin:
name: cat
- name: retry
scale:
min: 1
disabled: true # don't scale this beyond one Pod since it doesn't make sense for this particular container, which uses in-memory storage
udf:
container:
# This will try each message up to 3 times before continuing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/retry
# (a more realistic example might retry only on failure)
image: quay.io/numaio/numaflow-go/map-retry:latest
- name: out
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: cat
- from: cat
to: retry
- from: retry
to: cat
conditions:
tags:
values:
- retry
- from: retry
to: out
conditions:
tags:
operator: not
values:
- retry
39 changes: 39 additions & 0 deletions examples/10-cycle-to-self.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: cycle-to-self
spec:
vertices:
- name: in
source:
http: {}
- name: retry
scale:
min: 1
disabled: true # don't scale this beyond one Pod since it doesn't make sense for this particular container, which uses in-memory storage
udf:
container:
# This will try each message up to 3 times before continuing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/retry
# (a more realistic example might retry only on failure)
image: quay.io/numaio/numaflow-go/map-retry:latest
- name: out
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: retry
- from: retry
to: retry
conditions:
tags:
values:
- retry
- from: retry
to: out
conditions:
tags:
operator: not
values:
- retry
29 changes: 29 additions & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,35 @@ type PipelineSpec struct {
SideInputs []SideInput `json:"sideInputs,omitempty" protobuf:"bytes,8,rep,name=sideInputs"`
}

func (pipeline PipelineSpec) GetMatchingVertices(f func(AbstractVertex) bool) map[string]*AbstractVertex {
mappedVertices := make(map[string]*AbstractVertex)
for i := range pipeline.Vertices {
v := &pipeline.Vertices[i]
if f(*v) {
mappedVertices[v.Name] = v
}
}
return mappedVertices
}

func (pipeline PipelineSpec) GetVerticesByName() map[string]*AbstractVertex {
return pipeline.GetMatchingVertices(func(v AbstractVertex) bool {
return true
})
}

func (pipeline PipelineSpec) GetSourcesByName() map[string]*AbstractVertex {
return pipeline.GetMatchingVertices(func(v AbstractVertex) bool {
return v.IsASource()
})
}

func (pipeline PipelineSpec) GetSinksByName() map[string]*AbstractVertex {
return pipeline.GetMatchingVertices(func(v AbstractVertex) bool {
return v.IsASink()
})
}

type Watermark struct {
// Disabled toggles the watermark propagation, defaults to false.
// +kubebuilder:default=false
Expand Down
152 changes: 148 additions & 4 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
if e.From == "" || e.To == "" {
return fmt.Errorf("invalid edge: both from and to need to be specified")
}
if e.From == e.To {
return fmt.Errorf("invalid edge: same from and to")
}
if !names[e.From] {
return fmt.Errorf("invalid edge: no vertex named %q", e.From)
}
Expand All @@ -155,7 +152,12 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("not all the vertex names are defined in edges")
}

// TODO(Join): prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it
// Prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it.
// Whenever there's a cycle, there will inherently be "late data", and we don't want late data for a Reduce Vertex, which may
// have already "closed the book" on the data's time window.
if err := validateCycles(&pl.Spec); err != nil {
return err
}

for _, v := range pl.Spec.Vertices {
if err := validateVertex(v); err != nil {
Expand Down Expand Up @@ -308,3 +310,145 @@ func isReservedContainerName(name string) bool {
name == dfv1.CtrInitSideInputs ||
name == dfv1.CtrSideInputsWatcher
}

// validateCycles verifies that there are no invalid cycles in the pipeline.
// An invalid cycle has a Reduce Vertex at or to the right of the cycle. Whenever there's a cycle,
// there will inherently be "late data", and we don't want late data for a Reduce Vertex, which may
// have already "closed the book" on the data's time window.
func validateCycles(pipelineSpec *dfv1.PipelineSpec) error {
verticesByName := pipelineSpec.GetVerticesByName()
edges, err := toVerticesMappedByFrom(pipelineSpec.Edges, verticesByName)
if err != nil {
return err
}

// first find the cycles, if any
cycles, err := getCycles(pipelineSpec)
if err != nil {
return err
}
// need to make sure none of the cycles have a Reduce Vertex at or to the right of the cycle
for cycleVertexName := range cycles {
cycleVertex, found := verticesByName[cycleVertexName]
if !found {
return fmt.Errorf("something went wrong: no Vertex found with name %q", cycleVertexName)
}
invalidReduce := edges.findVertex(cycleVertex, map[string]struct{}{}, func(v *dfv1.AbstractVertex) bool {
return v.IsReduceUDF()
})
if invalidReduce {
return fmt.Errorf("there's a Reduce Vertex at or to the right of a Cycle occurring at Vertex %q", cycleVertexName)
}
}

return nil
}

// getCycles locates the vertices where there's a Cycle, if any
// eg. if A->B->A, then return A
// Since there are multiple Sources, and since each Source produces a Tree, then we can return multiple Cycles
func getCycles(pipelineSpec *dfv1.PipelineSpec) (map[string]struct{}, error) {
edges, err := toVerticesMappedByFrom(pipelineSpec.Edges, pipelineSpec.GetVerticesByName())
if err != nil {
return nil, err
}

sources := pipelineSpec.GetSourcesByName()
cycles := map[string]struct{}{} // essentially a Set of cycle Vertex names

// consolidate the Cycles from all Sources
for _, sourceVertex := range sources {
cyclesFromSource := edges.getCyclesFromVertex(sourceVertex, map[string]struct{}{})
for cycleVertex := range cyclesFromSource {
cycles[cycleVertex] = struct{}{}
}
}

return cycles, nil
}

// getCyclesFromVertex returns the cycles detected if any, starting from startVertex
// This is a recursive function. Each iteration we keep track of the visited Vertices in order to detect a cycle.
func (edges verticesByFrom) getCyclesFromVertex(startVertex *dfv1.AbstractVertex, visited map[string]struct{}) map[string]struct{} {

toVertices, found := edges[startVertex.Name]
// base case: no Edges stem from this Vertex
if !found {
return map[string]struct{}{}
}

// check for cycle
_, alreadyVisited := visited[startVertex.Name]
if alreadyVisited {
return map[string]struct{}{startVertex.Name: {}}
}
// add this Vertex to our Set
visited[startVertex.Name] = struct{}{}

// recurse the Edges of this Vertex, looking for cycles
cyclesFound := make(map[string]struct{})
for _, toVertex := range toVertices {
newCycles := edges.getCyclesFromVertex(toVertex, visited)
for cycleVertex := range newCycles {
cyclesFound[cycleVertex] = struct{}{}
}
}

delete(visited, startVertex.Name) // pop

return cyclesFound
}

// findVertex determines if any Vertex starting from this one meets some condition
// This is a recursive function. Each iteration we keep track of the visited Vertices in order not to get in an infinite loop
func (edges verticesByFrom) findVertex(startVertex *dfv1.AbstractVertex, visited map[string]struct{}, f func(*dfv1.AbstractVertex) bool) bool {

// first try the condition on this vertex
if f(startVertex) {
return true
}

toVertices, found := edges[startVertex.Name]
// base case: no Edges stem from this Vertex
if !found {
return false
}

// if we've arrived at a cycle, then stop
_, alreadyVisited := visited[startVertex.Name]
if alreadyVisited {
return false
}
// keep track of visited vertices so we don't get into an infinite loop
visited[startVertex.Name] = struct{}{}

// recurse
for _, toVertex := range toVertices {
if edges.findVertex(toVertex, visited, f) {
return true
}
}

delete(visited, startVertex.Name) // pop

return false
}

type verticesByFrom map[string][]*dfv1.AbstractVertex

// toVerticesMappedByFrom is a helper function to create a map of "To Vertices" from their "From Vertex"
func toVerticesMappedByFrom(edges []dfv1.Edge, verticesByName map[string]*dfv1.AbstractVertex) (verticesByFrom, error) {
mappedEdges := make(verticesByFrom)
for _, edge := range edges {
_, found := mappedEdges[edge.From]
if !found {
mappedEdges[edge.From] = make([]*dfv1.AbstractVertex, 0)
}
toVertex, found := verticesByName[edge.To]
if !found {
return nil, fmt.Errorf("no vertex found of name %q", edge.To)
}
mappedEdges[edge.From] = append(mappedEdges[edge.From], toVertex)
}
return mappedEdges, nil
}
Loading

0 comments on commit 2d6112b

Please sign in to comment.