From 53d982adba47e14ead9e8046c0cbe4030f065d65 Mon Sep 17 00:00:00 2001 From: Juanlu Yu <19543684+jy4096@users.noreply.github.com> Date: Wed, 6 Sep 2023 20:43:18 -0700 Subject: [PATCH 1/2] chore: extract sink forwarder out of the pkg/forwarder (#1000) Signed-off-by: jyu6 Co-authored-by: jyu6 --- pkg/daemon/server/service/rater/rater.go | 2 + pkg/forward/applier/mapper.go | 9 - pkg/forward/applier/mapstreamer.go | 13 - pkg/forward/forward.go | 23 +- pkg/forward/forward_test.go | 14 +- pkg/forward/options.go | 10 - pkg/shared/idlehandler/idlehandler.go | 7 + pkg/sinks/blackhole/blackhole.go | 18 +- pkg/sinks/blackhole/blackhole_test.go | 104 +---- pkg/sinks/forward/forward.go | 432 ++++++++++++++++++ pkg/sinks/forward/forward_test.go | 395 ++++++++++++++++ pkg/sinks/forward/metrics.go | 109 +++++ pkg/sinks/forward/options.go | 81 ++++ pkg/sinks/forward/shutdown.go | 73 +++ pkg/sinks/forward/shutdown_test.go | 122 +++++ pkg/sinks/kafka/kafka.go | 14 +- pkg/sinks/kafka/kafka_test.go | 20 +- pkg/sinks/logger/log.go | 14 +- pkg/sinks/logger/log_test.go | 83 +--- pkg/sinks/sink.go | 28 +- pkg/sinks/udsink/sink.go | 14 +- pkg/sinks/udsink/udsink_grpc.go | 6 +- pkg/sinks/udsink/udsink_grpc_test.go | 4 +- pkg/sources/forward/data_forward_test.go | 6 +- pkg/sources/forward/shutdown_test.go | 4 +- pkg/sources/generator/tickgen_test.go | 6 +- pkg/sources/http/http_test.go | 5 +- pkg/sources/kafka/handler.go | 2 +- pkg/sources/kafka/handler_test.go | 3 +- .../transformer/grpc_transformer_test.go | 9 +- pkg/sources/udsource/grpc_udsource.go | 1 - pkg/sources/udsource/grpc_udsource_test.go | 7 +- pkg/udf/map_udf.go | 2 +- pkg/watermark/generic/noop.go | 4 +- 34 files changed, 1313 insertions(+), 331 deletions(-) create mode 100644 pkg/sinks/forward/forward.go create mode 100644 pkg/sinks/forward/forward_test.go create mode 100644 pkg/sinks/forward/metrics.go create mode 100644 pkg/sinks/forward/options.go create mode 100644 pkg/sinks/forward/shutdown.go create mode 100644 pkg/sinks/forward/shutdown_test.go diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go index a33aca719e..aa773729c0 100644 --- a/pkg/daemon/server/service/rater/rater.go +++ b/pkg/daemon/server/service/rater/rater.go @@ -233,6 +233,8 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodRea readTotalMetricName = "reduce_isb_reader_read_total" } else if vertexType == "source" { readTotalMetricName = "source_forwarder_read_total" + } else if vertexType == "sink" { + readTotalMetricName = "sink_forwarder_read_total" } else { readTotalMetricName = "forwarder_read_total" } diff --git a/pkg/forward/applier/mapper.go b/pkg/forward/applier/mapper.go index 8e6b624643..28c48fb54e 100644 --- a/pkg/forward/applier/mapper.go +++ b/pkg/forward/applier/mapper.go @@ -34,12 +34,3 @@ type ApplyMapFunc func(context.Context, *isb.ReadMessage) ([]*isb.WriteMessage, func (f ApplyMapFunc) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { return f(ctx, message) } - -var ( - // Terminal Applier do not make any change to the message - Terminal = ApplyMapFunc(func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) { - return []*isb.WriteMessage{{ - Message: msg.Message, - }}, nil - }) -) diff --git a/pkg/forward/applier/mapstreamer.go b/pkg/forward/applier/mapstreamer.go index 6313a9f7a7..19e88862c6 100644 --- a/pkg/forward/applier/mapstreamer.go +++ b/pkg/forward/applier/mapstreamer.go @@ -34,16 +34,3 @@ type ApplyMapStreamFunc 
func(context.Context, *isb.ReadMessage, chan<- isb.Write func (f ApplyMapStreamFunc) ApplyMapStream(ctx context.Context, message *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error { return f(ctx, message, writeMessageCh) } - -var ( - // TerminalMapStream Applier do not make any change to the message - TerminalMapStream = ApplyMapStreamFunc(func(ctx context.Context, msg *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error { - defer close(writeMessageCh) - writeMessage := &isb.WriteMessage{ - Message: msg.Message, - } - - writeMessageCh <- *writeMessage - return nil - }) -) diff --git a/pkg/forward/forward.go b/pkg/forward/forward.go index 6898cfb293..4f56ad797e 100644 --- a/pkg/forward/forward.go +++ b/pkg/forward/forward.go @@ -118,10 +118,6 @@ func NewInterStepDataForward( return nil, fmt.Errorf("batch size is not 1 with map UDF streaming") } - if isdf.opts.vertexType == dfv1.VertexTypeSource { - return nil, fmt.Errorf("source vertex is not supported by inter-step forwarder, please use source forwarder instead") - } - return &isdf, nil } @@ -228,7 +224,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { for toVertexName, toVertexBuffer := range isdf.toBuffers { for _, partition := range toVertexBuffer { if p, ok := isdf.wmPublishers[toVertexName]; ok { - idlehandler.PublishIdleWatermark(ctx, partition, p, isdf.idleManager, isdf.opts.logger, isdf.opts.vertexType, wmb.Watermark(time.UnixMilli(processorWMB.Watermark))) + idlehandler.PublishIdleWatermark(ctx, partition, p, isdf.idleManager, isdf.opts.logger, dfv1.VertexTypeMapUDF, wmb.Watermark(time.UnixMilli(processorWMB.Watermark))) } } } @@ -350,20 +346,13 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { activeWatermarkBuffers[toVertexName] = make([]bool, len(toVertexBufferOffsets)) if publisher, ok := isdf.wmPublishers[toVertexName]; ok { for index, offsets := range toVertexBufferOffsets { - if isdf.opts.vertexType == dfv1.VertexTypeMapUDF || isdf.opts.vertexType == dfv1.VertexTypeReduceUDF { - if len(offsets) > 0 { - publisher.PublishWatermark(processorWM, offsets[len(offsets)-1], int32(index)) - activeWatermarkBuffers[toVertexName][index] = true - // reset because the toBuffer partition is no longer idling - isdf.idleManager.Reset(isdf.toBuffers[toVertexName][index].GetName()) - } - // This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer - } else { // For Sink vertex, and it does not care about the offset during watermark publishing - publisher.PublishWatermark(processorWM, nil, int32(index)) + if len(offsets) > 0 { + publisher.PublishWatermark(processorWM, offsets[len(offsets)-1], int32(index)) activeWatermarkBuffers[toVertexName][index] = true // reset because the toBuffer partition is no longer idling isdf.idleManager.Reset(isdf.toBuffers[toVertexName][index].GetName()) } + // This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer } } } @@ -383,7 +372,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // use the watermark of the current read batch for the idle watermark // same as read len==0 because there's no event published to the buffer if p, ok := isdf.wmPublishers[bufferName]; ok { - idlehandler.PublishIdleWatermark(ctx, isdf.toBuffers[bufferName][index], p, isdf.idleManager, isdf.opts.logger, isdf.opts.vertexType, processorWM) + idlehandler.PublishIdleWatermark(ctx, isdf.toBuffers[bufferName][index], p, isdf.idleManager, isdf.opts.logger, 
dfv1.VertexTypeMapUDF, processorWM) } } } @@ -395,7 +384,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { err = isdf.ackFromBuffer(ctx, readOffsets) // implicit return for posterity :-) if err != nil { - isdf.opts.logger.Errorw("failed to ack from buffer", zap.Error(err)) + isdf.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err)) ackMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) return } diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go index 4216e74ae7..f845dd57d4 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/forward/forward_test.go @@ -234,7 +234,7 @@ func TestNewInterStepDataForward(t *testing.T) { } }() - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled)) assert.NoError(t, err) assert.False(t, to11.IsFull()) @@ -394,7 +394,7 @@ func TestNewInterStepDataForward(t *testing.T) { } }() - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled)) assert.NoError(t, err) assert.False(t, to11.IsFull()) @@ -566,7 +566,7 @@ func TestNewInterStepDataForward(t *testing.T) { } }() - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled)) assert.NoError(t, err) assert.False(t, to11.IsFull()) @@ -900,7 +900,7 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) { } }() - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) assert.NoError(t, err) assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) @@ -1073,7 +1073,7 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) { } }() - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF)) + f, err := NewInterStepDataForward(vertex, 
fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) assert.NoError(t, err) assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) @@ -1295,7 +1295,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) { _, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) // create a forwarder - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5), WithVertexType(dfv1.VertexTypeMapUDF)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5)) assert.NoError(t, err) assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) @@ -1341,7 +1341,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) { _, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) // create a forwarder - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5), WithVertexType(dfv1.VertexTypeMapUDF)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5)) assert.NoError(t, err) assert.False(t, to11.IsFull()) assert.False(t, to12.IsFull()) diff --git a/pkg/forward/options.go b/pkg/forward/options.go index f4829a2a3d..42d80848b9 100644 --- a/pkg/forward/options.go +++ b/pkg/forward/options.go @@ -33,8 +33,6 @@ type options struct { udfConcurrency int // retryInterval is the time.Duration to sleep before retrying retryInterval time.Duration - // vertexType indicates the type of the vertex - vertexType dfv1.VertexType // logger is used to pass the logger variable logger *zap.SugaredLogger // enableMapUdfStream indicates whether the message streaming is enabled or not for map UDF processing @@ -85,14 +83,6 @@ func WithLogger(l *zap.SugaredLogger) Option { } } -// WithVertexType sets the type of the vertex -func WithVertexType(t dfv1.VertexType) Option { - return func(o *options) error { - o.vertexType = t - return nil - } -} - // WithUDFStreaming sets streaming for map UDF processing func WithUDFStreaming(f bool) Option { return func(o *options) error { diff --git a/pkg/shared/idlehandler/idlehandler.go b/pkg/shared/idlehandler/idlehandler.go index e0a0c8f528..157ba36aaa 100644 --- a/pkg/shared/idlehandler/idlehandler.go +++ b/pkg/shared/idlehandler/idlehandler.go @@ -18,11 +18,13 @@ package idlehandler import ( "context" + "reflect" "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" + "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/publish" "github.com/numaproj/numaflow/pkg/watermark/wmb" ) @@ -30,6 +32,11 @@ import ( // PublishIdleWatermark publishes a ctrl message with isb.Kind set to WMB. We only send one ctrl message when // we see a new WMB; later we only update the WMB without a ctrl message. 
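+// When watermarking is disabled, the wmPublisher handed to us is a
+// generic.NoOpWMProgressor; in that case publishing is skipped entirely (see
+// the type check added below).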
func PublishIdleWatermark(ctx context.Context, toBufferPartition isb.BufferWriter, wmPublisher publish.Publisher, idleManager *wmb.IdleManager, logger *zap.SugaredLogger, vertexType dfv1.VertexType, wm wmb.Watermark) { + if reflect.TypeOf(wmPublisher) == reflect.TypeOf(&generic.NoOpWMProgressor{}) { + // TODO(idleWatermark): simplest fix as of now, need to design a better fix + // watermark disabled, no need to proceed + return + } var toPartitionName = toBufferPartition.GetName() var toVertexPartition = toBufferPartition.GetPartitionIdx() diff --git a/pkg/sinks/blackhole/blackhole.go b/pkg/sinks/blackhole/blackhole.go index 36ef6a0a06..74ebe4041a 100644 --- a/pkg/sinks/blackhole/blackhole.go +++ b/pkg/sinks/blackhole/blackhole.go @@ -19,23 +19,22 @@ package blackhole import ( "context" + "go.uber.org/zap" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" - "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" + sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" - - "go.uber.org/zap" ) // Blackhole is a sink to emulate /dev/null type Blackhole struct { name string pipelineName string - isdf *forward.InterStepDataForward + isdf *sinkforward.DataForward logger *zap.SugaredLogger } @@ -52,8 +51,7 @@ func WithLogger(log *zap.SugaredLogger) Option { func NewBlackhole(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, - publishWatermark map[string]publish.Publisher, - whereToDecider forward.GoWhere, + publishWatermark publish.Publisher, opts ...Option) (*Blackhole, error) { bh := new(Blackhole) @@ -70,14 +68,14 @@ func NewBlackhole(vertex *dfv1.Vertex, bh.logger = logging.NewLogger() } - forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(bh.logger)} + forwardOpts := []sinkforward.Option{sinkforward.WithLogger(bh.logger)} if x := vertex.Spec.Limits; x != nil { if x.ReadBatchSize != nil { - forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize))) + forwardOpts = append(forwardOpts, sinkforward.WithReadBatchSize(int64(*x.ReadBatchSize))) } } - isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string][]isb.BufferWriter{vertex.Spec.Name: {bh}}, whereToDecider, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark, forwardOpts...) + isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, bh, fetchWatermark, publishWatermark, forwardOpts...) 
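+	// Note: compared to forward.NewInterStepDataForward, the sink-specific
+	// forwarder takes exactly one buffer writer and one watermark publisher,
+	// so no WhereTo decider or terminal map appliers are needed here.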
if err != nil { return nil, err } diff --git a/pkg/sinks/blackhole/blackhole_test.go b/pkg/sinks/blackhole/blackhole_test.go index 7dd8a89dbb..4c6f76c29d 100644 --- a/pkg/sinks/blackhole/blackhole_test.go +++ b/pkg/sinks/blackhole/blackhole_test.go @@ -21,35 +21,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" - "github.com/numaproj/numaflow/pkg/forward/applier" - "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/watermark/generic" - - "github.com/stretchr/testify/assert" -) - -var ( - testStartTime = time.Unix(1636470000, 0).UTC() ) -type myForwardToAllTest struct { -} - -func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ - ToVertexName: "to1", - ToVertexPartitionIdx: 0, - }, - { - ToVertexName: "to2", - ToVertexPartitionIdx: 0, - }}, nil -} - func TestBlackhole_Start(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25, 0) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -67,7 +46,7 @@ func TestBlackhole_Start(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name}) - s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark, getSinkGoWhereDecider(vertex.Spec.Name)) + s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name]) assert.NoError(t, err) stopped := s.Start() @@ -83,80 +62,3 @@ func TestBlackhole_Start(t *testing.T) { <-stopped } - -// TestBlackhole_ForwardToTwoVertex writes to 2 vertices and have a blackhole sinks attached to each vertex. 
-func TestBlackhole_ForwardToTwoVertex(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - fromStep := simplebuffer.NewInMemoryBuffer("from", 25, 0) - to1 := simplebuffer.NewInMemoryBuffer("to1", 25, 0) - to2 := simplebuffer.NewInMemoryBuffer("to2", 25, 0) - - // start the last vertex first - // add 2 sinks per vertex - vertex1 := &dfv1.Vertex{Spec: dfv1.VertexSpec{ - AbstractVertex: dfv1.AbstractVertex{ - Name: "sinks.blackhole1", - Sink: &dfv1.Sink{ - Blackhole: &dfv1.Blackhole{}, - }, - }, - }} - - vertex2 := &dfv1.Vertex{Spec: dfv1.VertexSpec{ - AbstractVertex: dfv1.AbstractVertex{ - Name: "sinks.blackhole2", - Sink: &dfv1.Sink{ - Blackhole: &dfv1.Blackhole{}, - }, - }, - }} - fetchWatermark1, publishWatermark1 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex1.Spec.Name}) - bh1, _ := NewBlackhole(vertex1, to1, fetchWatermark1, publishWatermark1, getSinkGoWhereDecider(vertex1.Spec.Name)) - fetchWatermark2, publishWatermark2 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex2.Spec.Name}) - bh2, _ := NewBlackhole(vertex2, to2, fetchWatermark2, publishWatermark2, getSinkGoWhereDecider(vertex2.Spec.Name)) - bh1Stopped := bh1.Start() - bh2Stopped := bh2.Start() - - toSteps := map[string][]isb.BufferWriter{ - "to1": {to1}, - "to2": {to2}, - } - - writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime) - vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ - PipelineName: "testPipeline", - AbstractVertex: dfv1.AbstractVertex{ - Name: "testVertex", - }, - }} - fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark) - assert.NoError(t, err) - - stopped := f.Start() - // write some data - _, errs := fromStep.Write(ctx, writeMessages[0:5]) - assert.Equal(t, make([]error, 5), errs) - f.Stop() - <-stopped - // downstream should be stopped only after upstream is stopped - bh1.Stop() - bh2.Stop() - - <-bh1Stopped - <-bh2Stopped -} - -func getSinkGoWhereDecider(vertexName string) forward.GoWhere { - fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer - result = append(result, forward.VertexBuffer{ - ToVertexName: vertexName, - ToVertexPartitionIdx: 0, - }) - return result, nil - }) - return fsd -} diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go new file mode 100644 index 0000000000..c4ff3ab1a7 --- /dev/null +++ b/pkg/sinks/forward/forward.go @@ -0,0 +1,432 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package forward
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+	"github.com/numaproj/numaflow/pkg/isb"
+	"github.com/numaproj/numaflow/pkg/metrics"
+	"github.com/numaproj/numaflow/pkg/shared/idlehandler"
+	"github.com/numaproj/numaflow/pkg/shared/logging"
+	"github.com/numaproj/numaflow/pkg/watermark/fetch"
+	"github.com/numaproj/numaflow/pkg/watermark/publish"
+	"github.com/numaproj/numaflow/pkg/watermark/wmb"
+)
+
+// DataForward forwards data from the previous vertex to the sinker.
+type DataForward struct {
+	ctx context.Context
+	// cancelFn cancels our new context; our cancellation is a little more complex and needs to be well orchestrated, hence
+	// we need something more than a cancel().
+	cancelFn            context.CancelFunc
+	fromBufferPartition isb.BufferReader
+	toBuffer            isb.BufferWriter
+	wmFetcher           fetch.Fetcher
+	wmPublisher         publish.Publisher
+	opts                options
+	vertexName          string
+	pipelineName        string
+	// idleManager manages the idle watermark status.
+	idleManager *wmb.IdleManager
+	// wmbChecker checks if the idle watermark is valid.
+	wmbChecker wmb.WMBChecker
+	Shutdown
+}
+
+// NewDataForward creates a sink data forwarder.
+func NewDataForward(
+	vertex *dfv1.Vertex,
+	fromStep isb.BufferReader,
+	toStep isb.BufferWriter,
+	fetchWatermark fetch.Fetcher,
+	publishWatermark publish.Publisher,
+	opts ...Option) (*DataForward, error) {
+
+	options := DefaultOptions()
+	for _, o := range opts {
+		if err := o(options); err != nil {
+			return nil, err
+		}
+	}
+	// creating a context here which is managed by the forwarder's lifecycle
+	ctx, cancel := context.WithCancel(context.Background())
+
+	var df = DataForward{
+		ctx:                 ctx,
+		cancelFn:            cancel,
+		fromBufferPartition: fromStep,
+		toBuffer:            toStep,
+		wmFetcher:           fetchWatermark,
+		wmPublisher:         publishWatermark,
+		// should we do a check here for the values not being null?
+		vertexName:   vertex.Spec.Name,
+		pipelineName: vertex.Spec.PipelineName,
+		idleManager:  wmb.NewIdleManager(1),
+		wmbChecker:   wmb.NewWMBChecker(2), // TODO: make configurable
+		Shutdown: Shutdown{
+			rwlock: new(sync.RWMutex),
+		},
+		opts: *options,
+	}
+
+	// Add logger from parent ctx to child context.
+	df.ctx = logging.WithLogger(ctx, options.logger)
+
+	return &df, nil
+}
+
+// Start starts reading from the buffer and forwarding to the sinker. Call `Stop` to stop.
+func (df *DataForward) Start() <-chan struct{} {
+	log := logging.FromContext(df.ctx)
+	stopped := make(chan struct{})
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		log.Info("Starting sink forwarder...")
+		// with the wg approach we can do more cleanup if we need to in the future.
+		defer wg.Done()
+		for {
+			select {
+			case <-df.ctx.Done():
+				ok, err := df.IsShuttingDown()
+				if err != nil {
+					// ignore the error for now.
+					log.Errorw("Failed to check if it can shutdown", zap.Error(err))
+				}
+				if ok {
+					log.Info("Shutting down...")
+					return
+				}
+			default:
+				// once context.Done() is called, we still have to try to forwardAChunk because in graceful
+				// shutdown the fromBufferPartition should be empty.
+			}
+			// keep doing what you are good at
+			df.forwardAChunk(df.ctx)
+		}
+	}()
+
+	go func() {
+		wg.Wait()
+		// Clean up resources for the buffer reader and the writer.
+		if err := df.fromBufferPartition.Close(); err != nil {
+			log.Errorw("Failed to close buffer reader, shutdown anyways...", zap.Error(err))
+		} else {
+			log.Infow("Closed buffer reader", zap.String("bufferFrom", df.fromBufferPartition.GetName()))
+		}
+		toBuffer := df.toBuffer
+		if err := toBuffer.Close(); err != nil {
+			log.Errorw("Failed to close toBuffer writer, shutdown anyways...", zap.Error(err), zap.String("bufferTo", toBuffer.GetName()))
+		} else {
+			log.Infow("Closed toBuffer writer", zap.String("bufferTo", toBuffer.GetName()))
+		}
+
+		close(stopped)
+	}()
+
+	return stopped
+}
+
+// readWriteMessagePair represents a read message and its processed write messages.
+type readWriteMessagePair struct {
+	readMessage   *isb.ReadMessage
+	writeMessages []*isb.WriteMessage
+}
+
+// forwardAChunk forwards a chunk of messages from the fromBufferPartition to the toBuffer. It does the Read -> Process -> Forward -> Ack chain
+// for a chunk of messages returned by the first Read call. It will return only after we have successfully acked
+// the messages after forwarding, barring any platform errors. The platform errors include buffer-full,
+// buffer-not-reachable, etc.
+func (df *DataForward) forwardAChunk(ctx context.Context) {
+	start := time.Now()
+	// There is a chance that we have read the message and the container got forcefully terminated before processing. To provide
+	// at-least-once semantics for reading, during restart we will have to reprocess all unacknowledged messages. It is the
+	// responsibility of the Read function to do that.
+	readMessages, err := df.fromBufferPartition.Read(ctx, df.opts.readBatchSize)
+	df.opts.logger.Debugw("Read from buffer", zap.String("bufferFrom", df.fromBufferPartition.GetName()), zap.Int64("length", int64(len(readMessages))))
+	if err != nil {
+		df.opts.logger.Warnw("Failed to read fromBufferPartition", zap.Error(err))
+		readMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Inc()
+	}
+	readMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))
+
+	// process only if we have any read messages. There is a natural looping here if there is an internal error while
+	// reading, and we are not able to proceed.
+	if len(readMessages) == 0 {
+		// When the read length is zero, the write length is definitely zero too,
+		// meaning there's no data to be published to the next vertex, and we consider this
+		// situation as idling.
+		// In order to continue propagating watermark, we will set watermark idle=true and publish it.
+		// We also publish a control message if this is the first time we get this idle situation.
+		// We compute the HeadIdleWMB of the given partition and use it as the idle watermark.
+		var processorWMB = df.wmFetcher.ComputeHeadIdleWMB(df.fromBufferPartition.GetPartitionIdx())
+		if !df.wmbChecker.ValidateHeadWMB(processorWMB) {
+			// validation failed, skip publishing
+			df.opts.logger.Debugw("skip publishing idle watermark",
+				zap.Int("counter", df.wmbChecker.GetCounter()),
+				zap.Int64("offset", processorWMB.Offset),
+				zap.Int64("watermark", processorWMB.Watermark),
+				zap.Bool("idle", processorWMB.Idle))
+			return
+		}
+
+		// if the validation passed, we will publish the idle watermark to the toBuffer.
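+		// PublishIdleWatermark sends a ctrl message only the first time the
+		// partition turns idle; subsequent calls just update the WMB in place
+		// (see pkg/shared/idlehandler). A sink has a single toBuffer, so one
+		// publish is sufficient.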
+ idlehandler.PublishIdleWatermark(ctx, df.toBuffer, df.wmPublisher, df.idleManager, df.opts.logger, dfv1.VertexTypeSink, wmb.Watermark(time.UnixMilli(processorWMB.Watermark))) + return + } + + var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages)) + + // store the offsets of the messages we read from ISB + var readOffsets = make([]isb.Offset, len(readMessages)) + for idx, m := range readMessages { + readOffsets[idx] = m.ReadOffset + if m.Kind == isb.Data { + dataMessages = append(dataMessages, m) + } + } + + // fetch watermark if available + // TODO: make it async (concurrent and wait later) + // let's track only the first element's watermark. This is important because we reassign the watermark we fetch + // to all the elements in the batch. If we were to assign last element's watermark, we will wrongly mark on-time data as late. + // we fetch the watermark for the partition from which we read the message. + processorWM := df.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx()) + + var messageToStep []isb.Message + + // concurrent processing request channel + processingCh := make(chan *readWriteMessagePair) + // sinkResults stores the results after processing for all read messages. It indexes + // a read message to the write message + sinkResults := make([]readWriteMessagePair, len(dataMessages)) + + // create a pool of processors + var wg sync.WaitGroup + for i := 0; i < df.opts.sinkConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + df.concurrentConvertReadToWriteMsgs(ctx, processingCh) + }() + } + concurrentProcessingStart := time.Now() + + // only send the data messages + for idx, m := range dataMessages { + // emit message size metric + readBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(m.Payload))) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + // send message to the channel + sinkResults[idx].readMessage = m + processingCh <- &sinkResults[idx] + } + // let the go routines know that there is no more work + close(processingCh) + // wait till the processing is done. this will not be an infinite wait because the processing will exit if + // context.Done() is closed. + wg.Wait() + df.opts.logger.Debugw("concurrent convert read message to write message completed", zap.Int("concurrency", df.opts.sinkConcurrency), zap.Duration("took", time.Since(concurrentProcessingStart))) + + // let's figure out which vertex to send the results to. + // update the toBuffer(s) with writeMessages. 
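+	// for a sink there is no WhereTo routing decision to make: everything goes
+	// to the single toBuffer, so we simply flatten the write messages of all
+	// the processed read messages into one batch.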
+	for _, m := range sinkResults {
+		// update toBuffer
+		for _, message := range m.writeMessages {
+			messageToStep = append(messageToStep, message.Message)
+		}
+	}
+
+	// forward the messages to the sink's buffer writer
+	writeOffsets, err := df.writeToBuffer(ctx, df.toBuffer, messageToStep)
+	if err != nil {
+		df.opts.logger.Errorw("failed to write to toBuffer", zap.Error(err))
+		df.fromBufferPartition.NoAck(ctx, readOffsets)
+		return
+	}
+	df.opts.logger.Debugw("writeToBuffer completed")
+
+	// in sink we don't drop any messages,
+	// so len(dataMessages) should be the same as len(writeOffsets).
+	// if len(writeOffsets) is greater than 0, publish a normal watermark;
+	// if len(writeOffsets) is 0, meaning we only have control messages,
+	// we should not publish anything: the next len(readMessages) check will handle this idling situation
+	if len(writeOffsets) > 0 {
+		df.wmPublisher.PublishWatermark(processorWM, nil, int32(0))
+		// reset because the toBuffer is no longer idling
+		df.idleManager.Reset(df.toBuffer.GetName())
+	}
+
+	err = df.ackFromBuffer(ctx, readOffsets)
+	// implicit return for posterity :-)
+	if err != nil {
+		df.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err))
+		ackMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
+		return
+	}
+	ackMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
+
+	// ProcessingTimes of the entire forwardAChunk
+	forwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Observe(float64(time.Since(start).Microseconds()))
+}
+
+// ackFromBuffer acknowledges an array of offsets back to fromBufferPartition; it is a blocking call that retries until shutdown has been initiated.
+func (df *DataForward) ackFromBuffer(ctx context.Context, offsets []isb.Offset) error {
+	var ackRetryBackOff = wait.Backoff{
+		Factor:   1,
+		Jitter:   0.1,
+		Steps:    math.MaxInt,
+		Duration: time.Millisecond * 10,
+	}
+	var ackOffsets = offsets
+	attempt := 0
+
+	ctxClosedErr := wait.ExponentialBackoff(ackRetryBackOff, func() (done bool, err error) {
+		errs := df.fromBufferPartition.Ack(ctx, ackOffsets)
+		attempt += 1
+		summarizedErr := errorArrayToMap(errs)
+		var failedOffsets []isb.Offset
+		if len(summarizedErr) > 0 {
+			df.opts.logger.Errorw("Failed to ack from buffer, retrying", zap.Any("errors", summarizedErr), zap.Int("attempt", attempt))
+			// no point retrying if ctx.Done has been invoked
+			select {
+			case <-ctx.Done():
+				// no point in retrying after we have been asked to stop.
+ return false, ctx.Err() + default: + // retry only the failed offsets + for i, offset := range ackOffsets { + if errs[i] != nil { + failedOffsets = append(failedOffsets, offset) + } + } + ackOffsets = failedOffsets + if ok, _ := df.IsShuttingDown(); ok { + ackErr := fmt.Errorf("AckFromBuffer, Stop called while stuck on an internal error, %v", summarizedErr) + return false, ackErr + } + return false, nil + } + } else { + return true, nil + } + }) + + if ctxClosedErr != nil { + df.opts.logger.Errorw("Context closed while waiting to ack messages inside forward", zap.Error(ctxClosedErr)) + } + + return ctxClosedErr +} + +// writeToBuffer forwards an array of messages to a single buffer and is a blocking call or until shutdown has been initiated. +func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb.BufferWriter, messages []isb.Message) (writeOffsets []isb.Offset, err error) { + var ( + totalCount int + writeCount int + writeBytes float64 + dropBytes float64 + ) + totalCount = len(messages) + writeOffsets = make([]isb.Offset, 0, totalCount) + + for { + _writeOffsets, errs := toBufferPartition.Write(ctx, messages) + // Note: this is an unwanted memory allocation during a happy path. We want only minimal allocation since using failedMessages is an unlikely path. + var failedMessages []isb.Message + needRetry := false + for idx, msg := range messages { + if err := errs[idx]; err != nil { + // ATM there are no user defined errors during write, all are InternalErrors. + if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { + // If toBufferPartition returns us a NoRetryableBufferWriteErr, we drop the message. + dropBytes += float64(len(msg.Payload)) + } else { + needRetry = true + // we retry only failed messages + failedMessages = append(failedMessages, msg) + writeMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Inc() + // a shutdown can break the blocking loop caused due to InternalErr + if ok, _ := df.IsShuttingDown(); ok { + platformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName}).Inc() + return writeOffsets, fmt.Errorf("writeToBuffer failed, Stop called while stuck on an internal error with failed messages:%d, %v", len(failedMessages), errs) + } + } + } else { + writeCount++ + writeBytes += float64(len(msg.Payload)) + // we support write offsets only for jetstream + if _writeOffsets != nil { + writeOffsets = append(writeOffsets, _writeOffsets[idx]) + } + } + } + + if needRetry { + df.opts.logger.Errorw("Retrying failed messages", + zap.Any("errors", errorArrayToMap(errs)), + zap.String(metrics.LabelPipeline, df.pipelineName), + zap.String(metrics.LabelVertex, df.vertexName), + zap.String(metrics.LabelPartitionName, toBufferPartition.GetName()), + ) + // set messages to failed for the retry + messages = failedMessages + // TODO: implement retry with backoff etc. 
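+			// One possible shape for the TODO above (a sketch only, not wired
+			// in): reuse wait.Backoff/wait.ExponentialBackoff from
+			// k8s.io/apimachinery, which this file already imports for
+			// ackFromBuffer, e.g.:
+			//
+			//	backoff := wait.Backoff{Duration: df.opts.retryInterval, Factor: 2.0, Jitter: 0.1, Steps: 5}
+			//	_ = wait.ExponentialBackoff(backoff, func() (done bool, err error) { /* retry the failed writes */ return true, nil })
+			//
+			// For now we sleep for a fixed retryInterval between attempts.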
+			time.Sleep(df.opts.retryInterval)
+		} else {
+			break
+		}
+	}
+
+	dropMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(totalCount - writeCount))
+	dropBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(dropBytes)
+	writeMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount))
+	writeBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes)
+	return writeOffsets, nil
+}
+
+// concurrentConvertReadToWriteMsgs concurrently converts read messages to write messages.
+func (df *DataForward) concurrentConvertReadToWriteMsgs(_ context.Context, readMessagePair <-chan *readWriteMessagePair) {
+	for message := range readMessagePair {
+		writeMessages := []*isb.WriteMessage{{
+			Message: message.readMessage.Message,
+		}}
+		message.writeMessages = append(message.writeMessages, writeMessages...)
+	}
+}
+
+// errorArrayToMap summarizes an error array into a map keyed by the error message.
+func errorArrayToMap(errs []error) map[string]int64 {
+	result := make(map[string]int64)
+	for _, err := range errs {
+		if err != nil {
+			result[err.Error()]++
+		}
+	}
+	return result
+}
diff --git a/pkg/sinks/forward/forward_test.go b/pkg/sinks/forward/forward_test.go
new file mode 100644
index 0000000000..a1bda9c1f5
--- /dev/null
+++ b/pkg/sinks/forward/forward_test.go
@@ -0,0 +1,395 @@
+/*
+Copyright 2022 The Numaproj Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package forward
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/goleak"
+
+	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+	"github.com/numaproj/numaflow/pkg/isb"
+	"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
+	"github.com/numaproj/numaflow/pkg/isb/testutils"
+	"github.com/numaproj/numaflow/pkg/shared/logging"
+	"github.com/numaproj/numaflow/pkg/watermark/generic"
+	"github.com/numaproj/numaflow/pkg/watermark/publish"
+	"github.com/numaproj/numaflow/pkg/watermark/wmb"
+)
+
+const (
+	testPipelineName = "testPipeline"
+	testVertexName   = "testVertex"
+)
+
+var (
+	testStartTime = time.Unix(1636470000, 0).UTC()
+)
+
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
+
+// testForwarderPublisher is for forward_test.go only
+type testForwarderPublisher struct {
+	Name string
+	idle bool
+	lock sync.RWMutex
+}
+
+func (t *testForwarderPublisher) IsIdle() bool {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+	return t.idle
+}
+
+func (t *testForwarderPublisher) Close() error {
+	return nil
+}
+
+func (t *testForwarderPublisher) PublishWatermark(_ wmb.Watermark, _ isb.Offset, _ int32) {
+}
+
+func (t *testForwarderPublisher) PublishIdleWatermark(_ wmb.Watermark, _ isb.Offset, _ int32) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	t.idle = true
+}
+
+func (t *testForwarderPublisher) GetLatestWatermark() wmb.Watermark {
+	return wmb.InitialWatermark
+}
+
+// testForwardFetcher is for forward_test.go only
+type testForwardFetcher struct {
+}
+
+// ComputeWatermark returns a fixed watermark (testStartTime) so that we can
+// verify the test publisher is publishing the watermark
+func (t *testForwardFetcher) ComputeWatermark(_ isb.Offset, _ int32) wmb.Watermark {
+	return t.getWatermark()
+}
+
+func (t *testForwardFetcher) getWatermark() wmb.Watermark {
+	return wmb.Watermark(testStartTime)
+}
+
+func (t *testForwardFetcher) ComputeHeadIdleWMB(int32) wmb.WMB {
+	// won't be used
+	return wmb.WMB{}
+}
+
+// testIdleForwardFetcher is for forward_test.go only
+type testIdleForwardFetcher struct {
+}
+
+// ComputeWatermark returns a fixed watermark (testStartTime) so that we can
+// verify the test publisher is publishing the watermark
+func (t *testIdleForwardFetcher) ComputeWatermark(_ isb.Offset, _ int32) wmb.Watermark {
+	return t.getWatermark()
+}
+
+func (t *testIdleForwardFetcher) getWatermark() wmb.Watermark {
+	return wmb.Watermark(testStartTime)
+}
+
+func (t *testIdleForwardFetcher) ComputeHeadIdleWMB(int32) wmb.WMB {
+	return wmb.WMB{
+		Idle:      true,
+		Offset:    10,
+		Watermark: 100,
+		Partition: 0,
+	}
+}
+
+func TestNewDataForward(t *testing.T) {
+	var (
+		testName        = "sink_forward"
+		batchSize int64 = 10
+	)
+
+	t.Run(testName+"_basic", func(t *testing.T) {
+		metricsReset()
+		// set the buffer size to be 5 * batchSize, so we have enough space for testing
+		fromStep := simplebuffer.NewInMemoryBuffer("from", 5*batchSize, 0)
+		// as of now, all of our sinkers have only 1 toBuffer
+		to1 := simplebuffer.NewInMemoryBuffer(testVertexName, 5*batchSize, 0)
+		toSteps := map[string][]isb.BufferWriter{
+			testVertexName: {to1},
+		}
+
+		vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
+			PipelineName: testPipelineName,
+			AbstractVertex: dfv1.AbstractVertex{
+				Name: testVertexName,
+			},
+		}}
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		defer cancel()
+
+		writeMessages := 
testutils.BuildTestWriteMessages(4*batchSize, testStartTime) + + _, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) + fetchWatermark := &testForwardFetcher{} + f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName]) + + assert.NoError(t, err) + assert.False(t, to1.IsFull()) + assert.True(t, to1.IsEmpty()) + + stopped := f.Start() + // write some data + _, errs := fromStep.Write(ctx, writeMessages[0:batchSize]) + assert.Equal(t, make([]error, batchSize), errs) + + testReadBatchSize := int(batchSize / 2) + msgs := to1.GetMessages(testReadBatchSize) + for ; msgs[testReadBatchSize-1].ID == ""; msgs = to1.GetMessages(testReadBatchSize) { + select { + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + t.Fatal("expected to have messages in to buffer", ctx.Err()) + } + default: + time.Sleep(1 * time.Millisecond) + } + } + // read some data + readMessages, err := to1.Read(ctx, int64(testReadBatchSize)) + assert.NoError(t, err, "expected no error") + assert.Len(t, readMessages, testReadBatchSize) + for i := 0; i < testReadBatchSize; i++ { + assert.Equal(t, []interface{}{writeMessages[i].MessageInfo}, []interface{}{readMessages[i].MessageInfo}) + assert.Equal(t, []interface{}{writeMessages[i].Kind}, []interface{}{readMessages[i].Kind}) + assert.Equal(t, []interface{}{writeMessages[i].Keys}, []interface{}{readMessages[i].Keys}) + assert.Equal(t, []interface{}{writeMessages[i].Body}, []interface{}{readMessages[i].Body}) + } + + // iterate to see whether metrics will eventually succeed + err = validateMetrics(batchSize) + for err != nil { + select { + case <-ctx.Done(): + t.Errorf("failed waiting metrics collection to succeed [%s] (%s)", ctx.Err(), err) + default: + time.Sleep(10 * time.Millisecond) + err = validateMetrics(batchSize) + + } + } + + // write some data + _, errs = fromStep.Write(ctx, writeMessages[batchSize:4*batchSize]) + assert.Equal(t, make([]error, 3*batchSize), errs) + + f.Stop() + time.Sleep(1 * time.Millisecond) + // only for shutdown will work as from buffer is not empty + f.ForceStop() + + <-stopped + + }) + + t.Run(testName+"_idle", func(t *testing.T) { + + fromStep := simplebuffer.NewInMemoryBuffer("from", 5*batchSize, 0) + to1 := simplebuffer.NewInMemoryBuffer(testVertexName, 5*batchSize, 0) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ + PipelineName: testPipelineName, + AbstractVertex: dfv1.AbstractVertex{ + Name: testVertexName, + }, + }} + + fetchWatermark := &testIdleForwardFetcher{} + publishWatermark := map[string]publish.Publisher{ + testVertexName: &testForwarderPublisher{}, + } + f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName]) + + assert.NoError(t, err) + assert.False(t, to1.IsFull()) + assert.True(t, to1.IsEmpty()) + + stopped := f.Start() + + // we don't write control message for sink, only publish idle watermark + for !publishWatermark[testVertexName].(*testForwarderPublisher).IsIdle() { + select { + case <-ctx.Done(): + logging.FromContext(ctx).Fatalf("expect to have the idle wm in to1, %s", ctx.Err()) + default: + time.Sleep(100 * time.Millisecond) + } + } + + f.Stop() + + <-stopped + }) +} + +// TestWriteToBuffer tests two BufferFullWritingStrategies: 1. discarding the latest message and 2. retrying writing until context is cancelled. 
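+// With DiscardLatest the in-memory buffer drops the overflowing message, so
+// writeToBuffer returns without error; with RetryUntilSuccess the failed write
+// is retried until Stop cancels the context, so an error is expected.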
+func TestWriteToBuffer(t *testing.T) { + tests := []struct { + name string + batchSize int64 + strategy dfv1.BufferFullWritingStrategy + throwError bool + }{ + { + name: "test-discard-latest", + batchSize: 10, + strategy: dfv1.DiscardLatest, + // should not throw any error as we drop messages and finish writing before context is cancelled + throwError: false, + }, + { + name: "test-retry-until-success", + batchSize: 10, + strategy: dfv1.RetryUntilSuccess, + // should throw context closed error as we keep retrying writing until context is cancelled + throwError: true, + }, + { + name: "test-discard-latest", + batchSize: 1, + strategy: dfv1.DiscardLatest, + // should not throw any error as we drop messages and finish writing before context is cancelled + throwError: false, + }, + { + name: "test-retry-until-success", + batchSize: 1, + strategy: dfv1.RetryUntilSuccess, + // should throw context closed error as we keep retrying writing until context is cancelled + throwError: true, + }, + } + for _, value := range tests { + t.Run(value.name, func(t *testing.T) { + fromStep := simplebuffer.NewInMemoryBuffer("from", 5*value.batchSize, 0) + buffer := simplebuffer.NewInMemoryBuffer("to1", value.batchSize, 0, simplebuffer.WithBufferFullWritingStrategy(value.strategy)) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ + PipelineName: "testPipeline", + AbstractVertex: dfv1.AbstractVertex{ + Name: "to1", + }, + }} + + fetchWatermark := &testForwardFetcher{} + publishWatermark := map[string]publish.Publisher{ + "to1": &testForwarderPublisher{}, + } + f, err := NewDataForward(vertex, fromStep, buffer, fetchWatermark, publishWatermark["to1"]) + + assert.NoError(t, err) + assert.False(t, buffer.IsFull()) + assert.True(t, buffer.IsEmpty()) + + stopped := f.Start() + go func() { + for !buffer.IsFull() { + select { + case <-ctx.Done(): + logging.FromContext(ctx).Fatalf("not full, %s", ctx.Err()) + default: + time.Sleep(1 * time.Millisecond) + } + } + // stop will cancel the context + f.Stop() + }() + + // try to write to buffer after it is full. + var messageToStep []isb.Message + writeMessages := testutils.BuildTestWriteMessages(4*value.batchSize, testStartTime) + messageToStep = append(messageToStep, writeMessages[0:value.batchSize+1]...) 
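+			// the "to1" buffer only holds batchSize messages, so exactly one of
+			// the batchSize+1 messages hits the BufferFullWritingStrategy; once
+			// the buffer is full, the goroutine above calls Stop, which breaks
+			// the RetryUntilSuccess loop (hence "failed messages:1" below).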
+ _, err = f.writeToBuffer(ctx, buffer, messageToStep) + + assert.Equal(t, value.throwError, err != nil) + if value.throwError { + // assert the number of failed messages + assert.True(t, strings.Contains(err.Error(), "with failed messages:1")) + } + <-stopped + }) + } +} + +func validateMetrics(batchSize int64) (err error) { + metadata := ` + # HELP sink_forwarder_read_total Total number of Messages Read + # TYPE sink_forwarder_read_total counter + ` + expected := ` + sink_forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + ` + ` + + err = testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "sink_forwarder_read_total") + if err != nil { + return err + } + + writeMetadata := ` + # HELP sink_forwarder_write_total Total number of Messages Written + # TYPE sink_forwarder_write_total counter + ` + writeExpected := ` + sink_forwarder_write_total{partition_name="testVertex",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%d", batchSize) + ` + ` + err = testutil.CollectAndCompare(writeMessagesCount, strings.NewReader(writeMetadata+writeExpected), "sink_forwarder_write_total") + if err != nil { + return err + } + + ackMetadata := ` + # HELP sink_forwarder_ack_total Total number of Messages Acknowledged + # TYPE sink_forwarder_ack_total counter + ` + ackExpected := ` + sink_forwarder_ack_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%d", batchSize) + ` + ` + + err = testutil.CollectAndCompare(ackMessagesCount, strings.NewReader(ackMetadata+ackExpected), "sink_forwarder_ack_total") + if err != nil { + return err + } + + return nil +} + +func metricsReset() { + readMessagesCount.Reset() + writeMessagesCount.Reset() + ackMessagesCount.Reset() +} diff --git a/pkg/sinks/forward/metrics.go b/pkg/sinks/forward/metrics.go new file mode 100644 index 0000000000..1814306b79 --- /dev/null +++ b/pkg/sinks/forward/metrics.go @@ -0,0 +1,109 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package forward
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/numaproj/numaflow/pkg/metrics"
+)
+
+// readMessagesCount is used to indicate the number of messages read
+var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "read_total",
+	Help:      "Total number of Messages Read",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// readBytesCount is used to indicate the number of bytes read
+var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "read_bytes_total",
+	Help:      "Total number of bytes read",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// readMessagesError is used to indicate the number of errors encountered while reading messages
+var readMessagesError = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "read_error_total",
+	Help:      "Total number of Read Errors",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// writeMessagesCount is used to indicate the number of messages written
+var writeMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "write_total",
+	Help:      "Total number of Messages Written",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// writeBytesCount is used to indicate the number of bytes written
+var writeBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "write_bytes_total",
+	Help:      "Total number of bytes written",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// writeMessagesError is used to indicate the number of errors encountered while writing messages
+var writeMessagesError = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "write_error_total",
+	Help:      "Total number of Write Errors",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// dropMessagesCount is used to indicate the number of messages dropped
+var dropMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "drop_total",
+	Help:      "Total number of Messages Dropped",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// dropBytesCount is used to indicate the number of bytes dropped
+var dropBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "drop_bytes_total",
+	Help:      "Total number of Bytes Dropped",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// ackMessagesCount is used to indicate the number of messages acknowledged
+var ackMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "ack_total",
+	Help:      "Total number of Messages Acknowledged",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// ackMessageError is used to indicate the number of errors encountered while acknowledging messages
+var ackMessageError = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "ack_error_total",
+	Help:      "Total number of Acknowledged Errors",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
+
+// platformError is used to indicate the number of Internal/Platform errors
+var platformError = promauto.NewCounterVec(prometheus.CounterOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "platform_error_total",
+	Help:      "Total number of platform Errors",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline})
+
+// forwardAChunkProcessingTime is a histogram to observe forwardAChunk processing times as a whole
+var forwardAChunkProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
+	Subsystem: "sink_forwarder",
+	Name:      "forward_chunk_processing_time",
+	Help:      "Processing times of the entire forward a chunk (100 microseconds to 20 minutes)",
+	Buckets:   prometheus.ExponentialBucketsRange(100, 60000000*20, 60),
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})
diff --git a/pkg/sinks/forward/options.go b/pkg/sinks/forward/options.go
new file mode 100644
index 0000000000..dcfba6b5be
--- /dev/null
+++ b/pkg/sinks/forward/options.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2022 The Numaproj Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package forward
+
+import (
+	"time"
+
+	"go.uber.org/zap"
+
+	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+	"github.com/numaproj/numaflow/pkg/shared/logging"
+)
+
+// options for forwarding the message
+type options struct {
+	// readBatchSize is the size of a single batch read from the fromBuffer
+	readBatchSize int64
+	// sinkConcurrency sets the concurrency for concurrent processing
+	sinkConcurrency int
+	// retryInterval is the time.Duration to sleep before retrying
+	retryInterval time.Duration
+	// logger is used to pass the logger variable
+	logger *zap.SugaredLogger
+}
+
+type Option func(*options) error
+
+func DefaultOptions() *options {
+	return &options{
+		readBatchSize:   dfv1.DefaultReadBatchSize,
+		sinkConcurrency: dfv1.DefaultReadBatchSize,
+		retryInterval:   time.Millisecond,
+		logger:          logging.NewLogger(),
+	}
+}
+
+// WithReadBatchSize sets the read batch size
+func WithReadBatchSize(f int64) Option {
+	return func(o *options) error {
+		o.readBatchSize = f
+		return nil
+	}
+}
+
+// WithSinkConcurrency sets concurrency for processing
+func WithSinkConcurrency(f int) Option {
+	return func(o *options) error {
+		o.sinkConcurrency = f
+		return nil
+	}
+}
+
+// WithRetryInterval sets the retry interval
+func WithRetryInterval(f time.Duration) Option {
+	return func(o *options) error {
+		o.retryInterval = f
+		return nil
+	}
+}
+
+// WithLogger sets the logger
+func WithLogger(l *zap.SugaredLogger) Option {
+	return func(o *options) error {
+		o.logger = l
+		return nil
+	}
+}
diff --git a/pkg/sinks/forward/shutdown.go b/pkg/sinks/forward/shutdown.go
new file mode 100644
index 0000000000..7db5961156
--- /dev/null
+++ b/pkg/sinks/forward/shutdown.go
@@ -0,0 +1,73 @@
+/*
+Copyright 2022 The Numaproj Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
diff --git a/pkg/sinks/forward/shutdown.go b/pkg/sinks/forward/shutdown.go
new file mode 100644
index 0000000000..7db5961156
--- /dev/null
+++ b/pkg/sinks/forward/shutdown.go
@@ -0,0 +1,73 @@
+/*
+Copyright 2022 The Numaproj Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package forward
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+// Shutdown tracks and enforces the shutdown activity.
+type Shutdown struct {
+	startShutdown      bool
+	forceShutdown      bool
+	initiateTime       time.Time
+	shutdownRequestCtr int
+	rwlock             *sync.RWMutex
+}
+
+// IsShuttingDown returns whether we can stop processing.
+func (df *DataForward) IsShuttingDown() (bool, error) {
+	df.Shutdown.rwlock.RLock()
+	defer df.Shutdown.rwlock.RUnlock()
+
+	if df.Shutdown.forceShutdown || df.Shutdown.startShutdown {
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (s *Shutdown) String() string {
+	s.rwlock.RLock()
+	defer s.rwlock.RUnlock()
+	return fmt.Sprintf("startShutdown:%t forceShutdown:%t shutdownRequestCtr:%d initiateTime:%s",
+		s.startShutdown, s.forceShutdown, s.shutdownRequestCtr, s.initiateTime)
+}
+
+// Stop stops the processing.
+func (df *DataForward) Stop() {
+	df.Shutdown.rwlock.Lock()
+	defer df.Shutdown.rwlock.Unlock()
+	if df.Shutdown.initiateTime.IsZero() {
+		df.Shutdown.initiateTime = time.Now()
+	}
+	df.Shutdown.startShutdown = true
+	df.Shutdown.shutdownRequestCtr++
+	// call cancel
+	df.cancelFn()
+}
+
+// ForceStop sets up the force shutdown flag.
+func (df *DataForward) ForceStop() {
+	// call Stop first so the normal shutdown bookkeeping happens even when a force shutdown arrives directly
+	df.Stop()
+	df.Shutdown.rwlock.Lock()
+	defer df.Shutdown.rwlock.Unlock()
+	df.Shutdown.forceShutdown = true
+}
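Taken together, Stop and ForceStop give callers a two-phase shutdown. A hedged sketch of the intended sequence, inferred from the tests below (df stands for a *DataForward whose construction is elided):

    stopped := df.Start() // the returned channel signals when the forwarder loop exits
    df.Stop()             // request a graceful stop; this also cancels the forwarder's context
    // if the upstream buffer cannot drain (see the force-stop test below), escalate:
    df.ForceStop()
    <-stopped             // block until the loop has actually terminated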
diff --git a/pkg/sinks/forward/shutdown_test.go b/pkg/sinks/forward/shutdown_test.go
new file mode 100644
index 0000000000..f35785b641
--- /dev/null
+++ b/pkg/sinks/forward/shutdown_test.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2022 The Numaproj Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package forward
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+	"github.com/numaproj/numaflow/pkg/isb"
+	"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
+	"github.com/numaproj/numaflow/pkg/isb/testutils"
+	"github.com/numaproj/numaflow/pkg/watermark/generic"
+)
+
+func TestShutDown(t *testing.T) {
+	tests := []struct {
+		name      string
+		batchSize int64
+	}{
+		{
+			name:      "batch_forward_size_1",
+			batchSize: 1,
+		},
+		{
+			name:      "batch_forward_size_5",
+			batchSize: 5,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name+"_stop", func(t *testing.T) {
+			batchSize := tt.batchSize
+			fromStep := simplebuffer.NewInMemoryBuffer("from", 5*batchSize, 0)
+			to1 := simplebuffer.NewInMemoryBuffer("to1", 2*batchSize, 0)
+			toSteps := map[string][]isb.BufferWriter{
+				"to1": {to1},
+			}
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			startTime := time.Unix(1636470000, 0)
+			writeMessages := testutils.BuildTestWriteMessages(4*batchSize, startTime)
+
+			vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
+				PipelineName: "testPipeline",
+				AbstractVertex: dfv1.AbstractVertex{
+					Name: "to1",
+				},
+			}}
+
+			fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
+			f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], WithReadBatchSize(batchSize))
+			assert.NoError(t, err)
+			stopped := f.Start()
+			// write some data; the buffer does not become full even though nothing is reading it
+			_, errs := fromStep.Write(ctx, writeMessages[0:batchSize])
+			assert.Equal(t, make([]error, batchSize), errs)
+
+			f.Stop()
+			// we cannot assert the result of IsShuttingDown because it might take a couple of iterations to be successful.
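+			// calling it here only exercises the API; the closed stopped channel below is the authoritative signal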
+			_, _ = f.IsShuttingDown()
+			<-stopped
+		})
+		t.Run(tt.name+"_forceStop", func(t *testing.T) {
+			batchSize := tt.batchSize
+			fromStep := simplebuffer.NewInMemoryBuffer("from", 5*batchSize, 0)
+			to1 := simplebuffer.NewInMemoryBuffer("to1", 2*batchSize, 0)
+			toSteps := map[string][]isb.BufferWriter{
+				"to1": {to1},
+			}
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			startTime := time.Unix(1636470000, 0)
+			writeMessages := testutils.BuildTestWriteMessages(4*batchSize, startTime)
+
+			vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
+				PipelineName: "testPipeline",
+				AbstractVertex: dfv1.AbstractVertex{
+					Name: "to1",
+				},
+			}}
+
+			fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
+			f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], WithReadBatchSize(batchSize))
+			assert.NoError(t, err)
+			stopped := f.Start()
+			// write enough data to fill up the toBuffer, so the fromBufferPartition cannot be drained
+			_, errs := fromStep.Write(ctx, writeMessages[0:4*batchSize])
+			assert.Equal(t, make([]error, 4*batchSize), errs)
+
+			f.Stop()
+			canIShutdown, _ := f.IsShuttingDown()
+			assert.Equal(t, true, canIShutdown)
+			time.Sleep(1 * time.Millisecond)
+			// ForceStop is required here: the from buffer is not empty, so a graceful stop cannot complete
+			f.ForceStop()
+			canIShutdown, err = f.IsShuttingDown()
+			assert.NoError(t, err)
+			assert.Equal(t, true, canIShutdown)
+
+			<-stopped
+		})
+	}
+}
diff --git a/pkg/sinks/kafka/kafka.go b/pkg/sinks/kafka/kafka.go
index 02035cc42a..6bbac5133a 100644
--- a/pkg/sinks/kafka/kafka.go
+++ b/pkg/sinks/kafka/kafka.go
@@ -25,12 +25,11 @@ import (
 	"go.uber.org/zap"
 
 	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
-	"github.com/numaproj/numaflow/pkg/forward"
-	"github.com/numaproj/numaflow/pkg/forward/applier"
 	"github.com/numaproj/numaflow/pkg/isb"
 	"github.com/numaproj/numaflow/pkg/metrics"
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 	"github.com/numaproj/numaflow/pkg/shared/util"
+	sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
 	"github.com/numaproj/numaflow/pkg/watermark/fetch"
 	"github.com/numaproj/numaflow/pkg/watermark/publish"
 )
@@ -42,7 +41,7 @@ type ToKafka struct {
 	producer  sarama.AsyncProducer
 	connected bool
 	topic     string
-	isdf      *forward.InterStepDataForward
+	isdf      *sinkforward.DataForward
 	kafkaSink *dfv1.KafkaSink
 	log       *zap.SugaredLogger
 }
@@ -60,8 +59,7 @@ func WithLogger(log *zap.SugaredLogger) Option {
 func NewToKafka(vertex *dfv1.Vertex,
 	fromBuffer isb.BufferReader,
 	fetchWatermark fetch.Fetcher,
-	publishWatermark map[string]publish.Publisher,
-	whereToDecider forward.GoWhere,
+	publishWatermark publish.Publisher,
 	opts ...Option) (*ToKafka, error) {
 
 	kafkaSink := vertex.Spec.Sink.Kafka
@@ -83,14 +81,14 @@ func NewToKafka(vertex *dfv1.Vertex,
 	toKafka.topic = kafkaSink.Topic
 	toKafka.kafkaSink = kafkaSink
 
-	forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(toKafka.log)}
+	forwardOpts := []sinkforward.Option{sinkforward.WithLogger(toKafka.log)}
 	if x := vertex.Spec.Limits; x != nil {
 		if x.ReadBatchSize != nil {
-			forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
+			forwardOpts = append(forwardOpts, sinkforward.WithReadBatchSize(int64(*x.ReadBatchSize)))
 		}
 	}
-	f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string][]isb.BufferWriter{vertex.Spec.Name: {toKafka}}, whereToDecider, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark,
forwardOpts...) + f, err := sinkforward.NewDataForward(vertex, fromBuffer, toKafka, fetchWatermark, publishWatermark, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/kafka/kafka_test.go b/pkg/sinks/kafka/kafka_test.go index 210024916f..ee4be499d8 100644 --- a/pkg/sinks/kafka/kafka_test.go +++ b/pkg/sinks/kafka/kafka_test.go @@ -22,11 +22,10 @@ import ( "testing" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" - "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/shared/logging" + sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" mock "github.com/IBM/sarama/mocks" @@ -47,8 +46,7 @@ func TestWriteSuccessToKafka(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name}) - toSteps := map[string][]isb.BufferWriter{vertex.Spec.Name: {toKafka}} - toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, toSteps, getSinkGoWhereDecider(vertex.Spec.Name), applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark) + toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"]) assert.NoError(t, err) toKafka.kafkaSink = vertex.Spec.Sink.Kafka toKafka.name = "Test" @@ -101,7 +99,7 @@ func TestWriteFailureToKafka(t *testing.T) { }} toSteps := map[string][]isb.BufferWriter{vertex.Spec.Name: {toKafka}} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, toSteps, getSinkGoWhereDecider(vertex.Spec.Name), applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark) + toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"]) assert.NoError(t, err) toKafka.name = "Test" toKafka.topic = "topic-1" @@ -140,15 +138,3 @@ func TestWriteFailureToKafka(t *testing.T) { toKafka.Stop() } - -func getSinkGoWhereDecider(vertexName string) forward.GoWhere { - fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer - result = append(result, forward.VertexBuffer{ - ToVertexName: vertexName, - ToVertexPartitionIdx: 0, - }) - return result, nil - }) - return fsd -} diff --git a/pkg/sinks/logger/log.go b/pkg/sinks/logger/log.go index 079958b63f..69ac398957 100644 --- a/pkg/sinks/logger/log.go +++ b/pkg/sinks/logger/log.go @@ -23,11 +23,10 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" - "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" + sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" ) @@ -36,7 +35,7 @@ import ( type ToLog struct { name string pipelineName string - isdf *forward.InterStepDataForward + isdf *sinkforward.DataForward logger *zap.SugaredLogger } @@ -53,8 +52,7 @@ func WithLogger(log *zap.SugaredLogger) Option { func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, 
fetchWatermark fetch.Fetcher, - publishWatermark map[string]publish.Publisher, - whereToDecider forward.GoWhere, + publishWatermark publish.Publisher, opts ...Option) (*ToLog, error) { toLog := new(ToLog) @@ -71,14 +69,14 @@ func NewToLog(vertex *dfv1.Vertex, toLog.logger = logging.NewLogger() } - forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(toLog.logger)} + forwardOpts := []sinkforward.Option{sinkforward.WithLogger(toLog.logger)} if x := vertex.Spec.Limits; x != nil { if x.ReadBatchSize != nil { - forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize))) + forwardOpts = append(forwardOpts, sinkforward.WithReadBatchSize(int64(*x.ReadBatchSize))) } } - isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string][]isb.BufferWriter{vertex.Spec.Name: {toLog}}, whereToDecider, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark, forwardOpts...) + isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, toLog, fetchWatermark, publishWatermark, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/logger/log_test.go b/pkg/sinks/logger/log_test.go index 6ef1510173..07e4d217a6 100644 --- a/pkg/sinks/logger/log_test.go +++ b/pkg/sinks/logger/log_test.go @@ -22,11 +22,10 @@ import ( "time" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" - "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" + sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/stretchr/testify/assert" @@ -36,21 +35,6 @@ var ( testStartTime = time.Unix(1636470000, 0).UTC() ) -type myForwardToAllTest struct { -} - -func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ - ToVertexName: "to1", - ToVertexPartitionIdx: 0, - }, - { - ToVertexName: "to2", - ToVertexPartitionIdx: 0, - }, - }, nil -} - func TestToLog_Start(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25, 0) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -68,7 +52,7 @@ func TestToLog_Start(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name}) - s, err := NewToLog(vertex, fromStep, fetchWatermark, publishWatermark, getSinkGoWhereDecider(vertex.Spec.Name)) + s, err := NewToLog(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name]) assert.NoError(t, err) stopped := s.Start() @@ -85,22 +69,19 @@ func TestToLog_Start(t *testing.T) { <-stopped } -// TestToLog_ForwardToTwoVertex writes to 2 vertices and have a logger sinks attached to each vertex. 
-func TestToLog_ForwardToTwoVertex(t *testing.T) { +// TestToLog_Forward writes to a vertex which has a logger sink +func TestToLog_Forward(t *testing.T) { tests := []struct { - name string - batchSize int64 - streamEnabled bool + name string + batchSize int64 }{ { - name: "stream_forward", - batchSize: 1, - streamEnabled: true, + name: "batch_forward", + batchSize: 1, }, { - name: "batch_forward", - batchSize: 5, - streamEnabled: false, + name: "batch_forward", + batchSize: 5, }, } for _, tt := range tests { @@ -110,11 +91,8 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) { defer cancel() fromStep := simplebuffer.NewInMemoryBuffer("from", 5*batchSize, 0) - to1 := simplebuffer.NewInMemoryBuffer("to1", 5*batchSize, 0) - to2 := simplebuffer.NewInMemoryBuffer("to2", 5*batchSize, 0) + to1 := simplebuffer.NewInMemoryBuffer("sinks.logger1", 5*batchSize, 0) - // start the last vertex first - // add 2 sinks per vertex vertex1 := &dfv1.Vertex{Spec: dfv1.VertexSpec{ AbstractVertex: dfv1.AbstractVertex{ Name: "sinks.logger1", @@ -124,35 +102,18 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) { }, }} - vertex2 := &dfv1.Vertex{Spec: dfv1.VertexSpec{ - AbstractVertex: dfv1.AbstractVertex{ - Name: "sinks.logger2", - Sink: &dfv1.Sink{ - Log: &dfv1.Log{}, - }, - }, - }} fetchWatermark1, publishWatermark1 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex1.Spec.Name}) - logger1, _ := NewToLog(vertex1, to1, fetchWatermark1, publishWatermark1, getSinkGoWhereDecider(vertex1.Spec.Name)) - fetchWatermark2, publishWatermark2 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex2.Spec.Name}) - logger2, _ := NewToLog(vertex2, to2, fetchWatermark2, publishWatermark2, getSinkGoWhereDecider(vertex2.Spec.Name)) + logger1, _ := NewToLog(vertex1, to1, fetchWatermark1, publishWatermark1[vertex1.Spec.Name]) logger1Stopped := logger1.Start() - logger2Stopped := logger2.Start() toSteps := map[string][]isb.BufferWriter{ - "to1": {to1}, - "to2": {to2}, + "sinks.logger1": {to1}, } writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime) - vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ - PipelineName: "testPipeline", - AbstractVertex: dfv1.AbstractVertex{ - Name: "testVertex", - }, - }} + fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark, forward.WithReadBatchSize(batchSize), forward.WithUDFStreaming(tt.streamEnabled)) + f, err := sinkforward.NewDataForward(vertex1, fromStep, to1, fetchWatermark, publishWatermark["sinks.logger1"], sinkforward.WithReadBatchSize(batchSize)) assert.NoError(t, err) stopped := f.Start() @@ -163,22 +124,8 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) { <-stopped // downstream should be stopped only after upstream is stopped logger1.Stop() - logger2.Stop() <-logger1Stopped - <-logger2Stopped }) } } - -func getSinkGoWhereDecider(vertexName string) forward.GoWhere { - fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer - result = append(result, forward.VertexBuffer{ - ToVertexName: vertexName, - ToVertexPartitionIdx: 0, - }) - return result, nil - }) - return fsd -} diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go index 7ee3e050fa..81e51434c2 100644 --- a/pkg/sinks/sink.go +++ b/pkg/sinks/sink.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" - 
"github.com/numaproj/numaflow/pkg/forward" sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/watermark/generic/jetstream" "github.com/numaproj/numaflow/pkg/watermark/store" @@ -161,7 +160,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { var finalWg sync.WaitGroup for index := range u.VertexInstance.Vertex.OwnedBuffers() { finalWg.Add(1) - sinker, err := u.getSinker(readers[index], log, fetchWatermark, publishWatermark, sinkHandler) + sinker, err := u.getSinker(readers[index], log, fetchWatermark, publishWatermark[u.VertexInstance.Vertex.Spec.Name], sinkHandler) if err != nil { return fmt.Errorf("failed to find a sink, errpr: %w", err) } @@ -226,32 +225,17 @@ func (u *SinkProcessor) Start(ctx context.Context) error { } // getSinker takes in the logger from the parent context -func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM map[string]publish.Publisher, sinkHandler udsink.SinkApplier) (Sinker, error) { +func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM publish.Publisher, sinkHandler udsink.SinkApplier) (Sinker, error) { sink := u.VertexInstance.Vertex.Spec.Sink if x := sink.Log; x != nil { - return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, u.getSinkGoWhereDecider(), logsink.WithLogger(logger)) + return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, logsink.WithLogger(logger)) } else if x := sink.Kafka; x != nil { - return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, u.getSinkGoWhereDecider(), kafkasink.WithLogger(logger)) + return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, kafkasink.WithLogger(logger)) } else if x := sink.Blackhole; x != nil { - return blackhole.NewBlackhole(u.VertexInstance.Vertex, reader, fetchWM, publishWM, u.getSinkGoWhereDecider(), blackhole.WithLogger(logger)) + return blackhole.NewBlackhole(u.VertexInstance.Vertex, reader, fetchWM, publishWM, blackhole.WithLogger(logger)) } else if x := sink.UDSink; x != nil { // if the sink is a user defined sink, then we need to pass the sinkHandler to it which will be used to invoke the user defined sink - return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, u.getSinkGoWhereDecider(), sinkHandler, udsink.WithLogger(logger)) + return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, sinkHandler, udsink.WithLogger(logger)) } return nil, fmt.Errorf("invalid sink spec") } - -// getSinkGoWhereDecider returns a function that decides where to send the message -// based on the keys and tags -// for sink processor, we send the message to the same vertex and partition will be set to 0 -func (u *SinkProcessor) getSinkGoWhereDecider() forward.GoWhere { - fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer - result = append(result, forward.VertexBuffer{ - ToVertexName: u.VertexInstance.Vertex.Spec.Name, - ToVertexPartitionIdx: 0, - }) - return result, nil - }) - return fsd -} diff --git a/pkg/sinks/udsink/sink.go b/pkg/sinks/udsink/sink.go index 5614b4d1d6..b3507dd209 100644 --- a/pkg/sinks/udsink/sink.go +++ b/pkg/sinks/udsink/sink.go @@ -26,10 +26,9 @@ import ( sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - 
"github.com/numaproj/numaflow/pkg/forward" - "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/shared/logging" + sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" ) @@ -37,7 +36,7 @@ import ( type UserDefinedSink struct { name string pipelineName string - isdf *forward.InterStepDataForward + isdf *sinkforward.DataForward logger *zap.SugaredLogger udsink SinkApplier } @@ -55,8 +54,7 @@ func WithLogger(log *zap.SugaredLogger) Option { func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, - publishWatermark map[string]publish.Publisher, - whereToDecider forward.GoWhere, + publishWatermark publish.Publisher, udsink SinkApplier, opts ...Option) (*UserDefinedSink, error) { @@ -73,15 +71,15 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, s.logger = logging.NewLogger() } - forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(s.logger)} + forwardOpts := []sinkforward.Option{sinkforward.WithLogger(s.logger)} if x := vertex.Spec.Limits; x != nil { if x.ReadBatchSize != nil { - forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize))) + forwardOpts = append(forwardOpts, sinkforward.WithReadBatchSize(int64(*x.ReadBatchSize))) } } s.udsink = udsink - isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string][]isb.BufferWriter{vertex.Spec.Name: {s}}, whereToDecider, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark, forwardOpts...) + isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, s, fetchWatermark, publishWatermark, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/udsink/udsink_grpc.go b/pkg/sinks/udsink/udsink_grpc.go index 089a2b15ac..ce423788f3 100644 --- a/pkg/sinks/udsink/udsink_grpc.go +++ b/pkg/sinks/udsink/udsink_grpc.go @@ -21,11 +21,11 @@ import ( "fmt" "time" - sinkclient "github.com/numaproj/numaflow/pkg/sdkclient/sinker" - "github.com/numaproj/numaflow/pkg/shared/logging" - sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" "google.golang.org/protobuf/types/known/emptypb" + + sinkclient "github.com/numaproj/numaflow/pkg/sdkclient/sinker" + "github.com/numaproj/numaflow/pkg/shared/logging" ) // SinkApplier applies the sink on the read message and gives back a response. 
Any UserError will be retried here, while diff --git a/pkg/sinks/udsink/udsink_grpc_test.go b/pkg/sinks/udsink/udsink_grpc_test.go index ba0255b576..16098fc7a0 100644 --- a/pkg/sinks/udsink/udsink_grpc_test.go +++ b/pkg/sinks/udsink/udsink_grpc_test.go @@ -23,13 +23,13 @@ import ( "testing" "time" - sinkclient "github.com/numaproj/numaflow/pkg/sdkclient/sinker" - "github.com/golang/mock/gomock" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1/sinkmock" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/timestamppb" + + sinkclient "github.com/numaproj/numaflow/pkg/sdkclient/sinker" ) func NewMockUDSgRPCBasedUDSink(mockClient *sinkmock.MockSinkClient) *UDSgRPCBasedUDSink { diff --git a/pkg/sources/forward/data_forward_test.go b/pkg/sources/forward/data_forward_test.go index aa70c59ae9..cfb9dd0262 100644 --- a/pkg/sources/forward/data_forward_test.go +++ b/pkg/sources/forward/data_forward_test.go @@ -26,6 +26,9 @@ import ( "go.uber.org/goleak" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward" "github.com/numaproj/numaflow/pkg/isb" @@ -36,9 +39,6 @@ import ( "github.com/numaproj/numaflow/pkg/watermark/generic" wmstore "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" - - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" ) const ( diff --git a/pkg/sources/forward/shutdown_test.go b/pkg/sources/forward/shutdown_test.go index 02ea94b9ee..4dfb21f8fa 100644 --- a/pkg/sources/forward/shutdown_test.go +++ b/pkg/sources/forward/shutdown_test.go @@ -21,14 +21,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/watermark/generic" - - "github.com/stretchr/testify/assert" ) type myShutdownTest struct { diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go index c0e016ba89..8449d48bf6 100644 --- a/pkg/sources/generator/tickgen_test.go +++ b/pkg/sources/generator/tickgen_test.go @@ -21,6 +21,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward" "github.com/numaproj/numaflow/pkg/isb" @@ -28,9 +31,6 @@ import ( "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type myForwardToAllTest struct { diff --git a/pkg/sources/http/http_test.go b/pkg/sources/http/http_test.go index 98e5a75f1d..6c7d8b5f85 100644 --- a/pkg/sources/http/http_test.go +++ b/pkg/sources/http/http_test.go @@ -22,14 +22,13 @@ import ( "github.com/stretchr/testify/assert" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" - "github.com/numaproj/numaflow/pkg/watermark/store" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward" 
"github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" + "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/store" ) type myForwardToAllTest struct { diff --git a/pkg/sources/kafka/handler.go b/pkg/sources/kafka/handler.go index 885a4b22f1..ccd40813e8 100644 --- a/pkg/sources/kafka/handler.go +++ b/pkg/sources/kafka/handler.go @@ -19,9 +19,9 @@ package kafka import ( "sync" + "github.com/IBM/sarama" "go.uber.org/zap" - "github.com/IBM/sarama" "github.com/numaproj/numaflow/pkg/shared/logging" ) diff --git a/pkg/sources/kafka/handler_test.go b/pkg/sources/kafka/handler_test.go index 4f611f4d30..57db741201 100644 --- a/pkg/sources/kafka/handler_test.go +++ b/pkg/sources/kafka/handler_test.go @@ -22,9 +22,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward" diff --git a/pkg/sources/transformer/grpc_transformer_test.go b/pkg/sources/transformer/grpc_transformer_test.go index 39bcdb321f..5707082733 100644 --- a/pkg/sources/transformer/grpc_transformer_test.go +++ b/pkg/sources/transformer/grpc_transformer_test.go @@ -26,20 +26,17 @@ import ( "github.com/golang/mock/gomock" v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" transformermock "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1/transformmock" - - "github.com/numaproj/numaflow/pkg/sdkclient/sourcetransformer" - "github.com/numaproj/numaflow/pkg/udf/rpc" - "github.com/stretchr/testify/assert" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - - "google.golang.org/grpc" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" + "github.com/numaproj/numaflow/pkg/sdkclient/sourcetransformer" + "github.com/numaproj/numaflow/pkg/udf/rpc" ) func NewMockGRPCBasedTransformer(mockClient *transformermock.MockSourceTransformClient) *GRPCBasedTransformer { diff --git a/pkg/sources/udsource/grpc_udsource.go b/pkg/sources/udsource/grpc_udsource.go index e85361f407..409a307729 100644 --- a/pkg/sources/udsource/grpc_udsource.go +++ b/pkg/sources/udsource/grpc_udsource.go @@ -23,7 +23,6 @@ import ( "time" sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" - "google.golang.org/protobuf/types/known/emptypb" "github.com/numaproj/numaflow/pkg/isb" diff --git a/pkg/sources/udsource/grpc_udsource_test.go b/pkg/sources/udsource/grpc_udsource_test.go index fcfacb1acb..df7115996f 100644 --- a/pkg/sources/udsource/grpc_udsource_test.go +++ b/pkg/sources/udsource/grpc_udsource_test.go @@ -24,7 +24,10 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" + "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1/sourcemock" + "github.com/stretchr/testify/assert" "go.uber.org/goleak" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -35,10 +38,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb" sourceclient "github.com/numaproj/numaflow/pkg/sdkclient/source/client" "github.com/numaproj/numaflow/pkg/sources/udsource/utils" - - "github.com/golang/mock/gomock" - "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1/sourcemock" - 
"github.com/stretchr/testify/assert" ) func TestMain(m *testing.M) { diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index 132e58307e..9031ea0c72 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -216,7 +216,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { return result, nil }) - opts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeMapUDF), forward.WithLogger(log), + opts := []forward.Option{forward.WithLogger(log), forward.WithUDFStreaming(enableMapUdfStream)} if x := u.VertexInstance.Vertex.Spec.Limits; x != nil { if x.ReadBatchSize != nil { diff --git a/pkg/watermark/generic/noop.go b/pkg/watermark/generic/noop.go index 3dd6571e96..e9d48d71d1 100644 --- a/pkg/watermark/generic/noop.go +++ b/pkg/watermark/generic/noop.go @@ -53,12 +53,12 @@ func (n NoOpWMProgressor) GetLatestWatermark() wmb.Watermark { return wmb.Watermark{} } -// GetHeadWatermark returns the default head watermark. +// ComputeHeadWatermark returns the default head watermark. func (n NoOpWMProgressor) ComputeHeadWatermark(int32) wmb.Watermark { return wmb.Watermark{} } -// GetHeadWMB returns the default WMB. +// ComputeHeadIdleWMB returns the default WMB. func (n NoOpWMProgressor) ComputeHeadIdleWMB(int32) wmb.WMB { return wmb.WMB{} } From 8b41a3975bb08a946ba0a2145e81102087a48018 Mon Sep 17 00:00:00 2001 From: guangwu Date: Thu, 7 Sep 2023 15:26:09 +0800 Subject: [PATCH 2/2] chore: import packages only once (#1023) Signed-off-by: guoguangwu --- test/e2e-api/nats.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/e2e-api/nats.go b/test/e2e-api/nats.go index 57c95cc162..59537c6be6 100644 --- a/test/e2e-api/nats.go +++ b/test/e2e-api/nats.go @@ -23,7 +23,6 @@ import ( "strconv" "time" - "github.com/nats-io/nats.go" natslib "github.com/nats-io/nats.go" ) @@ -61,7 +60,7 @@ func init() { w.Header().Set("Content-Type", "application/octet-stream") w.WriteHeader(200) - opts := []nats.Option{natslib.Token(testingToken)} + opts := []natslib.Option{natslib.Token(testingToken)} nc, err := natslib.Connect(url, opts...) if err != nil { log.Println(err)