diff --git a/pkg/sinks/blackhole/blackhole.go b/pkg/sinks/blackhole/blackhole.go index 74ebe4041a..15754a706d 100644 --- a/pkg/sinks/blackhole/blackhole.go +++ b/pkg/sinks/blackhole/blackhole.go @@ -28,6 +28,7 @@ import ( sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) // Blackhole is a sink to emulate /dev/null @@ -52,6 +53,7 @@ func NewBlackhole(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark publish.Publisher, + idleManager wmb.IdleManagerInterface, opts ...Option) (*Blackhole, error) { bh := new(Blackhole) @@ -75,7 +77,7 @@ func NewBlackhole(vertex *dfv1.Vertex, } } - isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, bh, fetchWatermark, publishWatermark, forwardOpts...) + isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, bh, fetchWatermark, publishWatermark, idleManager, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/blackhole/blackhole_test.go b/pkg/sinks/blackhole/blackhole_test.go index 4c6f76c29d..0432711d44 100644 --- a/pkg/sinks/blackhole/blackhole_test.go +++ b/pkg/sinks/blackhole/blackhole_test.go @@ -27,6 +27,7 @@ import ( "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) func TestBlackhole_Start(t *testing.T) { @@ -46,7 +47,7 @@ func TestBlackhole_Start(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name}) - s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name]) + s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name], wmb.NewIdleManager(1)) assert.NoError(t, err) stopped := s.Start() diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go index c4ff3ab1a7..3fcfcd13a5 100644 --- a/pkg/sinks/forward/forward.go +++ b/pkg/sinks/forward/forward.go @@ -51,7 +51,7 @@ type DataForward struct { vertexName string pipelineName string // idleManager manages the idle watermark status. - idleManager *wmb.IdleManager + idleManager wmb.IdleManagerInterface // wmbChecker checks if the idle watermark is valid. wmbChecker wmb.WMBChecker Shutdown @@ -64,6 +64,7 @@ func NewDataForward( toStep isb.BufferWriter, fetchWatermark fetch.Fetcher, publishWatermark publish.Publisher, + idleManager wmb.IdleManagerInterface, opts ...Option) (*DataForward, error) { options := DefaultOptions() @@ -85,7 +86,7 @@ func NewDataForward( // should we do a check here for the values not being null? vertexName: vertex.Spec.Name, pipelineName: vertex.Spec.PipelineName, - idleManager: wmb.NewIdleManager(1), + idleManager: idleManager, wmbChecker: wmb.NewWMBChecker(2), // TODO: make configurable Shutdown: Shutdown{ rwlock: new(sync.RWMutex), diff --git a/pkg/sinks/forward/forward_test.go b/pkg/sinks/forward/forward_test.go index a1bda9c1f5..2715546b37 100644 --- a/pkg/sinks/forward/forward_test.go +++ b/pkg/sinks/forward/forward_test.go @@ -153,7 +153,7 @@ func TestNewDataForward(t *testing.T) { _, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) fetchWatermark := &testForwardFetcher{} - f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName]) + f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName], wmb.NewIdleManager(1)) assert.NoError(t, err) assert.False(t, to1.IsFull()) @@ -232,7 +232,7 @@ func TestNewDataForward(t *testing.T) { publishWatermark := map[string]publish.Publisher{ testVertexName: &testForwarderPublisher{}, } - f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName]) + f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName], wmb.NewIdleManager(1)) assert.NoError(t, err) assert.False(t, to1.IsFull()) @@ -310,7 +310,7 @@ func TestWriteToBuffer(t *testing.T) { publishWatermark := map[string]publish.Publisher{ "to1": &testForwarderPublisher{}, } - f, err := NewDataForward(vertex, fromStep, buffer, fetchWatermark, publishWatermark["to1"]) + f, err := NewDataForward(vertex, fromStep, buffer, fetchWatermark, publishWatermark["to1"], wmb.NewIdleManager(1)) assert.NoError(t, err) assert.False(t, buffer.IsFull()) diff --git a/pkg/sinks/forward/shutdown_test.go b/pkg/sinks/forward/shutdown_test.go index f35785b641..3307b8b0eb 100644 --- a/pkg/sinks/forward/shutdown_test.go +++ b/pkg/sinks/forward/shutdown_test.go @@ -28,6 +28,7 @@ import ( "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) func TestShutDown(t *testing.T) { @@ -66,7 +67,7 @@ func TestShutDown(t *testing.T) { }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], WithReadBatchSize(batchSize)) + f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], wmb.NewIdleManager(1), WithReadBatchSize(batchSize)) assert.NoError(t, err) stopped := f.Start() // write some data but buffer is not full even though we are not reading @@ -99,7 +100,7 @@ func TestShutDown(t *testing.T) { }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], WithReadBatchSize(batchSize)) + f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], wmb.NewIdleManager(1), WithReadBatchSize(batchSize)) assert.NoError(t, err) stopped := f.Start() // write some data such that the fromBufferPartition can be empty, that is toBuffer gets full diff --git a/pkg/sinks/kafka/kafka.go b/pkg/sinks/kafka/kafka.go index 6bbac5133a..c4d5ac0cb4 100644 --- a/pkg/sinks/kafka/kafka.go +++ b/pkg/sinks/kafka/kafka.go @@ -32,6 +32,7 @@ import ( sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) // ToKafka produce the output to a kafka sinks. @@ -60,6 +61,7 @@ func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark publish.Publisher, + idleManager wmb.IdleManagerInterface, opts ...Option) (*ToKafka, error) { kafkaSink := vertex.Spec.Sink.Kafka @@ -88,7 +90,7 @@ func NewToKafka(vertex *dfv1.Vertex, } } - f, err := sinkforward.NewDataForward(vertex, fromBuffer, toKafka, fetchWatermark, publishWatermark, forwardOpts...) + f, err := sinkforward.NewDataForward(vertex, fromBuffer, toKafka, fetchWatermark, publishWatermark, idleManager, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/kafka/kafka_test.go b/pkg/sinks/kafka/kafka_test.go index 18a0c08a6b..33711d462b 100644 --- a/pkg/sinks/kafka/kafka_test.go +++ b/pkg/sinks/kafka/kafka_test.go @@ -30,6 +30,7 @@ import ( "github.com/numaproj/numaflow/pkg/shared/logging" sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) func TestWriteSuccessToKafka(t *testing.T) { @@ -46,7 +47,7 @@ func TestWriteSuccessToKafka(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name}) - toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"]) + toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"], wmb.NewIdleManager(1)) assert.NoError(t, err) toKafka.kafkaSink = vertex.Spec.Sink.Kafka toKafka.name = "Test" @@ -99,7 +100,7 @@ func TestWriteFailureToKafka(t *testing.T) { }} toSteps := map[string][]isb.BufferWriter{vertex.Spec.Name: {toKafka}} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"]) + toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"], wmb.NewIdleManager(1)) assert.NoError(t, err) toKafka.name = "Test" toKafka.topic = "topic-1" diff --git a/pkg/sinks/logger/log.go b/pkg/sinks/logger/log.go index 69ac398957..f499324973 100644 --- a/pkg/sinks/logger/log.go +++ b/pkg/sinks/logger/log.go @@ -29,6 +29,7 @@ import ( sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) // ToLog prints the output to a log sinks. @@ -53,6 +54,7 @@ func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark publish.Publisher, + idleManager wmb.IdleManagerInterface, opts ...Option) (*ToLog, error) { toLog := new(ToLog) @@ -76,7 +78,7 @@ func NewToLog(vertex *dfv1.Vertex, } } - isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, toLog, fetchWatermark, publishWatermark, forwardOpts...) + isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, toLog, fetchWatermark, publishWatermark, idleManager, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/logger/log_test.go b/pkg/sinks/logger/log_test.go index 14a2ab22da..3c4a3533f8 100644 --- a/pkg/sinks/logger/log_test.go +++ b/pkg/sinks/logger/log_test.go @@ -29,6 +29,7 @@ import ( "github.com/numaproj/numaflow/pkg/isb/testutils" sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) var ( @@ -52,7 +53,7 @@ func TestToLog_Start(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name}) - s, err := NewToLog(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name]) + s, err := NewToLog(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name], wmb.NewIdleManager(1)) assert.NoError(t, err) stopped := s.Start() @@ -103,7 +104,7 @@ func TestToLog_Forward(t *testing.T) { }} fetchWatermark1, publishWatermark1 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex1.Spec.Name}) - logger1, _ := NewToLog(vertex1, to1, fetchWatermark1, publishWatermark1[vertex1.Spec.Name]) + logger1, _ := NewToLog(vertex1, to1, fetchWatermark1, publishWatermark1[vertex1.Spec.Name], wmb.NewIdleManager(1)) logger1Stopped := logger1.Start() toSteps := map[string][]isb.BufferWriter{ @@ -113,7 +114,7 @@ func TestToLog_Forward(t *testing.T) { writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime) fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := sinkforward.NewDataForward(vertex1, fromStep, to1, fetchWatermark, publishWatermark["sinks.logger1"], sinkforward.WithReadBatchSize(batchSize)) + f, err := sinkforward.NewDataForward(vertex1, fromStep, to1, fetchWatermark, publishWatermark["sinks.logger1"], wmb.NewIdleManager(1), sinkforward.WithReadBatchSize(batchSize)) assert.NoError(t, err) stopped := f.Start() diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go index 81e51434c2..84a567f6d5 100644 --- a/pkg/sinks/sink.go +++ b/pkg/sinks/sink.go @@ -26,6 +26,7 @@ import ( sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/watermark/generic/jetstream" "github.com/numaproj/numaflow/pkg/watermark/store" + "github.com/numaproj/numaflow/pkg/watermark/wmb" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" @@ -58,6 +59,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { err error fromVertexWmStores map[string]store.WatermarkStore sinkWmStores map[string]store.WatermarkStore + idleManager wmb.IdleManagerInterface sdkClient sinkclient.Client sinkHandler *udsink.UDSgRPCBasedUDSink ) @@ -74,6 +76,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { // publishWatermark is a map representing a progressor per edge, we are initializing them to a no-op progressor // For sinks, the buffer name is the vertex name fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{u.VertexInstance.Vertex.Spec.Name}) + idleManager = wmb.NewNoopIdleManager() switch u.ISBSvcType { case dfv1.ISBSvcTypeRedis: @@ -108,9 +111,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { } if u.VertexInstance.Vertex.Spec.Watermark.Disabled { - // sink has no to buffers, so we use the vertex name to publish the watermark - names := []string{u.VertexInstance.Vertex.Spec.Name} - fetchWatermark, publishWatermark = generic.BuildNoOpWatermarkProgressorsFromBufferList(names) + // use default no op fetcher, publisher, idleManager } else { // build from vertex watermark stores fromVertexWmStores, err = jetstream.BuildFromVertexWatermarkStores(ctx, u.VertexInstance, natsClientPool.NextAvailableClient()) @@ -129,7 +130,10 @@ func (u *SinkProcessor) Start(ctx context.Context) error { // create watermark publisher using watermark stores publishWatermark = jetstream.BuildPublishersFromStores(ctx, u.VertexInstance, sinkWmStores) + + idleManager = wmb.NewIdleManager(1) } + default: return fmt.Errorf("unrecognized isb svc type %q", u.ISBSvcType) } @@ -160,7 +164,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { var finalWg sync.WaitGroup for index := range u.VertexInstance.Vertex.OwnedBuffers() { finalWg.Add(1) - sinker, err := u.getSinker(readers[index], log, fetchWatermark, publishWatermark[u.VertexInstance.Vertex.Spec.Name], sinkHandler) + sinker, err := u.getSinker(readers[index], log, fetchWatermark, publishWatermark[u.VertexInstance.Vertex.Spec.Name], idleManager, sinkHandler) if err != nil { return fmt.Errorf("failed to find a sink, errpr: %w", err) } @@ -225,17 +229,17 @@ func (u *SinkProcessor) Start(ctx context.Context) error { } // getSinker takes in the logger from the parent context -func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM publish.Publisher, sinkHandler udsink.SinkApplier) (Sinker, error) { +func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM publish.Publisher, idleManager wmb.IdleManagerInterface, sinkHandler udsink.SinkApplier) (Sinker, error) { sink := u.VertexInstance.Vertex.Spec.Sink if x := sink.Log; x != nil { - return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, logsink.WithLogger(logger)) + return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, logsink.WithLogger(logger)) } else if x := sink.Kafka; x != nil { - return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, kafkasink.WithLogger(logger)) + return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, kafkasink.WithLogger(logger)) } else if x := sink.Blackhole; x != nil { - return blackhole.NewBlackhole(u.VertexInstance.Vertex, reader, fetchWM, publishWM, blackhole.WithLogger(logger)) + return blackhole.NewBlackhole(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, blackhole.WithLogger(logger)) } else if x := sink.UDSink; x != nil { // if the sink is a user defined sink, then we need to pass the sinkHandler to it which will be used to invoke the user defined sink - return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, sinkHandler, udsink.WithLogger(logger)) + return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, sinkHandler, udsink.WithLogger(logger)) } return nil, fmt.Errorf("invalid sink spec") } diff --git a/pkg/sinks/udsink/sink.go b/pkg/sinks/udsink/sink.go index 3efa958fe3..f6f5d06e00 100644 --- a/pkg/sinks/udsink/sink.go +++ b/pkg/sinks/udsink/sink.go @@ -30,6 +30,7 @@ import ( sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) type UserDefinedSink struct { @@ -54,6 +55,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark publish.Publisher, + idleManager wmb.IdleManagerInterface, udsink SinkApplier, opts ...Option) (*UserDefinedSink, error) { @@ -77,7 +79,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, } } s.udsink = udsink - isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, s, fetchWatermark, publishWatermark, forwardOpts...) + isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, s, fetchWatermark, publishWatermark, idleManager, forwardOpts...) if err != nil { return nil, err }