Skip to content

Commit

Permalink
add idleManager to sink forward
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jyu6 committed Sep 7, 2023
1 parent 2e637f8 commit 17da2b1
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 26 deletions.
4 changes: 3 additions & 1 deletion pkg/sinks/blackhole/blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

// Blackhole is a sink to emulate /dev/null
Expand All @@ -52,6 +53,7 @@ func NewBlackhole(vertex *dfv1.Vertex,
fromBuffer isb.BufferReader,
fetchWatermark fetch.Fetcher,
publishWatermark publish.Publisher,
idleManager wmb.IdleManagerInterface,
opts ...Option) (*Blackhole, error) {

bh := new(Blackhole)
Expand All @@ -75,7 +77,7 @@ func NewBlackhole(vertex *dfv1.Vertex,
}
}

isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, bh, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, bh, fetchWatermark, publishWatermark, idleManager, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sinks/blackhole/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

func TestBlackhole_Start(t *testing.T) {
Expand All @@ -46,7 +47,7 @@ func TestBlackhole_Start(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name})
s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name])
s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name], wmb.NewIdleManager(1))
assert.NoError(t, err)

stopped := s.Start()
Expand Down
5 changes: 3 additions & 2 deletions pkg/sinks/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type DataForward struct {
vertexName string
pipelineName string
// idleManager manages the idle watermark status.
idleManager *wmb.IdleManager
idleManager wmb.IdleManagerInterface
// wmbChecker checks if the idle watermark is valid.
wmbChecker wmb.WMBChecker
Shutdown
Expand All @@ -64,6 +64,7 @@ func NewDataForward(
toStep isb.BufferWriter,
fetchWatermark fetch.Fetcher,
publishWatermark publish.Publisher,
idleManager wmb.IdleManagerInterface,
opts ...Option) (*DataForward, error) {

options := DefaultOptions()
Expand All @@ -85,7 +86,7 @@ func NewDataForward(
// should we check here that these values are not nil?
vertexName: vertex.Spec.Name,
pipelineName: vertex.Spec.PipelineName,
idleManager: wmb.NewIdleManager(1),
idleManager: idleManager,
wmbChecker: wmb.NewWMBChecker(2), // TODO: make configurable
Shutdown: Shutdown{
rwlock: new(sync.RWMutex),
Expand Down
6 changes: 3 additions & 3 deletions pkg/sinks/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestNewDataForward(t *testing.T) {

_, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
fetchWatermark := &testForwardFetcher{}
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName])
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName], wmb.NewIdleManager(1))

assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestNewDataForward(t *testing.T) {
publishWatermark := map[string]publish.Publisher{
testVertexName: &testForwarderPublisher{},
}
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName])
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark[testVertexName], wmb.NewIdleManager(1))

assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestWriteToBuffer(t *testing.T) {
publishWatermark := map[string]publish.Publisher{
"to1": &testForwarderPublisher{},
}
f, err := NewDataForward(vertex, fromStep, buffer, fetchWatermark, publishWatermark["to1"])
f, err := NewDataForward(vertex, fromStep, buffer, fetchWatermark, publishWatermark["to1"], wmb.NewIdleManager(1))

assert.NoError(t, err)
assert.False(t, buffer.IsFull())
Expand Down
5 changes: 3 additions & 2 deletions pkg/sinks/forward/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

func TestShutDown(t *testing.T) {
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestShutDown(t *testing.T) {
}}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], WithReadBatchSize(batchSize))
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], wmb.NewIdleManager(1), WithReadBatchSize(batchSize))
assert.NoError(t, err)
stopped := f.Start()
// write some data but buffer is not full even though we are not reading
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestShutDown(t *testing.T) {
}}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], WithReadBatchSize(batchSize))
f, err := NewDataForward(vertex, fromStep, to1, fetchWatermark, publishWatermark["to1"], wmb.NewIdleManager(1), WithReadBatchSize(batchSize))
assert.NoError(t, err)
stopped := f.Start()
// write some data such that the fromBufferPartition can be empty, that is toBuffer gets full
Expand Down
4 changes: 3 additions & 1 deletion pkg/sinks/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

// ToKafka produces the output to a Kafka sink.
Expand Down Expand Up @@ -60,6 +61,7 @@ func NewToKafka(vertex *dfv1.Vertex,
fromBuffer isb.BufferReader,
fetchWatermark fetch.Fetcher,
publishWatermark publish.Publisher,
idleManager wmb.IdleManagerInterface,
opts ...Option) (*ToKafka, error) {

kafkaSink := vertex.Spec.Sink.Kafka
Expand Down Expand Up @@ -88,7 +90,7 @@ func NewToKafka(vertex *dfv1.Vertex,
}
}

f, err := sinkforward.NewDataForward(vertex, fromBuffer, toKafka, fetchWatermark, publishWatermark, forwardOpts...)
f, err := sinkforward.NewDataForward(vertex, fromBuffer, toKafka, fetchWatermark, publishWatermark, idleManager, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sinks/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/numaproj/numaflow/pkg/shared/logging"
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

func TestWriteSuccessToKafka(t *testing.T) {
Expand All @@ -46,7 +47,7 @@ func TestWriteSuccessToKafka(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name})
toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"])
toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"], wmb.NewIdleManager(1))
assert.NoError(t, err)
toKafka.kafkaSink = vertex.Spec.Sink.Kafka
toKafka.name = "Test"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestWriteFailureToKafka(t *testing.T) {
}}
toSteps := map[string][]isb.BufferWriter{vertex.Spec.Name: {toKafka}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"])
toKafka.isdf, err = sinkforward.NewDataForward(vertex, fromStep, toKafka, fetchWatermark, publishWatermark["testVertex"], wmb.NewIdleManager(1))
assert.NoError(t, err)
toKafka.name = "Test"
toKafka.topic = "topic-1"
Expand Down
4 changes: 3 additions & 1 deletion pkg/sinks/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

// ToLog prints the output to a log sink.
Expand All @@ -53,6 +54,7 @@ func NewToLog(vertex *dfv1.Vertex,
fromBuffer isb.BufferReader,
fetchWatermark fetch.Fetcher,
publishWatermark publish.Publisher,
idleManager wmb.IdleManagerInterface,
opts ...Option) (*ToLog, error) {

toLog := new(ToLog)
Expand All @@ -76,7 +78,7 @@ func NewToLog(vertex *dfv1.Vertex,
}
}

isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, toLog, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, toLog, fetchWatermark, publishWatermark, idleManager, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/sinks/logger/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/numaproj/numaflow/pkg/isb/testutils"
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

var (
Expand All @@ -52,7 +53,7 @@ func TestToLog_Start(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name})
s, err := NewToLog(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name])
s, err := NewToLog(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name], wmb.NewIdleManager(1))
assert.NoError(t, err)

stopped := s.Start()
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestToLog_Forward(t *testing.T) {
}}

fetchWatermark1, publishWatermark1 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex1.Spec.Name})
logger1, _ := NewToLog(vertex1, to1, fetchWatermark1, publishWatermark1[vertex1.Spec.Name])
logger1, _ := NewToLog(vertex1, to1, fetchWatermark1, publishWatermark1[vertex1.Spec.Name], wmb.NewIdleManager(1))
logger1Stopped := logger1.Start()

toSteps := map[string][]isb.BufferWriter{
Expand All @@ -113,7 +114,7 @@ func TestToLog_Forward(t *testing.T) {
writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := sinkforward.NewDataForward(vertex1, fromStep, to1, fetchWatermark, publishWatermark["sinks.logger1"], sinkforward.WithReadBatchSize(batchSize))
f, err := sinkforward.NewDataForward(vertex1, fromStep, to1, fetchWatermark, publishWatermark["sinks.logger1"], wmb.NewIdleManager(1), sinkforward.WithReadBatchSize(batchSize))
assert.NoError(t, err)

stopped := f.Start()
Expand Down
22 changes: 13 additions & 9 deletions pkg/sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/wmb"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
Expand Down Expand Up @@ -58,6 +59,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
err error
fromVertexWmStores map[string]store.WatermarkStore
sinkWmStores map[string]store.WatermarkStore
idleManager wmb.IdleManagerInterface
sdkClient sinkclient.Client
sinkHandler *udsink.UDSgRPCBasedUDSink
)
Expand All @@ -74,6 +76,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
// publishWatermark is a map representing a progressor per edge, we are initializing them to a no-op progressor
// For sinks, the buffer name is the vertex name
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{u.VertexInstance.Vertex.Spec.Name})
idleManager = wmb.NewNoopIdleManager()

switch u.ISBSvcType {
case dfv1.ISBSvcTypeRedis:
Expand Down Expand Up @@ -108,9 +111,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
}

if u.VertexInstance.Vertex.Spec.Watermark.Disabled {
// sink has no to buffers, so we use the vertex name to publish the watermark
names := []string{u.VertexInstance.Vertex.Spec.Name}
fetchWatermark, publishWatermark = generic.BuildNoOpWatermarkProgressorsFromBufferList(names)
// use default no op fetcher, publisher, idleManager
} else {
// build from vertex watermark stores
fromVertexWmStores, err = jetstream.BuildFromVertexWatermarkStores(ctx, u.VertexInstance, natsClientPool.NextAvailableClient())
Expand All @@ -129,7 +130,10 @@ func (u *SinkProcessor) Start(ctx context.Context) error {

// create watermark publisher using watermark stores
publishWatermark = jetstream.BuildPublishersFromStores(ctx, u.VertexInstance, sinkWmStores)

idleManager = wmb.NewIdleManager(1)
}

default:
return fmt.Errorf("unrecognized isb svc type %q", u.ISBSvcType)
}
Expand Down Expand Up @@ -160,7 +164,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
var finalWg sync.WaitGroup
for index := range u.VertexInstance.Vertex.OwnedBuffers() {
finalWg.Add(1)
sinker, err := u.getSinker(readers[index], log, fetchWatermark, publishWatermark[u.VertexInstance.Vertex.Spec.Name], sinkHandler)
sinker, err := u.getSinker(readers[index], log, fetchWatermark, publishWatermark[u.VertexInstance.Vertex.Spec.Name], idleManager, sinkHandler)
if err != nil {
return fmt.Errorf("failed to find a sink, errpr: %w", err)
}
Expand Down Expand Up @@ -225,17 +229,17 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
}

// getSinker takes in the logger from the parent context
func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM publish.Publisher, sinkHandler udsink.SinkApplier) (Sinker, error) {
func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM publish.Publisher, idleManager wmb.IdleManagerInterface, sinkHandler udsink.SinkApplier) (Sinker, error) {
sink := u.VertexInstance.Vertex.Spec.Sink
if x := sink.Log; x != nil {
return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, logsink.WithLogger(logger))
return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, logsink.WithLogger(logger))
} else if x := sink.Kafka; x != nil {
return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, kafkasink.WithLogger(logger))
return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, kafkasink.WithLogger(logger))
} else if x := sink.Blackhole; x != nil {
return blackhole.NewBlackhole(u.VertexInstance.Vertex, reader, fetchWM, publishWM, blackhole.WithLogger(logger))
return blackhole.NewBlackhole(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, blackhole.WithLogger(logger))
} else if x := sink.UDSink; x != nil {
// if the sink is a user defined sink, then we need to pass the sinkHandler to it which will be used to invoke the user defined sink
return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, sinkHandler, udsink.WithLogger(logger))
return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, idleManager, sinkHandler, udsink.WithLogger(logger))
}
return nil, fmt.Errorf("invalid sink spec")
}
4 changes: 3 additions & 1 deletion pkg/sinks/udsink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

type UserDefinedSink struct {
Expand All @@ -54,6 +55,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex,
fromBuffer isb.BufferReader,
fetchWatermark fetch.Fetcher,
publishWatermark publish.Publisher,
idleManager wmb.IdleManagerInterface,
udsink SinkApplier,
opts ...Option) (*UserDefinedSink, error) {

Expand All @@ -77,7 +79,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex,
}
}
s.udsink = udsink
isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, s, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, s, fetchWatermark, publishWatermark, idleManager, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 17da2b1

Please sign in to comment.