diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index d1306920d1..2f1ae6f84c 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -46,9 +46,11 @@ import ( "github.com/numaproj/numaflow/pkg/window/strategy/fixed" ) +const pipelineName = "testPipeline" + var keyedVertex = &dfv1.VertexInstance{ Vertex: &dfv1.Vertex{Spec: dfv1.VertexSpec{ - PipelineName: "testPipeline", + PipelineName: pipelineName, AbstractVertex: dfv1.AbstractVertex{ Name: "testVertex", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{Keyed: true}}, @@ -60,7 +62,7 @@ var keyedVertex = &dfv1.VertexInstance{ var nonKeyedVertex = &dfv1.VertexInstance{ Vertex: &dfv1.Vertex{Spec: dfv1.VertexSpec{ - PipelineName: "testPipeline", + PipelineName: pipelineName, AbstractVertex: dfv1.AbstractVertex{ Name: "testVertex", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{Keyed: false}}, @@ -275,7 +277,7 @@ func TestDataForward_StartWithNoOpWM(t *testing.T) { var pbqManager *pbq.Manager // create pbqManager - pbqManager, err = pbq.NewManager(child, "reduce", "test-pipeline", 0, memory.NewMemoryStores(memory.WithStoreSize(100)), + pbqManager, err = pbq.NewManager(child, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(100)), pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10)) assert.NoError(t, err) @@ -341,7 +343,6 @@ func TestReduceDataForward_IdleWM(t *testing.T) { startTime = 1679961600000 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) defer cancel() @@ -546,7 +547,6 @@ func TestReduceDataForward_Count(t *testing.T) { startTime = 60000 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -621,7 +621,6 @@ func TestReduceDataForward_AllowedLatencyCount(t *testing.T) { startTime = 60000 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -700,7 +699,6 @@ func TestReduceDataForward_Sum(t *testing.T) { startTime = 0 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -775,7 +773,6 @@ func TestReduceDataForward_Max(t *testing.T) { startTime = 0 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -851,7 +848,6 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) { startTime = 0 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -868,7 +864,7 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) { // create pbq manager var pbqManager *pbq.Manager - pbqManager, err = pbq.NewManager(ctx, "reduce", "test-pipeline", 0, memory.NewMemoryStores(memory.WithStoreSize(1000)), + pbqManager, err = pbq.NewManager(ctx, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(1000)), pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10)) assert.NoError(t, err) @@ -947,7 +943,6 @@ func TestReduceDataForward_NonKeyed(t *testing.T) { startTime = 0 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -1025,7 +1020,6 @@ func TestDataForward_WithContextClose(t *testing.T) { startTime = 0 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -1048,7 +1042,7 @@ func TestDataForward_WithContextClose(t *testing.T) { // create pbq manager var pbqManager *pbq.Manager - pbqManager, err = pbq.NewManager(cctx, "reduce", "test-pipeline", 0, storeProvider, + pbqManager, err = pbq.NewManager(cctx, "reduce", pipelineName, 0, storeProvider, pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10)) assert.NoError(t, err) @@ -1118,7 +1112,6 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) { startTime = 0 // time in millis fromBufferName = "source-reduce-buffer" toVertexName = "reduce-to-vertex" - pipelineName = "test-reduce-pipeline" err error ) @@ -1136,7 +1129,7 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) { // create pbq manager var pbqManager *pbq.Manager - pbqManager, err = pbq.NewManager(ctx, "reduce", "test-pipeline", 0, memory.NewMemoryStores(memory.WithStoreSize(1000)), + pbqManager, err = pbq.NewManager(ctx, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(1000)), pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10)) assert.NoError(t, err) @@ -1210,7 +1203,6 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB var ( keyspace = key - pipelineName = "testPipeline" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" ) @@ -1220,9 +1212,9 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB ot, otWatcherCh, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) // publisher for source - sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, wmstore.BuildWatermarkStore(hb, ot), 1, publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) + sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, wmstore.BuildWatermarkStore(hb, ot), 1, publish.WithAutoRefreshHeartbeatDisabled()) - // publish heartbeat for the processors + // publish heartbeat manually for the processor go func() { for { select { @@ -1239,6 +1231,10 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh) storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) pm := processor.NewProcessorManager(ctx, storeWatcher, 1, processor.WithIsReduce(true)) + for waitForReadyP := pm.GetProcessor(fromBuffer.GetName()); waitForReadyP == nil; waitForReadyP = pm.GetProcessor(fromBuffer.GetName()) { + // wait until the test processor has been added to the processor list + time.Sleep(time.Millisecond * 100) + } f := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm, 1) return f, sourcePublisher }