Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: incorrect side inputs watch logic #1164

Merged
merged 4 commits into from
Oct 5, 2023
Merged

fix: incorrect side inputs watch logic #1164

merged 4 commits into from
Oct 5, 2023

Conversation

whynowy
Copy link
Member

@whynowy whynowy commented Oct 5, 2023

Fixes: #1160

@whynowy whynowy requested a review from vigith as a code owner October 5, 2023 04:32
@@ -132,9 +132,6 @@ func startSideInputInitializer(ctx context.Context, store kvs.KVStorer, mountPat
// gotAllSideInputVals checks if values for all side-inputs
// have been received from the KV bucket
func gotAllSideInputVals(sideInputs []string, m map[string][]byte) bool {
if len(sideInputs) != len(m) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(m) is total number of side inputs defined in the pipeline, len(sideInputs) is the number of side inputs current vertex is watching.

.
Signed-off-by: Derek Wang <[email protected]>
.
Signed-off-by: Derek Wang <[email protected]>
@@ -110,9 +110,9 @@ func startSideInputInitializer(ctx context.Context, store kvs.KVStorer, mountPat
m[value.Key()] = value.Value()
// Wait for the data to be ready in the side input store, and then copy it to the disk
if gotAllSideInputVals(sideInputs, m) {
for sideInput := range m {
for _, sideInput := range sideInputs {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should only populate the side inputs that current vertex is watching.

.
Signed-off-by: Derek Wang <[email protected]>
return
case value := <-watchCh:
if value == nil {
log.Warnw("nil value received from Side Input watcher")
continue
}
if !sharedutil.StringSliceContains(sideInputs, value.Key()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore the ones that current vertex is not watching.

@whynowy whynowy changed the title fix: incorrect side inputs initialization ready check logic fix: incorrect side inputs watch logic Oct 5, 2023
@vigith vigith requested review from jy4096 and kohlisid October 5, 2023 13:35
@vigith vigith merged commit d4b5f1b into numaproj:main Oct 5, 2023
17 checks passed
@whynowy whynowy deleted the sifix branch October 5, 2023 22:06
whynowy added a commit that referenced this pull request Oct 13, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Sink Vertex with SideInput always pending
4 participants