feat: add forest validation for pipelines. Fixes #1002 #1063

Merged (5 commits) on Sep 19, 2023

Conversation

@dpadhiar (Contributor) commented Sep 18, 2023

Explain what this PR does.

Fixes #1002

Adds validation that rejects pipeline specs that are forests, i.e. specs whose vertices are not all connected to one another.

@dpadhiar marked this pull request as ready for review on September 18, 2023, 22:29
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid pipeline")
})

Contributor

Looks like some good test cases for invalid pipelines. I wonder if we should also add one for an "interesting" valid pipeline, e.g. one with multiple sources and multiple sinks?
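For illustration, such a valid case could look like the sketch below: two sources and two sinks joined through one shared vertex. The vertex names, the `{from, to}` edge pairs, and the `reachableCount` helper are all hypothetical and are not taken from the PR; the helper uses a small iterative traversal rather than the PR's recursive one.

```go
package main

// reachableCount returns how many distinct vertices can be reached from
// start when every edge may be followed in either direction.
// Edges are {from, to} pairs; this shape is an assumption made for the
// example and is not the dfv1 representation.
func reachableCount(start string, edges [][2]string) int {
	seen := map[string]bool{start: true}
	stack := []string{start}
	for len(stack) > 0 {
		// Pop the most recently discovered vertex.
		cur := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		for _, e := range edges {
			var next string
			switch cur {
			case e[0]:
				next = e[1]
			case e[1]:
				next = e[0]
			default:
				continue // edge does not touch cur
			}
			if !seen[next] {
				seen[next] = true
				stack = append(stack, next)
			}
		}
	}
	return len(seen)
}

// Hypothetical valid pipeline: two sources and two sinks around one vertex.
var multiVertices = []string{"in-a", "in-b", "map", "out-x", "out-y"}

var multiEdges = [][2]string{
	{"in-a", "map"},
	{"in-b", "map"},
	{"map", "out-x"},
	{"map", "out-y"},
}
```

A test for the valid case would then assert that `reachableCount(multiVertices[0], multiEdges)` equals `len(multiVertices)`, so the pipeline is not reported as a forest.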


// true if forest is found
return len(visited) != len(pl.Spec.Vertices)

Contributor

It might be useful to add a comment on this function explaining how it works.


visited[vtxName] = struct{}{}

// construct list all to and from vertices
Contributor

"construct list of all"...

@@ -107,6 +107,10 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("invalid user-defined source vertex %q, only one of 'http', 'kafka', 'nats', 'redisStreams', 'generator' and 'udSource' can be specified", k)
}
}

if findForests(s, pl) {
Member

Do we need to call findForests for every source? Could we define findForests as below instead?

func findForests(pl *dfv1.Pipeline) bool

Inside the function, we pick an arbitrary vertex and run a DFS from it. If the pipeline is a forest, then no single vertex can reach all of the vertices, whichever one we start from. Hence, if the number of visited vertices doesn't match the total number of vertices, the pipeline is a forest.
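A minimal sketch of that idea, for reference. `Edge` and `Pipeline` here are simplified, hypothetical stand-ins for the dfv1 types, and the function is not the code that was merged. It assumes vertex names are unique and that every edge refers to a declared vertex.

```go
package main

// Edge and Pipeline are simplified, hypothetical stand-ins for the dfv1
// types; only the fields needed for the connectivity check are modelled.
type Edge struct {
	From, To string
}

type Pipeline struct {
	Vertices []string
	Edges    []Edge
}

// isAForest reports whether a traversal from one vertex fails to reach
// every vertex. Edges are followed in both directions, so one source can
// still reach a sibling source through a shared downstream vertex.
// Assumes unique vertex names and edges that only name declared vertices.
func isAForest(pl Pipeline) bool {
	if len(pl.Vertices) == 0 {
		return false // nothing to traverse; treated as "not a forest" here
	}
	// Build an undirected adjacency list once.
	neighbours := make(map[string][]string, len(pl.Vertices))
	for _, e := range pl.Edges {
		neighbours[e.From] = append(neighbours[e.From], e.To)
		neighbours[e.To] = append(neighbours[e.To], e.From)
	}
	visited := map[string]struct{}{}
	var dfs func(name string)
	dfs = func(name string) {
		if _, seen := visited[name]; seen {
			return
		}
		visited[name] = struct{}{}
		for _, next := range neighbours[name] {
			dfs(next)
		}
	}
	// Any starting vertex works; the first one is simply convenient.
	dfs(pl.Vertices[0])
	return len(visited) != len(pl.Vertices)
}
```

Because the traversal ignores edge direction, the result does not depend on which vertex is chosen as the starting point.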

Contributor

Correct. I didn't notice we were doing it for all sources. We should only do it for one vertex.

Contributor Author

In this case, how would we randomly pick the vertex? I'm in favor of just calling it on pl.Spec.Vertices[0] each time as well.

Contributor

that's fine

func findForests(vtx dfv1.AbstractVertex, pl *dfv1.Pipeline) bool {

visited := map[string]struct{}{}
findForestHelper(vtx.Name, visited, pl)
Member

nit - naming: we have findForests and findForestHelper, do we need the extra s? Also consider naming it isAForest

Contributor Author

This is a better name, will make this change. findForests isn't as descriptive/accurate for what we're doing.


}

func findForestHelper(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) {
Member

Suggested change
- func findForestHelper(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) {
+ func buildVisitedMap(vtxName string, visited map[string]struct{}, pl *dfv1.Pipeline) {

NIT: how about we make the function name a good representative of what it is doing? It took me a couple of seconds to figure out that visited is mutated even though it is an arg.
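To make the mutation point concrete: a Go map argument shares its underlying data with the caller's map, so writes made inside the helper are visible outside it. The sketch below contrasts that with a variant that returns a fresh set. Both `markVisited` and `visitedSet` are hypothetical names used only for this illustration.

```go
package main

// markVisited records name in the caller's set. The map value passed in
// refers to the same underlying data as the caller's map, so the
// insertion is visible to the caller even though nothing is returned.
func markVisited(name string, visited map[string]struct{}) {
	visited[name] = struct{}{}
}

// visitedSet is an alternative shape that builds and returns a new set.
// The data flow is explicit at the call site, at the cost of allocating
// a map on every call.
func visitedSet(names ...string) map[string]struct{} {
	set := make(map[string]struct{}, len(names))
	for _, n := range names {
		set[n] = struct{}{}
	}
	return set
}
```

With the first shape, a name such as buildVisitedMap signals to the reader that the argument is being filled in; with the second, the return value does that job.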

Contributor Author

Adding a few comments would be helpful as well, will do.

@dpadhiar merged commit b2a377c into numaproj:main Sep 19, 2023
17 checks passed
kohlisid pushed a commit to kohlisid/numaflow that referenced this pull request Sep 19, 2023
Successfully merging this pull request may close these issues.

complete pipeline validations
4 participants