Skip to content

Commit

Permalink
increase generator channel length
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jul 10, 2023
1 parent f5276db commit dc80b98
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func NewMemGen(vertexInstance *dfv1.VertexInstance,
pipelineName: vertexInstance.Vertex.Spec.PipelineName,
genfn: recordGenerator,
vertexInstance: vertexInstance,
srcchan: make(chan record, rpu*5),
srcchan: make(chan record, rpu*int(keyCount)*5),
readTimeout: 3 * time.Second, // default timeout
}

Expand Down Expand Up @@ -334,7 +334,7 @@ func (mg *memgen) NewWorker(ctx context.Context, rate int) func(chan time.Time,
t := ts.UnixNano()
for i := 0; i < rate; i++ {
for k := int32(0); k < mg.keyCount; k++ {
key := fmt.Sprintf("key-%d", k)
key := fmt.Sprintf("key-%d-%d", mg.vertexInstance.Replica, k)
payload := mg.genfn(mg.msgSize, mg.value, t)
r := record{data: payload, offset: time.Now().UTC().UnixNano(), key: key}
select {
Expand Down

0 comments on commit dc80b98

Please sign in to comment.