Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jyu6 committed Sep 7, 2023
2 parents 769a0b4 + 8b41a39 commit 2e637f8
Show file tree
Hide file tree
Showing 35 changed files with 1,316 additions and 350 deletions.
2 changes: 2 additions & 0 deletions pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodRea
readTotalMetricName = "reduce_isb_reader_read_total"
} else if vertexType == "source" {
readTotalMetricName = "source_forwarder_read_total"
} else if vertexType == "sink" {
readTotalMetricName = "sink_forwarder_read_total"
} else {
readTotalMetricName = "forwarder_read_total"
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/forward/applier/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,3 @@ type ApplyMapFunc func(context.Context, *isb.ReadMessage) ([]*isb.WriteMessage,
func (f ApplyMapFunc) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return f(ctx, message)
}

var (
// Terminal Applier do not make any change to the message
Terminal = ApplyMapFunc(func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return []*isb.WriteMessage{{
Message: msg.Message,
}}, nil
})
)
13 changes: 0 additions & 13 deletions pkg/forward/applier/mapstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,3 @@ type ApplyMapStreamFunc func(context.Context, *isb.ReadMessage, chan<- isb.Write
func (f ApplyMapStreamFunc) ApplyMapStream(ctx context.Context, message *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
return f(ctx, message, writeMessageCh)
}

var (
// TerminalMapStream Applier do not make any change to the message
TerminalMapStream = ApplyMapStreamFunc(func(ctx context.Context, msg *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
defer close(writeMessageCh)
writeMessage := &isb.WriteMessage{
Message: msg.Message,
}

writeMessageCh <- *writeMessage
return nil
})
)
23 changes: 6 additions & 17 deletions pkg/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ func NewInterStepDataForward(
return nil, fmt.Errorf("batch size is not 1 with map UDF streaming")
}

if isdf.opts.vertexType == dfv1.VertexTypeSource {
return nil, fmt.Errorf("source vertex is not supported by inter-step forwarder, please use source forwarder instead")
}

return &isdf, nil
}

Expand Down Expand Up @@ -229,7 +225,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
for toVertexName, toVertexBuffer := range isdf.toBuffers {
for _, partition := range toVertexBuffer {
if p, ok := isdf.wmPublishers[toVertexName]; ok {
idlehandler.PublishIdleWatermark(ctx, partition, p, isdf.idleManager, isdf.opts.logger, isdf.opts.vertexType, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
idlehandler.PublishIdleWatermark(ctx, partition, p, isdf.idleManager, isdf.opts.logger, dfv1.VertexTypeMapUDF, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
}
}
}
Expand Down Expand Up @@ -351,20 +347,13 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
activeWatermarkBuffers[toVertexName] = make([]bool, len(toVertexBufferOffsets))
if publisher, ok := isdf.wmPublishers[toVertexName]; ok {
for index, offsets := range toVertexBufferOffsets {
if isdf.opts.vertexType == dfv1.VertexTypeMapUDF || isdf.opts.vertexType == dfv1.VertexTypeReduceUDF {
if len(offsets) > 0 {
publisher.PublishWatermark(processorWM, offsets[len(offsets)-1], int32(index))
activeWatermarkBuffers[toVertexName][index] = true
// reset because the toBuffer partition is no longer idling
isdf.idleManager.Reset(isdf.toBuffers[toVertexName][index].GetName())
}
// This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
} else { // For Sink vertex, and it does not care about the offset during watermark publishing
publisher.PublishWatermark(processorWM, nil, int32(index))
if len(offsets) > 0 {
publisher.PublishWatermark(processorWM, offsets[len(offsets)-1], int32(index))
activeWatermarkBuffers[toVertexName][index] = true
// reset because the toBuffer partition is no longer idling
isdf.idleManager.Reset(isdf.toBuffers[toVertexName][index].GetName())
}
// This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
}
}
}
Expand All @@ -384,7 +373,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// use the watermark of the current read batch for the idle watermark
// same as read len==0 because there's no event published to the buffer
if p, ok := isdf.wmPublishers[bufferName]; ok {
idlehandler.PublishIdleWatermark(ctx, isdf.toBuffers[bufferName][index], p, isdf.idleManager, isdf.opts.logger, isdf.opts.vertexType, processorWM)
idlehandler.PublishIdleWatermark(ctx, isdf.toBuffers[bufferName][index], p, isdf.idleManager, isdf.opts.logger, dfv1.VertexTypeMapUDF, processorWM)
}
}
}
Expand All @@ -396,7 +385,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
err = isdf.ackFromBuffer(ctx, readOffsets)
// implicit return for posterity :-)
if err != nil {
isdf.opts.logger.Errorw("failed to ack from buffer", zap.Error(err))
isdf.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err))
ackMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
return
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestNewInterStepDataForward(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestNewInterStepDataForward(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -566,7 +566,7 @@ func TestNewInterStepDataForward(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(batchSize), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -900,7 +900,7 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) {
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -1295,7 +1295,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) {
_, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)

// create a forwarder
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(5), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(5))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -1341,7 +1341,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) {
_, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)

// create a forwarder
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(5), WithVertexType(dfv1.VertexTypeMapUDF))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, publishWatermark, wmb.NewIdleManager(len(toSteps)), WithReadBatchSize(5))
assert.NoError(t, err)
assert.False(t, to11.IsFull())
assert.False(t, to12.IsFull())
Expand Down
10 changes: 0 additions & 10 deletions pkg/forward/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type options struct {
udfConcurrency int
// retryInterval is the time.Duration to sleep before retrying
retryInterval time.Duration
// vertexType indicates the type of the vertex
vertexType dfv1.VertexType
// logger is used to pass the logger variable
logger *zap.SugaredLogger
// enableMapUdfStream indicates whether the message streaming is enabled or not for map UDF processing
Expand Down Expand Up @@ -85,14 +83,6 @@ func WithLogger(l *zap.SugaredLogger) Option {
}
}

// WithVertexType sets the type of the vertex
func WithVertexType(t dfv1.VertexType) Option {
return func(o *options) error {
o.vertexType = t
return nil
}
}

// WithUDFStreaming sets streaming for map UDF processing
func WithUDFStreaming(f bool) Option {
return func(o *options) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/shared/idlehandler/idlehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
)

// PublishIdleWatermark publishes a ctrl message with isb.Kind set to WMB. We only send one ctrl message when
// we see a new WMB; later we only update the WMB without a ctrl message.

func PublishIdleWatermark(ctx context.Context, toBufferPartition isb.BufferWriter, wmPublisher publish.Publisher, idleManager wmb.IdleManagerInterface, logger *zap.SugaredLogger, vertexType dfv1.VertexType, wm wmb.Watermark) {

var toPartitionName = toBufferPartition.GetName()
var toVertexPartition = toBufferPartition.GetPartitionIdx()

Expand Down
19 changes: 8 additions & 11 deletions pkg/sinks/blackhole/blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,22 @@ package blackhole
import (
"context"

"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward/applier"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sinkforward "github.com/numaproj/numaflow/pkg/sinks/forward"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/wmb"

"go.uber.org/zap"
)

// Blackhole is a sink to emulate /dev/null
type Blackhole struct {
name string
pipelineName string
isdf *forward.InterStepDataForward
isdf *sinkforward.DataForward
logger *zap.SugaredLogger
}

Expand All @@ -53,8 +51,7 @@ func WithLogger(log *zap.SugaredLogger) Option {
func NewBlackhole(vertex *dfv1.Vertex,
fromBuffer isb.BufferReader,
fetchWatermark fetch.Fetcher,
publishWatermark map[string]publish.Publisher,
whereToDecider forward.GoWhere,
publishWatermark publish.Publisher,
opts ...Option) (*Blackhole, error) {

bh := new(Blackhole)
Expand All @@ -71,14 +68,14 @@ func NewBlackhole(vertex *dfv1.Vertex,
bh.logger = logging.NewLogger()
}

forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(bh.logger)}
forwardOpts := []sinkforward.Option{sinkforward.WithLogger(bh.logger)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, sinkforward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string][]isb.BufferWriter{vertex.Spec.Name: {bh}}, whereToDecider, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark, wmb.NewIdleManager(1), forwardOpts...)
isdf, err := sinkforward.NewDataForward(vertex, fromBuffer, bh, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
105 changes: 3 additions & 102 deletions pkg/sinks/blackhole/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward/applier"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"

"github.com/stretchr/testify/assert"
)

var (
testStartTime = time.Unix(1636470000, 0).UTC()
)

type myForwardToAllTest struct {
}

func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
return []forward.VertexBuffer{{
ToVertexName: "to1",
ToVertexPartitionIdx: 0,
},
{
ToVertexName: "to2",
ToVertexPartitionIdx: 0,
}}, nil
}

func TestBlackhole_Start(t *testing.T) {
fromStep := simplebuffer.NewInMemoryBuffer("from", 25, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand All @@ -68,7 +46,7 @@ func TestBlackhole_Start(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex.Spec.Name})
s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark, getSinkGoWhereDecider(vertex.Spec.Name))
s, err := NewBlackhole(vertex, fromStep, fetchWatermark, publishWatermark[vertex.Spec.Name])
assert.NoError(t, err)

stopped := s.Start()
Expand All @@ -84,80 +62,3 @@ func TestBlackhole_Start(t *testing.T) {

<-stopped
}

// TestBlackhole_ForwardToTwoVertex writes to 2 vertices and have a blackhole sinks attached to each vertex.
func TestBlackhole_ForwardToTwoVertex(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

fromStep := simplebuffer.NewInMemoryBuffer("from", 25, 0)
to1 := simplebuffer.NewInMemoryBuffer("to1", 25, 0)
to2 := simplebuffer.NewInMemoryBuffer("to2", 25, 0)

// start the last vertex first
// add 2 sinks per vertex
vertex1 := &dfv1.Vertex{Spec: dfv1.VertexSpec{
AbstractVertex: dfv1.AbstractVertex{
Name: "sinks.blackhole1",
Sink: &dfv1.Sink{
Blackhole: &dfv1.Blackhole{},
},
},
}}

vertex2 := &dfv1.Vertex{Spec: dfv1.VertexSpec{
AbstractVertex: dfv1.AbstractVertex{
Name: "sinks.blackhole2",
Sink: &dfv1.Sink{
Blackhole: &dfv1.Blackhole{},
},
},
}}
fetchWatermark1, publishWatermark1 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex1.Spec.Name})
bh1, _ := NewBlackhole(vertex1, to1, fetchWatermark1, publishWatermark1, getSinkGoWhereDecider(vertex1.Spec.Name))
fetchWatermark2, publishWatermark2 := generic.BuildNoOpWatermarkProgressorsFromBufferList([]string{vertex2.Spec.Name})
bh2, _ := NewBlackhole(vertex2, to2, fetchWatermark2, publishWatermark2, getSinkGoWhereDecider(vertex2.Spec.Name))
bh1Stopped := bh1.Start()
bh2Stopped := bh2.Start()

toSteps := map[string][]isb.BufferWriter{
"to1": {to1},
"to2": {to2},
}

writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)
vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
PipelineName: "testPipeline",
AbstractVertex: dfv1.AbstractVertex{
Name: "testVertex",
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, applier.Terminal, applier.TerminalMapStream, fetchWatermark, publishWatermark, wmb.NewIdleManager(2))
assert.NoError(t, err)

stopped := f.Start()
// write some data
_, errs := fromStep.Write(ctx, writeMessages[0:5])
assert.Equal(t, make([]error, 5), errs)
f.Stop()
<-stopped
// downstream should be stopped only after upstream is stopped
bh1.Stop()
bh2.Stop()

<-bh1Stopped
<-bh2Stopped
}

func getSinkGoWhereDecider(vertexName string) forward.GoWhere {
fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) {
var result []forward.VertexBuffer
result = append(result, forward.VertexBuffer{
ToVertexName: vertexName,
ToVertexPartitionIdx: 0,
})
return result, nil
})
return fsd
}
Loading

0 comments on commit 2e637f8

Please sign in to comment.