Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid unwanted watcher creation and reduce being stuck with udf is restarted #999

Merged
merged 12 commits into from
Aug 30, 2023

Conversation

yhl25
Copy link
Contributor

@yhl25 yhl25 commented Aug 29, 2023

fixes #1001

yhl25 and others added 2 commits August 29, 2023 09:56
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
@yhl25 yhl25 requested a review from vigith August 29, 2023 09:22
@vigith vigith changed the title fix: avoid watcher creation when lastUpdateTime is equal to previousFetchTime fix: avoid unwanted watcher creation and reduce being stuck with udf is restarted Aug 29, 2023
pkg/udf/rpc/grpc_reduce.go Outdated Show resolved Hide resolved

// invoke the reduceFn method with datumCh channel
wg.Add(1)
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need of wait group?


// pass key and window information inside the context
mdMap := map[string]string{
shared.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10),
shared.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10),
}

ctx = metadata.NewOutgoingContext(ctx, metadata.New(mdMap))
cctx, cancel := context.WithCancel(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need a new context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to exit from readloop go routine when we return from ApplyReduce.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be okay to just do ctx, cancel := context.WithCancel(ctx), do not need to give a new cctx.

This is a common patten to make sure everyone in this function watches this ctx will properly exit if the function returns, no matter if it's expectedly or unexpectedly.

case msg, ok := <-messageStream:
if msg != nil {
// create datum from isbMessage and send it to datumCh channel for reduceFn
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go func() {
  defer close(datumCh)
  for {
    select {
      if xxx {
        return
      }
      xxxx
    }
  }
}()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

yhl25 and others added 3 commits August 30, 2023 09:08
Signed-off-by: Yashash H L <[email protected]>

Co-authored-by: Vigith Maurice <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
@yhl25 yhl25 requested a review from whynowy August 30, 2023 03:41
Comment on lines 118 to 125
// if the messageStream is closed, break the loop
if !ok {
return
}
// if the message is nil, break the loop
if msg == nil {
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


// pass key and window information inside the context
mdMap := map[string]string{
shared.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10),
shared.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10),
}

ctx = metadata.NewOutgoingContext(ctx, metadata.New(mdMap))
cctx, cancel := context.WithCancel(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be okay to just do ctx, cancel := context.WithCancel(ctx), do not need to give a new cctx.

This is a common patten to make sure everyone in this function watches this ctx will properly exit if the function returns, no matter if it's expectedly or unexpectedly.

Signed-off-by: Yashash H L <[email protected]>
@yhl25 yhl25 enabled auto-merge (squash) August 30, 2023 06:02
@yhl25 yhl25 merged commit 92fbf7f into main Aug 30, 2023
17 checks passed
@yhl25 yhl25 deleted the kvw branch August 30, 2023 06:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

reduce pod gets stuck when the udf container is restarted.
3 participants