Skip to content

Commit

Permalink
fix: close the watermark fetcher and publishers after all the forward…
Browse files Browse the repository at this point in the history
…ers exit (#921)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Aug 8, 2023
1 parent 2d6112b commit e3da4a3
Show file tree
Hide file tree
Showing 45 changed files with 525 additions and 321 deletions.
11 changes: 0 additions & 11 deletions pkg/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,6 @@ func (isdf *InterStepDataForward) Start() <-chan struct{} {
}
}

// stop watermark fetcher
if err := isdf.wmFetcher.Close(); err != nil {
log.Errorw("Failed to close watermark fetcher", zap.Error(err))
}

// stop watermark publisher
for _, publisher := range isdf.wmPublishers {
if err := publisher.Close(); err != nil {
log.Errorw("Failed to close watermark publisher", zap.Error(err))
}
}
close(stopped)
}()

Expand Down
52 changes: 48 additions & 4 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import (
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/shared/kvs"
"github.com/numaproj/numaflow/pkg/shared/kvs/inmem"
"github.com/numaproj/numaflow/pkg/shared/logging"
udfapplier "github.com/numaproj/numaflow/pkg/udf/function"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
wmstore "github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/inmem"
"github.com/numaproj/numaflow/pkg/watermark/wmb"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -231,6 +231,14 @@ func TestNewInterStepDataForward(t *testing.T) {
fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
Expand Down Expand Up @@ -377,6 +385,15 @@ func TestNewInterStepDataForward(t *testing.T) {

fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
Expand Down Expand Up @@ -535,6 +552,15 @@ func TestNewInterStepDataForward(t *testing.T) {

fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
Expand Down Expand Up @@ -860,6 +886,15 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {

fetchWatermark := &testWMBFetcher{WMBTestSameHeadWMB: true}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -1012,6 +1047,15 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) {

fetchWatermark := &testWMBFetcher{WMBTestSameHeadWMB: true}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -1579,11 +1623,11 @@ func metricsReset() {
}

// buildPublisherMap builds OTStore and publisher for each toBuffer
func buildPublisherMapAndOTStore(toBuffers map[string][]isb.BufferWriter) (map[string]publish.Publisher, map[string]wmstore.WatermarkKVStorer) {
func buildPublisherMapAndOTStore(toBuffers map[string][]isb.BufferWriter) (map[string]publish.Publisher, map[string]kvs.KVStorer) {
var ctx = context.Background()
processorEntity := processor.NewProcessorEntity("publisherTestPod")
publishers := make(map[string]publish.Publisher)
otStores := make(map[string]wmstore.WatermarkKVStorer)
otStores := make(map[string]kvs.KVStorer)
for key, partitionedBuffers := range toBuffers {
heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key))
otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key))
Expand Down
4 changes: 2 additions & 2 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (

"github.com/nats-io/nats.go"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/jetstream"
"github.com/spf13/viper"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -340,7 +340,7 @@ func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm, fromBufferPartitionCount)
watermarkFetcher := fetch.NewEdgeFetcher(ctx, pm, fromBufferPartitionCount)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
}
return watermarkFetchers, nil
Expand Down
11 changes: 6 additions & 5 deletions pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"fmt"

redis2 "github.com/numaproj/numaflow/pkg/isb/stores/redis"
"github.com/numaproj/numaflow/pkg/shared/kvs/noop"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -41,7 +41,7 @@ func NewISBRedisSvc(client *redisclient.RedisClient) ISBService {
return &isbsRedisSvc{client: client}
}

// CreateBuffers is used to create the inter-step redis buffers.
// CreateBuffersAndBuckets is used to create the inter-step redis buffers.
func (r *isbsRedisSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, opts ...CreateOption) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
Expand Down Expand Up @@ -69,7 +69,7 @@ func (r *isbsRedisSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buc
return nil
}

// DeleteBuffers is used to delete the inter-step redis buffers.
// DeleteBuffersAndBuckets is used to delete the inter-step redis buffers.
func (r *isbsRedisSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
Expand Down Expand Up @@ -105,7 +105,7 @@ func (r *isbsRedisSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buc
return nil
}

// ValidateBuffers is used to validate inter-step redis buffers to see if the stream/stream group exist
// ValidateBuffersAndBuckets is used to validate inter-step redis buffers to see if the stream/stream group exist
func (r *isbsRedisSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
Expand Down Expand Up @@ -139,6 +139,7 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe
return bufferInfo, nil
}

// CreateWatermarkFetcher is used to create watermark fetcher for the given bucket
func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]fetch.Fetcher, error) {
// Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher.
var watermarkFetchers []fetch.Fetcher
Expand All @@ -156,7 +157,7 @@ func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName st
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm, fromBufferPartitionCount)
watermarkFetcher := fetch.NewEdgeFetcher(ctx, pm, fromBufferPartitionCount)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,18 +577,6 @@ func (df *DataForward) ShutDown(ctx context.Context) {

// flush pending messages to persistent storage
df.pbqManager.ShutDown(ctx)

// stop watermark fetcher
if err := df.wmFetcher.Close(); err != nil {
df.log.Errorw("Failed to close watermark fetcher", zap.Error(err))
}

// stop watermark publisher
for _, publisher := range df.wmPublishers {
if err := publisher.Close(); err != nil {
df.log.Errorw("Failed to close watermark publisher", zap.Error(err))
}
}
}

// upsertWindowsAndKeys will create or assigns (if already present) a window to the message. It is an upsert operation
Expand Down
Loading

0 comments on commit e3da4a3

Please sign in to comment.