Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: watermark refactor #1007

Merged
merged 14 commits into from
Aug 31, 2023
14 changes: 7 additions & 7 deletions pkg/daemon/server/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,20 @@ func (ds *daemonServer) Run(ctx context.Context) error {
default:
return fmt.Errorf("unsupported isbsvc buffer type %q", ds.isbSvcType)
}
processorManagers, err := service.GetProcessorManagers(ctx, ds.pipeline, isbSvcClient)
wmStores, err := service.BuildWatermarkStores(ctx, ds.pipeline, isbSvcClient)
if err != nil {
return fmt.Errorf("failed to get processor managers, %w", err)
return fmt.Errorf("failed to get watermark stores, %w", err)
}
wmFetchers, err := service.GetUXEdgeWatermarkFetchers(ctx, ds.pipeline, processorManagers)
wmFetchers, err := service.BuildUXEdgeWatermarkFetchers(ctx, ds.pipeline, wmStores)
if err != nil {
return fmt.Errorf("failed to get watermark fetchers, %w", err)
}

// Stop all the processor managers, it will stop watching for offset and heartbeat updates.
// Close all the watermark stores when the daemon server exits
defer func() {
for _, pms := range processorManagers {
for _, pm := range pms {
pm.Close()
for _, edgeStores := range wmStores {
for _, store := range edgeStores {
_ = store.Close()
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

type mockGetType func(url string) (*http.Response, error)
Expand Down Expand Up @@ -67,7 +67,7 @@ func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffe
return nil
}

func (ms *mockIsbSvcClient) CreateProcessorManagers(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]*processor.ProcessorManager, error) {
func (ms *mockIsbSvcClient) CreateWatermarkStores(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]store.WatermarkStore, error) {
return nil, nil
}

Expand Down
28 changes: 15 additions & 13 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,46 +26,48 @@ import (
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

// GetUXEdgeWatermarkFetchers returns a map of the watermark fetchers, where key is the buffer name,
// BuildUXEdgeWatermarkFetchers returns a map of the watermark fetchers, where key is the buffer name,
// value is a list of fetchers to the buffers.
func GetUXEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline, processorManagers map[v1alpha1.Edge][]*processor.ProcessorManager) (map[v1alpha1.Edge][]fetch.UXFetcher, error) {
func BuildUXEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline, wmStores map[v1alpha1.Edge][]store.WatermarkStore) (map[v1alpha1.Edge][]fetch.UXFetcher, error) {
var wmFetchers = make(map[v1alpha1.Edge][]fetch.UXFetcher)
if pipeline.Spec.Watermark.Disabled {
return wmFetchers, nil
}

for edge, pms := range processorManagers {
for edge, stores := range wmStores {
var fetchers []fetch.UXFetcher
for _, pm := range pms {
fetchers = append(fetchers, fetch.NewEdgeFetcher(ctx, pm, pipeline.GetVertex(edge.To).GetPartitionCount()))
isReduce := pipeline.GetVertex(edge.To).IsReduceUDF()
partitionCount := pipeline.GetVertex(edge.To).GetPartitionCount()
for i, s := range stores {
fetchers = append(fetchers, fetch.NewEdgeFetcher(ctx, s, partitionCount, fetch.WithIsReduce(isReduce), fetch.WithVertexReplica(int32(i))))
}
wmFetchers[edge] = fetchers
}

return wmFetchers, nil
}

// GetProcessorManagers returns a map of ProcessorManager per edge.
func GetProcessorManagers(ctx context.Context, pipeline *v1alpha1.Pipeline, isbsvcClient isbsvc.ISBService) (map[v1alpha1.Edge][]*processor.ProcessorManager, error) {
var processorManagers = make(map[v1alpha1.Edge][]*processor.ProcessorManager)
// BuildWatermarkStores returns a map of watermark stores per edge.
func BuildWatermarkStores(ctx context.Context, pipeline *v1alpha1.Pipeline, isbsvcClient isbsvc.ISBService) (map[v1alpha1.Edge][]store.WatermarkStore, error) {
var wmStoresMap = make(map[v1alpha1.Edge][]store.WatermarkStore)
if pipeline.Spec.Watermark.Disabled {
return processorManagers, nil
return wmStoresMap, nil
}

for _, edge := range pipeline.ListAllEdges() {
bucketName := v1alpha1.GenerateEdgeBucketName(pipeline.Namespace, pipeline.Name, edge.From, edge.To)
isReduce := pipeline.GetVertex(edge.To).IsReduceUDF()
partitionCount := pipeline.GetVertex(edge.To).GetPartitionCount()
pms, err := isbsvcClient.CreateProcessorManagers(ctx, bucketName, partitionCount, isReduce)
stores, err := isbsvcClient.CreateWatermarkStores(ctx, bucketName, partitionCount, isReduce)
if err != nil {
return nil, fmt.Errorf("failed to create processor manager %w", err)
}
processorManagers[edge] = pms
wmStoresMap[edge] = stores
}
return processorManagers, nil
return wmStoresMap, nil
}

// GetPipelineWatermarks is used to return the head watermarks for a given pipeline.
Expand Down
86 changes: 70 additions & 16 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"github.com/numaproj/numaflow/pkg/shared/kvs"
"github.com/numaproj/numaflow/pkg/shared/logging"
udfapplier "github.com/numaproj/numaflow/pkg/udf/rpc"
"github.com/numaproj/numaflow/pkg/watermark/entity"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
wmstore "github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
Expand Down Expand Up @@ -217,8 +217,22 @@ func TestNewInterStepDataForward(t *testing.T) {
Name: "testVertex",
},
}}

fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
toVertexWmStores := buildWatermarkStores(toSteps)
publishWatermark, otStores := buildPublisherMapAndOTStoreFromWmStores(toSteps, toVertexWmStores)

defer func() {
for _, p := range publishWatermark {
_ = p.Close()
}
}()

defer func() {
for _, store := range toVertexWmStores {
_ = store.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

Expand Down Expand Up @@ -365,15 +379,21 @@ func TestNewInterStepDataForward(t *testing.T) {
}}

fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
toVertexWmStores := buildWatermarkStores(toSteps)
publishWatermark, otStores := buildPublisherMapAndOTStoreFromWmStores(toSteps, toVertexWmStores)

// close the fetcher and publishers
defer func() {
for _, p := range publishWatermark {
_ = p.Close()
}
}()

defer func() {
for _, store := range toVertexWmStores {
_ = store.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
Expand Down Expand Up @@ -531,15 +551,21 @@ func TestNewInterStepDataForward(t *testing.T) {
}}

fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
toVertexWmStores := buildWatermarkStores(toSteps)
publishWatermark, otStores := buildPublisherMapAndOTStoreFromWmStores(toSteps, toVertexWmStores)

// close the fetcher and publishers
defer func() {
for _, p := range publishWatermark {
_ = p.Close()
}
}()

defer func() {
for _, store := range toVertexWmStores {
_ = store.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
Expand Down Expand Up @@ -859,15 +885,21 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {
writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

fetchWatermark := &testWMBFetcher{WMBTestSameHeadWMB: true}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
toVertexWmStores := buildWatermarkStores(toSteps)
publishWatermark, otStores := buildPublisherMapAndOTStoreFromWmStores(toSteps, toVertexWmStores)

// close the fetcher and publishers
defer func() {
for _, p := range publishWatermark {
_ = p.Close()
}
}()

defer func() {
for _, store := range toVertexWmStores {
_ = store.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -1019,7 +1051,20 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) {
writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

fetchWatermark := &testWMBFetcher{WMBTestSameHeadWMB: true}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
toVertexWmStores := buildWatermarkStores(toSteps)
publishWatermark, otStores := buildPublisherMapAndOTStoreFromWmStores(toSteps, toVertexWmStores)

defer func() {
for _, p := range publishWatermark {
_ = p.Close()
}
}()

defer func() {
for _, store := range toVertexWmStores {
_ = store.Close()
}
}()

// close the fetcher and publishers
defer func() {
Expand Down Expand Up @@ -1594,17 +1639,26 @@ func metricsReset() {
ackMessagesCount.Reset()
}

// buildPublisherMap builds OTStore and publisher for each toBuffer
func buildPublisherMapAndOTStore(toBuffers map[string][]isb.BufferWriter) (map[string]publish.Publisher, map[string]kvs.KVStorer) {
func buildWatermarkStores(toBuffers map[string][]isb.BufferWriter) map[string]wmstore.WatermarkStore {
var ctx = context.Background()
watermarkStores := make(map[string]wmstore.WatermarkStore)
for key := range toBuffers {
store, _ := wmstore.BuildInmemWatermarkStore(ctx, fmt.Sprintf(publisherKeyspace, key))
watermarkStores[key] = store
}
return watermarkStores
}

func buildPublisherMapAndOTStoreFromWmStores(toBuffers map[string][]isb.BufferWriter, wmStores map[string]wmstore.WatermarkStore) (map[string]publish.Publisher, map[string]kvs.KVStorer) {
var ctx = context.Background()
processorEntity := processor.NewProcessorEntity("publisherTestPod")
processorEntity := entity.NewProcessorEntity("publisherTestPod")
publishers := make(map[string]publish.Publisher)
otStores := make(map[string]kvs.KVStorer)
for key, partitionedBuffers := range toBuffers {
store, _, _, _ := wmstore.BuildInmemWatermarkStore(ctx, fmt.Sprintf(publisherKeyspace, key))
otStores[key] = store.OffsetTimelineStore()
p := publish.NewPublish(ctx, processorEntity, store, int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))

for key, store := range wmStores {
p := publish.NewPublish(ctx, processorEntity, store, int32(len(toBuffers[key])), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
publishers[key] = p
otStores[key] = store.OffsetTimelineStore()
}
return publishers, otStores
}
9 changes: 7 additions & 2 deletions pkg/isbsvc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@ package isbsvc
import (
"context"

"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

// ISBService is an interface used to do the operations on ISBSvc
type ISBService interface {
// CreateBuffersAndBuckets creates buffers and buckets
CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, opts ...CreateOption) error
// DeleteBuffersAndBuckets deletes buffers and buckets
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string) error
// ValidateBuffersAndBuckets validates buffers and buckets
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string) error
// GetBufferInfo returns buffer info for the given buffer
GetBufferInfo(ctx context.Context, buffer string) (*BufferInfo, error)
CreateProcessorManagers(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]*processor.ProcessorManager, error)
// CreateWatermarkStores creates watermark stores
CreateWatermarkStores(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]store.WatermarkStore, error)
}

// createOptions describes the options for creating buffers and buckets
Expand Down
29 changes: 11 additions & 18 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/processor"
wmstore "github.com/numaproj/numaflow/pkg/watermark/store"
)

Expand Down Expand Up @@ -314,30 +313,24 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf
return bufferInfo, nil
}

// CreateProcessorManagers is used to create processor manager for the given bucket.
func (jss *jetStreamSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) {
// CreateWatermarkStores is used to create watermark stores.
func (jss *jetStreamSvc) CreateWatermarkStores(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]wmstore.WatermarkStore, error) {
log := logging.FromContext(ctx).With("bucket", bucketName)
ctx = logging.WithLogger(ctx, log)
var processorManagers []*processor.ProcessorManager
fetchers := 1
var wmStores []wmstore.WatermarkStore
partitions := 1
if isReduce {
fetchers = fromBufferPartitionCount
partitions = fromBufferPartitionCount
}
// if it's not a reduce vertex, we don't need multiple watermark fetchers. We use common fetcher among all partitions.
for i := 0; i < fetchers; i++ {
storeWatcher, err := wmstore.BuildJetStreamWatermarkStoreWatcher(ctx, bucketName, jss.jsClient)
// if it's not a reduce vertex, we only need one store to store the watermark
for i := 0; i < partitions; i++ {
wmStore, err := wmstore.BuildJetStreamWatermarkStore(ctx, bucketName, jss.jsClient)
if err != nil {
return nil, fmt.Errorf("failed at new JetStream watermark store watcher, %w", err)
return nil, fmt.Errorf("failed to create new JetStream watermark store, %w", err)
}
var pm *processor.ProcessorManager
if isReduce {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount))
}
processorManagers = append(processorManagers, pm)
wmStores = append(wmStores, wmStore)
}
return processorManagers, nil
return wmStores, nil
}

func JetStreamName(bufferName string) string {
Expand Down
27 changes: 9 additions & 18 deletions pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
redis2 "github.com/numaproj/numaflow/pkg/isb/stores/redis"
redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

Expand Down Expand Up @@ -137,26 +136,18 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe
return bufferInfo, nil
}

// CreateProcessorManagers is used to create the processor managers for the given bucket.
func (r *isbsRedisSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) {
log := logging.FromContext(ctx).With("bucket", bucketName)
ctx = logging.WithLogger(ctx, log)
// CreateWatermarkStores is used to create the watermark stores.
func (r *isbsRedisSvc) CreateWatermarkStores(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]store.WatermarkStore, error) {
// Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher.
var processorManagers []*processor.ProcessorManager
fetchers := 1
var wmStores []store.WatermarkStore
partitions := 1
if isReduce {
fetchers = fromBufferPartitionCount
partitions = fromBufferPartitionCount
}
for i := 0; i < fetchers; i++ {
storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher()
var pm *processor.ProcessorManager
if isReduce {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount))
}
processorManagers = append(processorManagers, pm)
for i := 0; i < partitions; i++ {
wmStore, _ := store.BuildNoOpWatermarkStore()
wmStores = append(wmStores, wmStore)
}

return processorManagers, nil
return wmStores, nil
}
Loading
Loading