Skip to content

Commit

Permalink
fix: use windower to fetch next window yet to be closed (#850)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith committed Jul 13, 2023
1 parent 609d8b3 commit 7684aad
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 18 deletions.
23 changes: 12 additions & 11 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Inc()
}

// idle watermark
if len(readMessages) == 0 {
// we use the HeadWMB as the watermark for the idle
// we get the HeadWMB for the partition from which we read the messages
Expand All @@ -204,8 +205,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
return
}

nextWin := df.pbqManager.NextWindowToBeClosed()
if nextWin == nil {
nextWinAsSeenByReader := df.pbqManager.NextWindowToBeMaterialized()
if nextWinAsSeenByReader == nil {
// if all the windows are closed already, and the len(readBatch) == 0
// then it means there's an idle situation
// in this case, send idle watermark to all the toBuffer partitions
Expand All @@ -220,7 +221,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
// if toBeClosed window exists, and the watermark we fetch has already passed the endTime of this window
// then it means the overall dataflow of the pipeline has already reached a later time point
// so we can close the window and process the data in this window
if watermark := time.UnixMilli(processorWMB.Watermark).Add(-1 * time.Millisecond); nextWin.EndTime().Before(watermark) {
if watermark := time.UnixMilli(processorWMB.Watermark).Add(-1 * time.Millisecond); nextWinAsSeenByReader.EndTime().Before(watermark) {
closedWindows := df.windower.RemoveWindows(watermark)
for _, win := range closedWindows {
df.ClosePartitions(win.Partitions())
Expand Down Expand Up @@ -353,9 +354,9 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
}

// solve Reduce withholding of watermark where we do not send WM until the window is closed.
if nextWin := df.pbqManager.NextWindowToBeClosed(); nextWin != nil {
if nextWinAsSeenByReader := df.pbqManager.NextWindowToBeMaterialized(); nextWinAsSeenByReader != nil {
// minus 1 ms because if it's the same as the end time the window would have already been closed
if watermark := time.Time(wm).Add(-1 * time.Millisecond); nextWin.EndTime().After(watermark) {
if watermark := time.Time(wm).Add(-1 * time.Millisecond); nextWinAsSeenByReader.EndTime().After(watermark) {
// publish idle watermark so that the next vertex doesn't need to wait for the window to close to
// start processing data whose watermark is smaller than the endTime of the toBeClosed window

Expand All @@ -381,14 +382,14 @@ messagesLoop:
// drop the late messages only if there is no window open
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWin := df.pbqManager.NextWindowToBeClosed()
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWin == nil {
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
writtenMessages = append(writtenMessages, message)
continue
} else if message.EventTime.Before(nextWin.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it.
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWin.StartTime()))
} else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it.
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
droppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
Expand All @@ -399,7 +400,7 @@ messagesLoop:
writtenMessages = append(writtenMessages, message)
continue
} else { // if the message falls in the next window that is about to be closed, keep it
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWin.StartTime().UnixMilli()))
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli()))
}
}

Expand Down Expand Up @@ -429,7 +430,7 @@ messagesLoop:
windows := df.upsertWindowsAndKeys(message)

// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs.
// We need to write the messages to these PBQs
for _, kw := range windows {

for _, partitionID := range kw.Partitions() {
Expand Down
8 changes: 6 additions & 2 deletions pkg/reduce/pbq/pbqmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,12 @@ func (m *Manager) Replay(ctx context.Context) {
m.log.Infow("Finished replaying records from store", zap.Duration("took", time.Since(tm)), zap.Any("partitions", partitionsIds))
}

// NextWindowToBeClosed returns the next keyed window that is yet to be closed
func (m *Manager) NextWindowToBeClosed() window.AlignedKeyedWindower {
// NextWindowToBeMaterialized returns the next keyed window that is yet to be materialized(GCed)
// will be used by the data forwarder to publish the idle watermark. While publishing idle watermark, we have to be
// conservative. PBQManager's view of next window to be materialized is conservative as it is on the reading side.
// We SHOULD NOT use NextWindowToBeMaterialized to write data to because it could fail (channel could have been closed but
// GC is yet to happen), this function should only be on readonly path.
func (m *Manager) NextWindowToBeMaterialized() window.AlignedKeyedWindower {
if m.yetToBeClosed.Len() == 0 {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/reduce/pbq/pbqmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,23 +376,23 @@ func TestManager_NextWindowToBeClosed(t *testing.T) {
pq4, err = pbqManager.CreateNewPBQ(ctx, partitionFour, kwFour)
assert.NoError(t, err)

aw := pbqManager.NextWindowToBeClosed()
aw := pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(120, 0))

_ = pq1.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(120, 0))

_ = pq2.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(240, 0))

_ = pq3.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Equal(t, aw.EndTime(), time.Unix(240, 0))

_ = pq4.GC()
aw = pbqManager.NextWindowToBeClosed()
aw = pbqManager.NextWindowToBeMaterialized()
assert.Nil(t, aw)

}
8 changes: 8 additions & 0 deletions pkg/window/strategy/fixed/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,11 @@ func (f *Fixed) InsertIfNotPresent(kw window.AlignedKeyedWindower) (aw window.Al
func (f *Fixed) RemoveWindows(wm time.Time) []window.AlignedKeyedWindower {
return f.entries.RemoveWindows(wm)
}

// NextWindowToBeClosed returns the next window which is yet to be closed.
func (f *Fixed) NextWindowToBeClosed() window.AlignedKeyedWindower {
if f.entries.Len() == 0 {
return nil
}
return f.entries.Front()
}
8 changes: 8 additions & 0 deletions pkg/window/strategy/sliding/sliding.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,11 @@ func (s *Sliding) InsertIfNotPresent(kw window.AlignedKeyedWindower) (window.Ali
func (s *Sliding) RemoveWindows(wm time.Time) []window.AlignedKeyedWindower {
return s.entries.RemoveWindows(wm)
}

// NextWindowToBeClosed returns the next window which is yet to be closed.
func (s *Sliding) NextWindowToBeClosed() window.AlignedKeyedWindower {
if s.entries.Len() == 0 {
return nil
}
return s.entries.Front()
}
3 changes: 3 additions & 0 deletions pkg/window/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ type Windower interface {
InsertIfNotPresent(aw AlignedKeyedWindower) (AlignedKeyedWindower, bool)
// RemoveWindows returns list of window(s) that can be closed
RemoveWindows(time time.Time) []AlignedKeyedWindower
// NextWindowToBeClosed returns the next window yet to be closed.
// will be used by the data forwarder to check if the late message can be considered for processing.
NextWindowToBeClosed() AlignedKeyedWindower
}

0 comments on commit 7684aad

Please sign in to comment.