-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add forest validation for pipelines. Fixes #1002 #1063
Changes from 3 commits
06abfb2
eeaaa11
ec29b86
e8e99dd
816178a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -107,6 +107,10 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { | |||||
return fmt.Errorf("invalid user-defined source vertex %q, only one of 'http', 'kafka', 'nats', 'redisStreams', 'generator' and 'udSource' can be specified", k) | ||||||
} | ||||||
} | ||||||
|
||||||
if findForests(s, pl) { | ||||||
return fmt.Errorf("invalid pipeline, cannot be disjointed") | ||||||
} | ||||||
} | ||||||
|
||||||
for k, t := range udTransformers { | ||||||
|
@@ -472,3 +476,37 @@ func toVerticesMappedByFrom(edges []dfv1.Edge, verticesByName map[string]*dfv1.A | |||||
} | ||||||
return mappedEdges, nil | ||||||
} | ||||||
|
||||||
func findForests(vtx dfv1.AbstractVertex, pl *dfv1.Pipeline) bool { | ||||||
|
||||||
visited := map[string]struct{}{} | ||||||
findForestHelper(vtx.Name, visited, pl) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit - naming: we have findForests and findForestHelper, do we need the extra s? Also consider naming it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a better name, will make this change. |
||||||
|
||||||
// true if forest is found | ||||||
return len(visited) != len(pl.Spec.Vertices) | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be useful to have a comment for this function to go over how it works |
||||||
} | ||||||
|
||||||
func findForestHelper(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
NIT: how about we make the function name a good representative of what it is doing? It took me a couple of seconds to figure out that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding a few comments would be helpful as well, will do. |
||||||
|
||||||
visited[vtxName] = struct{}{} | ||||||
|
||||||
// construct list all to and from vertices | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "construct list of all"... |
||||||
neighbors := make(map[string]string) | ||||||
toEdges := pl.GetToEdges(vtxName) | ||||||
fromEdges := pl.GetFromEdges(vtxName) | ||||||
for _, e := range toEdges { | ||||||
neighbors[e.To] = e.To | ||||||
} | ||||||
for _, e := range fromEdges { | ||||||
neighbors[e.From] = e.From | ||||||
} | ||||||
|
||||||
// visit all to and from vertices | ||||||
for _, v := range neighbors { | ||||||
if _, alreadyVisited := visited[v]; !alreadyVisited { | ||||||
findForestHelper(v, visited, pl) | ||||||
} | ||||||
} | ||||||
|
||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,6 +163,57 @@ var ( | |
}, | ||
}, | ||
} | ||
|
||
testForestPipeline = &dfv1.Pipeline{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: "test-pl", | ||
Namespace: "test-ns", | ||
}, | ||
Spec: dfv1.PipelineSpec{ | ||
Vertices: []dfv1.AbstractVertex{ | ||
{ | ||
Name: "input", | ||
Source: &dfv1.Source{ | ||
UDTransformer: &dfv1.UDTransformer{ | ||
Builtin: &dfv1.Transformer{Name: "filter"}, | ||
}}, | ||
}, | ||
{ | ||
Name: "input-1", | ||
Source: &dfv1.Source{ | ||
UDTransformer: &dfv1.UDTransformer{ | ||
Builtin: &dfv1.Transformer{Name: "filter"}, | ||
}}, | ||
}, | ||
{ | ||
Name: "p1", | ||
UDF: &dfv1.UDF{ | ||
Builtin: &dfv1.Function{Name: "cat"}, | ||
}, | ||
}, | ||
{ | ||
Name: "p2", | ||
UDF: &dfv1.UDF{ | ||
Builtin: &dfv1.Function{Name: "cat"}, | ||
}, | ||
}, | ||
{ | ||
Name: "output", | ||
Sink: &dfv1.Sink{}, | ||
}, | ||
{ | ||
Name: "output-1", | ||
Sink: &dfv1.Sink{}, | ||
}, | ||
}, | ||
Edges: []dfv1.Edge{ | ||
{From: "input", To: "p1"}, | ||
{From: "p1", To: "output"}, | ||
{From: "input-1", To: "p2"}, | ||
{From: "p2", To: "output-1"}, | ||
}, | ||
}, | ||
} | ||
) | ||
|
||
func TestValidatePipeline(t *testing.T) { | ||
|
@@ -273,6 +324,42 @@ func TestValidatePipeline(t *testing.T) { | |
assert.Contains(t, err.Error(), "can not specify both builtin function, and a customized image") | ||
}) | ||
|
||
t.Run("forest - two pipelines with 1 source/sink", func(t *testing.T) { | ||
testObj := testForestPipeline.DeepCopy() | ||
err := ValidatePipeline(testObj) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "invalid pipeline") | ||
}) | ||
|
||
t.Run("forest - second pipeline has no sink", func(t *testing.T) { | ||
testObj := testForestPipeline.DeepCopy() | ||
testObj.Spec.Vertices[5].Sink = nil | ||
testObj.Spec.Vertices[5].UDF = &dfv1.UDF{} | ||
err := ValidatePipeline(testObj) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "invalid vertex") | ||
}) | ||
|
||
t.Run("forest - two pipelines with multiple sources/sinks", func(t *testing.T) { | ||
testObj := testForestPipeline.DeepCopy() | ||
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input-2", Source: &dfv1.Source{}}) | ||
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "output-2", Sink: &dfv1.Sink{}}) | ||
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input-2", To: "p1"}) | ||
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p2", To: "output-2"}) | ||
err := ValidatePipeline(testObj) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "invalid pipeline") | ||
}) | ||
|
||
t.Run("forest - pipelines have cycles", func(t *testing.T) { | ||
testObj := testForestPipeline.DeepCopy() | ||
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p1", To: "p1"}) | ||
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p2", To: "p2"}) | ||
err := ValidatePipeline(testObj) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "invalid pipeline") | ||
}) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like some good test cases for invalid pipelines. I wonder if we should add one for an "interesting" valid pipeline - something with multiple sources and multiple sinks? |
||
t.Run("edge - invalid vertex name", func(t *testing.T) { | ||
testObj := testPipeline.DeepCopy() | ||
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "a", To: "b"}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to
findForests
for all the sources? Can we define the findForests as below?Inside the function, we randomly pick a vertex and DFS. If the pipeline is a forest, then ANY of the vertices cannot visit ALL vertices. Hence if the eventual lengths don't match, the pipeline is a forest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. I didn't notice we were doing it for all sources. We should only do it for one vertex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, how would we randomly pick the vertex? I'm in favor of just calling it on
pl.Spec.Vertices[0]
each time as well.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's fine