Merge branch 'main' into update-sdk
yhl25 committed Sep 7, 2023
2 parents 088168e + 8b41a39 commit b26cbc7
Showing 39 changed files with 1,551 additions and 338 deletions.
61 changes: 61 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,66 @@
# Changelog

## v0.10.0 (2023-09-05)

* [10d1cfde](https://github.com/numaproj/numaflow/commit/10d1cfde26f168e19dabc797485fc766d117f086) Update manifests to v0.10.0
* [6f7c1f4a](https://github.com/numaproj/numaflow/commit/6f7c1f4a950a808a8e3f575a3223b1a0d5d44aaa) fix: seg fault inside controller (#1016)
* [c2fdef16](https://github.com/numaproj/numaflow/commit/c2fdef16338e99b6cc26778705a7789937d7b49b) fix: reconcile headless services before pods (#1014)
* [7d8b9087](https://github.com/numaproj/numaflow/commit/7d8b90874b37fcf91d4bf04432ece2816a04d513) fix: print version info when starting (#1013)
* [247b89ed](https://github.com/numaproj/numaflow/commit/247b89ed9aa5e17142b982f955c01bfe2a6650ed) feat: join vertex UI support (#1010)
* [aabb8af0](https://github.com/numaproj/numaflow/commit/aabb8af0a9dee4338c2ece6c35eaee4f4a169a20) feat: scaleUpCooldownSeconds and scaleDownCooldownSeconds to replace cooldownSeconds (#1008)
* [ad647ab7](https://github.com/numaproj/numaflow/commit/ad647ab7549efac87160ca0a0f69a34153aaa887) chore(deps): bump @adobe/css-tools from 4.2.0 to 4.3.1 in /ui (#1005)
* [92fbf7f1](https://github.com/numaproj/numaflow/commit/92fbf7f15dfd296dcbc20214753fb43550a9d1fd) fix: avoid unwanted watcher creation and reduce being stuck with udf is restarted (#999)
* [bac06df0](https://github.com/numaproj/numaflow/commit/bac06df00748f2f1b84be210e2f79c6f280ebb9b) fix: missing edges on UI (#998)
* [f90d4fe7](https://github.com/numaproj/numaflow/commit/f90d4fe7bff838cfed3001920965f33c57105f3d) feat: Add side input sdkclient and grpc (#953)
* [d99480a8](https://github.com/numaproj/numaflow/commit/d99480a890d00be99c4d2fc33f2856eae38db4d0) feat: implement user-defined source (#980)
* [70685902](https://github.com/numaproj/numaflow/commit/706859025e2baaa1c77b77d9b83ec6bcf32129d0) fix: send keys for udsink (#979)
* [8f32b9a3](https://github.com/numaproj/numaflow/commit/8f32b9a3e4ac4a55fb275f0407dc67f1f3523b4a) fix bulleted list (#977)
* [1f33bf8b](https://github.com/numaproj/numaflow/commit/1f33bf8b45039ce235b930047ab3b77e0f1d8635) refactor: build wmstore and wmstorewatcher directly, and remove some unnecessary fields (#970)
* [4cea3444](https://github.com/numaproj/numaflow/commit/4cea3444ae1a375d2550ccd7b66e0541fece169c) feat: add vertex template to pipeline spec (#947)
* [4a4ed927](https://github.com/numaproj/numaflow/commit/4a4ed9275c6da00588df72434f6e082e0bb0dd99) feat: Add side-input initializer and synchronizer (#912)
* [d10f36e6](https://github.com/numaproj/numaflow/commit/d10f36e67581c76516554fd60acd400de45c2607) fix: npe when the ctx is canceled inside kv watcher (#942)
* [6b1b3337](https://github.com/numaproj/numaflow/commit/6b1b3337c76bfdbe2a53173f97ce43ee993577ad) fix: retry logic for fetching last updated kv time (#939)
* [e3da4a3e](https://github.com/numaproj/numaflow/commit/e3da4a3ef31191f61d75ebeab8ba5f76cfba0e17) fix: close the watermark fetcher and publishers after all the forwarders exit (#921)
* [2d6112bf](https://github.com/numaproj/numaflow/commit/2d6112bf0c20baace28da76e6fb0ace3c3be01b5) Pipelines with Cycles: e2e testing, and pipeline validation (#920)
* [5e0bf77e](https://github.com/numaproj/numaflow/commit/5e0bf77e6fbbb73ff267271064d7443dcdffac86) docs quick fixes (#919)
* [0f8f7a17](https://github.com/numaproj/numaflow/commit/0f8f7a17b0a1e1259e771dafed38c71db3443543) docs updates (#917)
* [b55566b8](https://github.com/numaproj/numaflow/commit/b55566b89b568b4211a467e98cb48a0c4b7ea884) feat: watermark delay in tooltip (#910)
* [667ada75](https://github.com/numaproj/numaflow/commit/667ada75146ee4594ef6603fa06fb1c93e141a89) fix: removing WIP tag (#914)
* [872aa864](https://github.com/numaproj/numaflow/commit/872aa8640c08c776b7cea9da4afa09a1a9098cc3) feat: emit k8s events for controller messages. Fixes #856 (#901)
* [0fbdb7ab](https://github.com/numaproj/numaflow/commit/0fbdb7aba3fc6e15f6b81146fdf7a6acdae08868) fix: avoid potential deadlocks when operating UniqueStringList (#905)
* [2c85ec43](https://github.com/numaproj/numaflow/commit/2c85ec439098a313f4c33829a0ef8d9db30a0ea0) refactor: avoid exposing internal data structures of pod tracker to the rater (#902)
* [7e86306b](https://github.com/numaproj/numaflow/commit/7e86306bcb1c0cc66f08172d668ad5f14c7ca503) feat: Join Vertex (#875)
* [85360f65](https://github.com/numaproj/numaflow/commit/85360f6528139721fff37048eccd0e605fc53418) fix: stabilize nats connection (#889)
* [d4f8f594](https://github.com/numaproj/numaflow/commit/d4f8f59431e1322ce6a555018efd821801e69a12) doc: Update multi partition doc (#898)
* [404672d6](https://github.com/numaproj/numaflow/commit/404672d68ed0777e94ee98ef008461bc5687d101) fix: Reduce idle WM unit test fix (#897)
* [a1bbdedf](https://github.com/numaproj/numaflow/commit/a1bbdedf44289efc46773dd1d004b586d8663037) updated default version of Redis used for e2e (#891)
* [85ee4b0d](https://github.com/numaproj/numaflow/commit/85ee4b0d8669f4b24fe202e6b9e7389e85826912) fix TestBuiltinEventTimeExtractor (#885)
* [f3e1044e](https://github.com/numaproj/numaflow/commit/f3e1044eab7e2372dcdf2779d74e4ce8cb5f7cfb) chore(deps): bump word-wrap from 1.2.3 to 1.2.4 in /ui (#881)
* [a02f29a7](https://github.com/numaproj/numaflow/commit/a02f29a710fd482b093193df11387e287d2c7c2e) fix: remove retry when the processor is not found. (#868)
* [cfdeaa8a](https://github.com/numaproj/numaflow/commit/cfdeaa8a4b50ffae8edcfd3a940ba194e354f0b6) refactor: create a new data forwarder dedicated for source (#874)
* [6d14998a](https://github.com/numaproj/numaflow/commit/6d14998a998e392590a8871da87514f5bffa6a46) feat: controller changes for Side Inputs support (#866)
* [92db62a9](https://github.com/numaproj/numaflow/commit/92db62a907cb410a74ef20e066e5ed8aea10bf78) fix: highlight edge when buffer is full (#869)
* [9c4e83c0](https://github.com/numaproj/numaflow/commit/9c4e83c0658dd319cf43eced249af965a9e8af18) fix: minor ui bugs (#861)
* [b970b4cc](https://github.com/numaproj/numaflow/commit/b970b4cc7dfe90dd01a086a333e654275d5aeb7f) fix: release script for validating webhook (#860)
* [7684aada](https://github.com/numaproj/numaflow/commit/7684aada2fb57458c572a088fbfc5c9ffc1a07e5) fix: use windower to fetch next window yet to be closed (#850)
* [609d8b3c](https://github.com/numaproj/numaflow/commit/609d8b3ce2233b18f9c6debfc4c8ec65ee067dfa) feat: implement optional validation webhook. Fixes #817. (#832)
* [3ae1cedb](https://github.com/numaproj/numaflow/commit/3ae1cedb3918d9936b0f2098b853f1d96d5e6e60) chore(deps): bump semver from 6.3.0 to 6.3.1 in /ui (#845)

### Contributors

* Derek Wang
* Dillen Padhiar
* Jason Zesheng Chen
* Juanlu Yu
* Julie Vogelman
* Keran Yang
* RohanAshar
* Sidhant Kohli
* Vedant Gupta
* Vigith Maurice
* Yashash H L
* dependabot[bot]

## v0.9.3 (2023-09-05)

* [6141719f](https://github.com/numaproj/numaflow/commit/6141719f327e8f8d5b5176c75cd41b179622de96) Update manifests to v0.9.3
2 changes: 2 additions & 0 deletions pkg/daemon/server/service/rater/rater.go
@@ -233,6 +233,8 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodRea
readTotalMetricName = "reduce_isb_reader_read_total"
} else if vertexType == "source" {
readTotalMetricName = "source_forwarder_read_total"
} else if vertexType == "sink" {
readTotalMetricName = "sink_forwarder_read_total"
} else {
readTotalMetricName = "forwarder_read_total"
}
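The branch added in this hunk gives sink vertices their own read-total metric name. A minimal sketch of the resulting selection follows. The function name `readTotalMetricName` is hypothetical, and the `"reduce"` literal is an assumption because the hunk does not show the first condition of the chain.

```go
package main

import "fmt"

// readTotalMetricName is a hypothetical helper that mirrors the branch order
// shown in the hunk. The "reduce" literal is an assumption: the first
// condition of the original if/else chain is outside the visible hunk.
func readTotalMetricName(vertexType string) string {
	switch vertexType {
	case "reduce":
		return "reduce_isb_reader_read_total"
	case "source":
		return "source_forwarder_read_total"
	case "sink":
		// New in this commit: sinks no longer fall through to the default.
		return "sink_forwarder_read_total"
	default:
		return "forwarder_read_total"
	}
}

func main() {
	for _, vt := range []string{"reduce", "source", "sink", "map"} {
		fmt.Printf("%s -> %s\n", vt, readTotalMetricName(vt))
	}
}
```

Any vertex type not listed explicitly still resolves to `forwarder_read_total`, as in the original `else` branch.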
9 changes: 0 additions & 9 deletions pkg/forward/applier/mapper.go
@@ -34,12 +34,3 @@ type ApplyMapFunc func(context.Context, *isb.ReadMessage) ([]*isb.WriteMessage,
func (f ApplyMapFunc) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return f(ctx, message)
}

var (
// Terminal Applier do not make any change to the message
Terminal = ApplyMapFunc(func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return []*isb.WriteMessage{{
Message: msg.Message,
}}, nil
})
)
13 changes: 0 additions & 13 deletions pkg/forward/applier/mapstreamer.go
@@ -34,16 +34,3 @@ type ApplyMapStreamFunc func(context.Context, *isb.ReadMessage, chan<- isb.Write
func (f ApplyMapStreamFunc) ApplyMapStream(ctx context.Context, message *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
return f(ctx, message, writeMessageCh)
}

var (
// TerminalMapStream Applier do not make any change to the message
TerminalMapStream = ApplyMapStreamFunc(func(ctx context.Context, msg *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
defer close(writeMessageCh)
writeMessage := &isb.WriteMessage{
Message: msg.Message,
}

writeMessageCh <- *writeMessage
return nil
})
)
23 changes: 6 additions & 17 deletions pkg/forward/forward.go
@@ -118,10 +118,6 @@ func NewInterStepDataForward(
return nil, fmt.Errorf("batch size is not 1 with map UDF streaming")
}

if isdf.opts.vertexType == dfv1.VertexTypeSource {
return nil, fmt.Errorf("source vertex is not supported by inter-step forwarder, please use source forwarder instead")
}

return &isdf, nil
}

@@ -228,7 +224,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
for toVertexName, toVertexBuffer := range isdf.toBuffers {
for _, partition := range toVertexBuffer {
if p, ok := isdf.wmPublishers[toVertexName]; ok {
idlehandler.PublishIdleWatermark(ctx, partition, p, isdf.idleManager, isdf.opts.logger, isdf.opts.vertexType, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
idlehandler.PublishIdleWatermark(ctx, partition, p, isdf.idleManager, isdf.opts.logger, dfv1.VertexTypeMapUDF, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
}
}
}
@@ -350,20 +346,13 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
activeWatermarkBuffers[toVertexName] = make([]bool, len(toVertexBufferOffsets))
if publisher, ok := isdf.wmPublishers[toVertexName]; ok {
for index, offsets := range toVertexBufferOffsets {
if isdf.opts.vertexType == dfv1.VertexTypeMapUDF || isdf.opts.vertexType == dfv1.VertexTypeReduceUDF {
if len(offsets) > 0 {
publisher.PublishWatermark(processorWM, offsets[len(offsets)-1], int32(index))
activeWatermarkBuffers[toVertexName][index] = true
// reset because the toBuffer partition is no longer idling
isdf.idleManager.Reset(isdf.toBuffers[toVertexName][index].GetName())
}
// This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
} else { // For Sink vertex, and it does not care about the offset during watermark publishing
publisher.PublishWatermark(processorWM, nil, int32(index))
if len(offsets) > 0 {
publisher.PublishWatermark(processorWM, offsets[len(offsets)-1], int32(index))
activeWatermarkBuffers[toVertexName][index] = true
// reset because the toBuffer partition is no longer idling
isdf.idleManager.Reset(isdf.toBuffers[toVertexName][index].GetName())
}
// This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
}
}
}
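After this hunk, the forwarder publishes a watermark for a partition only when at least one offset was written to it, using the last offset of the batch. The sink-specific branch that published with a `nil` offset is gone. A minimal sketch of that rule, with plain `int64` offsets standing in for the real offset type and a hypothetical function name `activePartitions`:

```go
package main

import "fmt"

// activePartitions is a hypothetical stand-in for the loop in the hunk.
// For each partition it records the last written offset (the value that
// would be handed to the watermark publisher) and marks the partition as
// active. Partitions with no offsets are left inactive, which is the case
// the hunk's comment attributes to conditional forwarding.
func activePartitions(offsetsPerPartition [][]int64) ([]bool, map[int]int64) {
	active := make([]bool, len(offsetsPerPartition))
	published := make(map[int]int64)
	for index, offsets := range offsetsPerPartition {
		if len(offsets) > 0 {
			published[index] = offsets[len(offsets)-1]
			active[index] = true
		}
	}
	return active, published
}

func main() {
	active, published := activePartitions([][]int64{{1, 2, 3}, {}, {7}})
	fmt.Println(active, published)
}
```

Partitions left inactive here are the ones for which the later hunk publishes an idle watermark instead.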
@@ -383,7 +372,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// use the watermark of the current read batch for the idle watermark
// same as read len==0 because there's no event published to the buffer
if p, ok := isdf.wmPublishers[bufferName]; ok {
idlehandler.PublishIdleWatermark(ctx, isdf.toBuffers[bufferName][index], p, isdf.idleManager, isdf.opts.logger, isdf.opts.vertexType, processorWM)
idlehandler.PublishIdleWatermark(ctx, isdf.toBuffers[bufferName][index], p, isdf.idleManager, isdf.opts.logger, dfv1.VertexTypeMapUDF, processorWM)
}
}
}
@@ -395,7 +384,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
err = isdf.ackFromBuffer(ctx, readOffsets)
// implicit return for posterity :-)
if err != nil {
isdf.opts.logger.Errorw("failed to ack from buffer", zap.Error(err))
isdf.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err))
ackMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
return
}
14 changes: 7 additions & 7 deletions pkg/forward/forward_test.go
@@ -234,7 +234,7 @@ func TestNewInterStepDataForward(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
@@ -394,7 +394,7 @@ func TestNewInterStepDataForward(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
@@ -566,7 +566,7 @@ func TestNewInterStepDataForward(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
@@ -900,7 +900,7 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
@@ -1073,7 +1073,7 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
@@ -1295,7 +1295,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) {
_, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)

// create a forwarder
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
@@ -1341,7 +1341,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) {
_, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)

// create a forwarder
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5))
assert.NoError(t, err)
assert.False(t, to11.IsFull())
assert.False(t, to12.IsFull())
10 changes: 0 additions & 10 deletions pkg/forward/options.go
@@ -33,8 +33,6 @@ type options struct {
udfConcurrency int
// retryInterval is the time.Duration to sleep before retrying
retryInterval time.Duration
// vertexType indicates the type of the vertex
vertexType dfv1.VertexType
// logger is used to pass the logger variable
logger *zap.SugaredLogger
// enableMapUdfStream indicates whether the message streaming is enabled or not for map UDF processing
@@ -85,14 +83,6 @@ func WithLogger(l *zap.SugaredLogger) Option {
}
}

// WithVertexType sets the type of the vertex
func WithVertexType(t dfv1.VertexType) Option {
return func(o *options) error {
o.vertexType = t
return nil
}
}

// WithUDFStreaming sets streaming for map UDF processing
func WithUDFStreaming(f bool) Option {
return func(o *options) error {
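With `WithVertexType` removed, the remaining options follow the same functional-options pattern. The sketch below is a trimmed, self-contained version of that pattern. The `newOptions` constructor, the `int64` batch-size type, and the default batch size of 1 are assumptions for illustration. The validation message is taken from the `NewInterStepDataForward` hunk.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a reduced stand-in for the struct in options.go.
type options struct {
	readBatchSize      int64 // field type is an assumption
	enableMapUdfStream bool
}

// Option mutates options and may reject an invalid value.
type Option func(*options) error

// WithReadBatchSize sets the read batch size.
func WithReadBatchSize(n int64) Option {
	return func(o *options) error {
		o.readBatchSize = n
		return nil
	}
}

// WithUDFStreaming sets streaming for map UDF processing.
func WithUDFStreaming(f bool) Option {
	return func(o *options) error {
		o.enableMapUdfStream = f
		return nil
	}
}

// newOptions is a hypothetical constructor: it applies each option in order,
// then enforces the streaming/batch-size constraint seen in forward.go.
func newOptions(opts ...Option) (*options, error) {
	o := &options{readBatchSize: 1} // default value is an assumption
	for _, apply := range opts {
		if err := apply(o); err != nil {
			return nil, err
		}
	}
	if o.enableMapUdfStream && o.readBatchSize != 1 {
		return nil, errors.New("batch size is not 1 with map UDF streaming")
	}
	return o, nil
}

func main() {
	o, err := newOptions(WithReadBatchSize(5))
	fmt.Println(o.readBatchSize, err)

	_, err = newOptions(WithReadBatchSize(5), WithUDFStreaming(true))
	fmt.Println(err)
}
```

Dropping an option in this style only requires deleting its field and its `With...` function, which is why the test call sites in this commit change by removing a single argument.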
7 changes: 7 additions & 0 deletions pkg/shared/idlehandler/idlehandler.go
@@ -18,18 +18,25 @@ package idlehandler

import (
"context"
"reflect"

"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

// PublishIdleWatermark publishes a ctrl message with isb.Kind set to WMB. We only send one ctrl message when
// we see a new WMB; later we only update the WMB without a ctrl message.
func PublishIdleWatermark(ctx context.Context, toBufferPartition isb.BufferWriter, wmPublisher publish.Publisher, idleManager *wmb.IdleManager, logger *zap.SugaredLogger, vertexType dfv1.VertexType, wm wmb.Watermark) {
if reflect.TypeOf(wmPublisher) == reflect.TypeOf(&generic.NoOpWMProgressor{}) {
// TODO(idleWatermark): simplest fix as of now, need to design a better fix
// watermark disabled, no need to proceed
return
}
var toPartitionName = toBufferPartition.GetName()
var toVertexPartition = toBufferPartition.GetPartitionIdx()
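The early return added above detects a disabled watermark by comparing the publisher's dynamic type with `*generic.NoOpWMProgressor` through `reflect.TypeOf`. The sketch below reproduces that check with local stand-in types. `Publisher`, `NoOpWMProgressor`, `realPublisher`, and both helper functions are simplified assumptions, not the numaflow definitions. It also shows a type assertion, which gives the same answer for a concrete pointer type without reflection.

```go
package main

import (
	"fmt"
	"reflect"
)

// Publisher is a one-method stand-in for publish.Publisher.
type Publisher interface {
	PublishIdle()
}

// NoOpWMProgressor is a local stand-in for generic.NoOpWMProgressor.
type NoOpWMProgressor struct{}

func (*NoOpWMProgressor) PublishIdle() {}

// realPublisher counts calls so the skip can be observed.
type realPublisher struct{ calls int }

func (p *realPublisher) PublishIdle() { p.calls++ }

// isNoOpReflect uses the same comparison as the hunk: the dynamic type of
// the interface value against the type of a *NoOpWMProgressor.
func isNoOpReflect(p Publisher) bool {
	return reflect.TypeOf(p) == reflect.TypeOf(&NoOpWMProgressor{})
}

// isNoOpAssert is an equivalent check written as a type assertion.
func isNoOpAssert(p Publisher) bool {
	_, ok := p.(*NoOpWMProgressor)
	return ok
}

func main() {
	real := &realPublisher{}
	for _, p := range []Publisher{&NoOpWMProgressor{}, real} {
		if isNoOpReflect(p) {
			continue // watermark disabled, nothing to publish
		}
		p.PublishIdle()
	}
	fmt.Println(real.calls, isNoOpAssert(&NoOpWMProgressor{}), isNoOpAssert(real))
}
```

Either form ties the caller to one concrete type, which is consistent with the TODO in the hunk calling this the simplest fix for now.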
