Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid unwanted watcher creation and reduce being stuck with udf is restarted #999

Merged
merged 12 commits into from
Aug 30, 2023
4 changes: 2 additions & 2 deletions pkg/shared/kvs/jetstream/kv_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func (jsw *jetStreamWatch) Watch(ctx context.Context) (<-chan kvs.KVEntry, <-cha
kvLastUpdatedTime := jsw.lastUpdateKVTime()

// if the last update time is zero, it means that there are no key-value pairs in the store yet or ctx was canceled both the cases we should not recreate the watcher
// if the last update time is before the previous fetch time, it means that the store is not getting any updates
// if the last update time is not after the previous fetch time, it means that the store is not getting any updates (watermark is not getting updated)
// therefore, we don't have to recreate the watcher
if kvLastUpdatedTime.IsZero() || kvLastUpdatedTime.Before(jsw.previousFetchTime) {
if kvLastUpdatedTime.IsZero() || !kvLastUpdatedTime.After(jsw.previousFetchTime) {
jsw.log.Debug("The watcher is not receiving any updates, but the store is not getting any updates either", zap.String("watcher", jsw.GetKVName()), zap.Time("lastUpdateKVTime", kvLastUpdatedTime), zap.Time("previousFetchTime", jsw.previousFetchTime))
} else {
// if the last update time is after the previous fetch time, it means that the store is getting updates but the watcher is not receiving any
Expand Down
Loading