Skip to content

Commit

Permalink
fix: Reduce idle WM unit test fix (numaproj#897)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
Co-authored-by: jyu6 <[email protected]>
  • Loading branch information
2 people authored and yhl25 committed Aug 1, 2023
1 parent 5073f1c commit 0db1238
Showing 1 changed file with 14 additions and 18 deletions.
32 changes: 14 additions & 18 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
"github.com/numaproj/numaflow/pkg/window/strategy/fixed"
)

const pipelineName = "testPipeline"

var keyedVertex = &dfv1.VertexInstance{
Vertex: &dfv1.Vertex{Spec: dfv1.VertexSpec{
PipelineName: "testPipeline",
PipelineName: pipelineName,
AbstractVertex: dfv1.AbstractVertex{
Name: "testVertex",
UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{Keyed: true}},
Expand All @@ -60,7 +62,7 @@ var keyedVertex = &dfv1.VertexInstance{

var nonKeyedVertex = &dfv1.VertexInstance{
Vertex: &dfv1.Vertex{Spec: dfv1.VertexSpec{
PipelineName: "testPipeline",
PipelineName: pipelineName,
AbstractVertex: dfv1.AbstractVertex{
Name: "testVertex",
UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{Keyed: false}},
Expand Down Expand Up @@ -275,7 +277,7 @@ func TestDataForward_StartWithNoOpWM(t *testing.T) {
var pbqManager *pbq.Manager

// create pbqManager
pbqManager, err = pbq.NewManager(child, "reduce", "test-pipeline", 0, memory.NewMemoryStores(memory.WithStoreSize(100)),
pbqManager, err = pbq.NewManager(child, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(100)),
pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10))
assert.NoError(t, err)

Expand Down Expand Up @@ -341,7 +343,6 @@ func TestReduceDataForward_IdleWM(t *testing.T) {
startTime = 1679961600000 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)
defer cancel()
Expand Down Expand Up @@ -546,7 +547,6 @@ func TestReduceDataForward_Count(t *testing.T) {
startTime = 60000 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand Down Expand Up @@ -621,7 +621,6 @@ func TestReduceDataForward_AllowedLatencyCount(t *testing.T) {
startTime = 60000 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand Down Expand Up @@ -700,7 +699,6 @@ func TestReduceDataForward_Sum(t *testing.T) {
startTime = 0 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand Down Expand Up @@ -775,7 +773,6 @@ func TestReduceDataForward_Max(t *testing.T) {
startTime = 0 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand Down Expand Up @@ -851,7 +848,6 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {
startTime = 0 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand All @@ -868,7 +864,7 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {

// create pbq manager
var pbqManager *pbq.Manager
pbqManager, err = pbq.NewManager(ctx, "reduce", "test-pipeline", 0, memory.NewMemoryStores(memory.WithStoreSize(1000)),
pbqManager, err = pbq.NewManager(ctx, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(1000)),
pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10))
assert.NoError(t, err)

Expand Down Expand Up @@ -947,7 +943,6 @@ func TestReduceDataForward_NonKeyed(t *testing.T) {
startTime = 0 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand Down Expand Up @@ -1025,7 +1020,6 @@ func TestDataForward_WithContextClose(t *testing.T) {
startTime = 0 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand All @@ -1048,7 +1042,7 @@ func TestDataForward_WithContextClose(t *testing.T) {

// create pbq manager
var pbqManager *pbq.Manager
pbqManager, err = pbq.NewManager(cctx, "reduce", "test-pipeline", 0, storeProvider,
pbqManager, err = pbq.NewManager(cctx, "reduce", pipelineName, 0, storeProvider,
pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10))
assert.NoError(t, err)

Expand Down Expand Up @@ -1118,7 +1112,6 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) {
startTime = 0 // time in millis
fromBufferName = "source-reduce-buffer"
toVertexName = "reduce-to-vertex"
pipelineName = "test-reduce-pipeline"
err error
)

Expand All @@ -1136,7 +1129,7 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) {

// create pbq manager
var pbqManager *pbq.Manager
pbqManager, err = pbq.NewManager(ctx, "reduce", "test-pipeline", 0, memory.NewMemoryStores(memory.WithStoreSize(1000)),
pbqManager, err = pbq.NewManager(ctx, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(1000)),
pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10))
assert.NoError(t, err)

Expand Down Expand Up @@ -1210,7 +1203,6 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB

var (
keyspace = key
pipelineName = "testPipeline"
hbBucketName = keyspace + "_PROCESSORS"
otBucketName = keyspace + "_OT"
)
Expand All @@ -1220,9 +1212,9 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB
ot, otWatcherCh, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName)

// publisher for source
sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, wmstore.BuildWatermarkStore(hb, ot), 1, publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, wmstore.BuildWatermarkStore(hb, ot), 1, publish.WithAutoRefreshHeartbeatDisabled())

// publish heartbeat for the processors
// publish heartbeat manually for the processor
go func() {
for {
select {
Expand All @@ -1239,6 +1231,10 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB
otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh)
storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher, 1, processor.WithIsReduce(true))
for waitForReadyP := pm.GetProcessor(fromBuffer.GetName()); waitForReadyP == nil; waitForReadyP = pm.GetProcessor(fromBuffer.GetName()) {
// wait until the test processor has been added to the processor list
time.Sleep(time.Millisecond * 100)
}
f := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm, 1)
return f, sourcePublisher
}
Expand Down

0 comments on commit 0db1238

Please sign in to comment.