Skip to content

Commit

Permalink
use windower to fetch next window to be closed
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jul 13, 2023
1 parent 609d8b3 commit 7f810df
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
8 changes: 4 additions & 4 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
return
}

nextWin := df.pbqManager.NextWindowToBeClosed()
nextWin := df.pbqManager.NextWindowToBeMaterialized()
if nextWin == nil {
// if all the windows are closed already, and the len(readBatch) == 0
// then it means there's an idle situation
Expand Down Expand Up @@ -353,7 +353,7 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
}

// solve Reduce withholding of watermark where we do not send WM until the window is closed.
if nextWin := df.pbqManager.NextWindowToBeClosed(); nextWin != nil {
if nextWin := df.pbqManager.NextWindowToBeMaterialized(); nextWin != nil {
// minus 1 ms because if it's the same as the end time the window would have already been closed
if watermark := time.Time(wm).Add(-1 * time.Millisecond); nextWin.EndTime().After(watermark) {
// publish idle watermark so that the next vertex doesn't need to wait for the window to close to
Expand Down Expand Up @@ -381,7 +381,7 @@ messagesLoop:
// drop the late messages only if there is no window open
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWin := df.pbqManager.NextWindowToBeClosed()
nextWin := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWin == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
Expand Down Expand Up @@ -429,7 +429,7 @@ messagesLoop:
windows := df.upsertWindowsAndKeys(message)

// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs.
// We need to write the messages to these PBQs
for _, kw := range windows {

for _, partitionID := range kw.Partitions() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/reduce/pbq/pbqmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ func (m *Manager) Replay(ctx context.Context) {
m.log.Infow("Finished replaying records from store", zap.Duration("took", time.Since(tm)), zap.Any("partitions", partitionsIds))
}

// NextWindowToBeClosed returns the next keyed window that is yet to be closed
func (m *Manager) NextWindowToBeClosed() window.AlignedKeyedWindower {
// NextWindowToBeMaterialized returns the next keyed window that is yet to be materialized (GCed).
// It is used by the data forwarder to publish the idle watermark.
func (m *Manager) NextWindowToBeMaterialized() window.AlignedKeyedWindower {
if m.yetToBeClosed.Len() == 0 {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/reduce/pbq/pbqmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,23 +376,23 @@ func TestManager_NextWindowToBeClosed(t *testing.T) {
pq4, err = pbqManager.CreateNewPBQ(ctx, partitionFour, kwFour)
assert.NoError(t, err)

aw := pbqManager.NextWindowToBeClosed()
aw := pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(120, 0))

_ = pq1.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(120, 0))

_ = pq2.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(240, 0))

_ = pq3.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(240, 0))

_ = pq4.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Nil(t, aw)

}
8 changes: 8 additions & 0 deletions pkg/window/strategy/fixed/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,11 @@ func (f *Fixed) InsertIfNotPresent(kw window.AlignedKeyedWindower) (aw window.Al
func (f *Fixed) RemoveWindows(wm time.Time) []window.AlignedKeyedWindower {
	// Delegate to the entries list; it decides which windows are removed for wm.
	removed := f.entries.RemoveWindows(wm)
	return removed
}

// NextWindowToBeClosed returns the window at the front of the tracked entries,
// which is the next window yet to be closed. It returns nil when no entries
// are being tracked.
func (f *Fixed) NextWindowToBeClosed() window.AlignedKeyedWindower {
	if f.entries.Len() != 0 {
		return f.entries.Front()
	}
	// Nothing is tracked, so there is no window left to close.
	return nil
}
8 changes: 8 additions & 0 deletions pkg/window/strategy/sliding/sliding.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,11 @@ func (s *Sliding) InsertIfNotPresent(kw window.AlignedKeyedWindower) (window.Ali
func (s *Sliding) RemoveWindows(wm time.Time) []window.AlignedKeyedWindower {
	// Delegate to the entries list; it decides which windows are removed for wm.
	removed := s.entries.RemoveWindows(wm)
	return removed
}

// NextWindowToBeClosed returns the window at the front of the tracked entries,
// which is the next window yet to be closed. It returns nil when no entries
// are being tracked.
func (s *Sliding) NextWindowToBeClosed() window.AlignedKeyedWindower {
	if s.entries.Len() != 0 {
		return s.entries.Front()
	}
	// Nothing is tracked, so there is no window left to close.
	return nil
}
3 changes: 3 additions & 0 deletions pkg/window/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ type Windower interface {
InsertIfNotPresent(aw AlignedKeyedWindower) (AlignedKeyedWindower, bool)
// RemoveWindows returns list of window(s) that can be closed
RemoveWindows(time time.Time) []AlignedKeyedWindower
// NextWindowToBeClosed returns the next window yet to be closed.
// It is used by the data forwarder to check whether a late message can be considered for processing.
NextWindowToBeClosed() AlignedKeyedWindower
}

0 comments on commit 7f810df

Please sign in to comment.