-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: controller changes for Side Inputs support #866
Conversation
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
cmd/commands/root.go
Outdated
@@ -46,4 +46,7 @@ func init() { | |||
rootCmd.AddCommand(NewServerCommand()) | |||
rootCmd.AddCommand(NewServerInitCommand()) | |||
rootCmd.AddCommand(NewWebhookCommand()) | |||
rootCmd.AddCommand(NewSideInputsInitCommand()) | |||
rootCmd.AddCommand(NewSideInputManagerCommand()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we keep either use SideInput
or SideInputs
, but not both. WDYT?
Signed-off-by: Derek Wang <[email protected]>
} | ||
|
||
func (p Pipeline) GetSideInputsStoreName() string { | ||
return p.Name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we setting SideInputs store name the same as pipeline name? Maybe it worth documenting the rationale behind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a unique name in a namespace.
@@ -393,6 +402,81 @@ func (r *pipelineReconciler) findExistingVertices(ctx context.Context, pl *dfv1. | |||
return result, nil | |||
} | |||
|
|||
// Create or update Side Input Mapager deployments | |||
func (r *pipelineReconciler) createOrUpdateSIMDeployments(ctx context.Context, pl *dfv1.Pipeline, isbSvcConfig dfv1.BufferServiceConfig) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we unit test it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
|
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
done |
Be able to orchestrate a pipeline spec like following:
For a pipeline like above, following new resources will be created:
s1
ands2
, each of them contains 1 init container to check if the store is ready, and 2 containers.my-udf
vertex pod, which is supposed to load the data from data store.