Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add forest validation for pipelines. Fixes #1002 #1063

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("invalid user-defined source vertex %q, only one of 'http', 'kafka', 'nats', 'redisStreams', 'generator' and 'udSource' can be specified", k)
}
}

if findForests(s, pl) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to findForests for all the sources? Can we define the findForests as below?

func findForests(pl *dfv1.Pipeline) bool

Inside the function, we randomly pick a vertex and DFS. If the pipeline is a forest, then ANY of the vertices cannot visit ALL vertices. Hence if the eventual lengths don't match, the pipeline is a forest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I didn't notice we were doing it for all sources. We should only do it for one vertex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, how would we randomly pick the vertex? I'm in favor of just calling it on pl.Spec.Vertices[0] each time as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine

return fmt.Errorf("invalid pipeline, cannot be disjointed")
}
}

for k, t := range udTransformers {
Expand Down Expand Up @@ -472,3 +476,37 @@ func toVerticesMappedByFrom(edges []dfv1.Edge, verticesByName map[string]*dfv1.A
}
return mappedEdges, nil
}

func findForests(vtx dfv1.AbstractVertex, pl *dfv1.Pipeline) bool {

visited := map[string]struct{}{}
findForestHelper(vtx.Name, visited, pl)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - naming: we have findForests and findForestHelper, do we need the extra s? Also consider naming it isAForest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a better name, will make this change. findForests isn't as descriptive/accurate for what we're doing.


// true if forest is found
return len(visited) != len(pl.Spec.Vertices)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be useful to have a comment for this function to go over how it works

}

func findForestHelper(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func findForestHelper(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) {
func buildVisitedMap(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) {

NIT: how about we make the function name a good representative of what it is doing? It took me a couple of seconds to figure out that visited is mutated even though it is an arg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a few comments would be helpful as well, will do.


visited[vtxName] = struct{}{}

// construct list all to and from vertices
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"construct list of all"...

neighbors := make(map[string]string)
toEdges := pl.GetToEdges(vtxName)
fromEdges := pl.GetFromEdges(vtxName)
for _, e := range toEdges {
neighbors[e.To] = e.To
}
for _, e := range fromEdges {
neighbors[e.From] = e.From
}

// visit all to and from vertices
for _, v := range neighbors {
if _, alreadyVisited := visited[v]; !alreadyVisited {
findForestHelper(v, visited, pl)
}
}

}
87 changes: 87 additions & 0 deletions pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,57 @@ var (
},
},
}

testForestPipeline = &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl",
Namespace: "test-ns",
},
Spec: dfv1.PipelineSpec{
Vertices: []dfv1.AbstractVertex{
{
Name: "input",
Source: &dfv1.Source{
UDTransformer: &dfv1.UDTransformer{
Builtin: &dfv1.Transformer{Name: "filter"},
}},
},
{
Name: "input-1",
Source: &dfv1.Source{
UDTransformer: &dfv1.UDTransformer{
Builtin: &dfv1.Transformer{Name: "filter"},
}},
},
{
Name: "p1",
UDF: &dfv1.UDF{
Builtin: &dfv1.Function{Name: "cat"},
},
},
{
Name: "p2",
UDF: &dfv1.UDF{
Builtin: &dfv1.Function{Name: "cat"},
},
},
{
Name: "output",
Sink: &dfv1.Sink{},
},
{
Name: "output-1",
Sink: &dfv1.Sink{},
},
},
Edges: []dfv1.Edge{
{From: "input", To: "p1"},
{From: "p1", To: "output"},
{From: "input-1", To: "p2"},
{From: "p2", To: "output-1"},
},
},
}
)

func TestValidatePipeline(t *testing.T) {
Expand Down Expand Up @@ -273,6 +324,42 @@ func TestValidatePipeline(t *testing.T) {
assert.Contains(t, err.Error(), "can not specify both builtin function, and a customized image")
})

t.Run("forest - two pipelines with 1 source/sink", func(t *testing.T) {
testObj := testForestPipeline.DeepCopy()
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid pipeline")
})

t.Run("forest - second pipeline has no sink", func(t *testing.T) {
testObj := testForestPipeline.DeepCopy()
testObj.Spec.Vertices[5].Sink = nil
testObj.Spec.Vertices[5].UDF = &dfv1.UDF{}
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid vertex")
})

t.Run("forest - two pipelines with multiple sources/sinks", func(t *testing.T) {
testObj := testForestPipeline.DeepCopy()
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input-2", Source: &dfv1.Source{}})
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "output-2", Sink: &dfv1.Sink{}})
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input-2", To: "p1"})
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p2", To: "output-2"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid pipeline")
})

t.Run("forest - pipelines have cycles", func(t *testing.T) {
testObj := testForestPipeline.DeepCopy()
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p1", To: "p1"})
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p2", To: "p2"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid pipeline")
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like some good test cases for invalid pipelines. I wonder if we should add one for an "interesting" valid pipeline - something with multiple sources and multiple sinks?

t.Run("edge - invalid vertex name", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "a", To: "b"})
Expand Down
Loading