Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jul 5, 2023
1 parent aa1be27 commit 49a6156
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
7 changes: 2 additions & 5 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,21 +333,18 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
}

func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionProcessingRate []float64, partitionPending []int64, partitionBufferLengths []int64, partitionAvailableBufferLengths []int64) int32 {
maxDesired := int32(0)
maxDesired := int32(1)
// We calculate the max desired replicas based on the pending messages and processing rate for each partition.
for i := 0; i < len(partitionPending); i++ {
rate := partitionProcessingRate[i]
pending := partitionPending[i]
var desired int32
if pending == 0 || rate == 0 {
// Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything.
// Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0.
// But we still keep this check here for safety.
if maxDesired < int32(vertex.Status.Replicas) {
maxDesired = int32(vertex.Status.Replicas)
}
continue
}
var desired int32
if vertex.IsASource() {
// For sources, we calculate the time of finishing processing the pending messages,
// and then we know how many replicas are needed to get them done in target seconds.
Expand Down
32 changes: 27 additions & 5 deletions pkg/reconciler/vertex/scaling/scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_BasicOperations(t *testing.T) {
assert.False(t, s.Contains("key1"))
}

func Test_desiredReplicas(t *testing.T) {
func Test_desiredReplicasSinglePartition(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
one := uint32(1)
Expand All @@ -58,14 +58,14 @@ func Test_desiredReplicas(t *testing.T) {
Replicas: uint32(2),
},
}
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))

udf := &dfv1.Vertex{
Spec: dfv1.VertexSpec{
Expand All @@ -78,7 +78,7 @@ func Test_desiredReplicas(t *testing.T) {
Replicas: uint32(2),
},
}
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
Expand All @@ -87,3 +87,25 @@ func Test_desiredReplicas(t *testing.T) {
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
}

// Test_desiredReplicasMultiplePartitions verifies that desiredReplicas, for a
// UDF vertex reading from multiple partitions, returns the replica count
// demanded by the busiest partition (the per-partition maximum).
func Test_desiredReplicasMultiplePartitions(t *testing.T) {
	client := fake.NewClientBuilder().Build()
	scaler := NewScaler(client)
	udf := &dfv1.Vertex{
		Spec: dfv1.VertexSpec{
			Replicas: pointer.Int32(2),
			AbstractVertex: dfv1.AbstractVertex{
				UDF: &dfv1.UDF{},
			},
		},
		Status: dfv1.VertexStatus{
			Replicas: uint32(2),
		},
	}

	// Buffer sizes are identical across the three partitions in every case.
	bufferLengths := []int64{24000, 24000, 24000}
	availableLengths := []int64{15000, 15000, 15000}

	cases := []struct {
		want    int32
		rates   []float64
		pending []int64
	}{
		// Effectively idle partitions: scale down to the minimum of 1.
		{want: 1, rates: []float64{0, 0, 1}, pending: []int64{0, 0, 1}},
		// Moderate backlog on one partition: keep current replica count.
		{want: 2, rates: []float64{5000, 3000, 5000}, pending: []int64{0, 10000, 1}},
		// Backlog approaching the buffer length drives an aggressive scale-up.
		{want: 30, rates: []float64{5000, 3000, 5000}, pending: []int64{0, 23000, 1}},
		// Backlog beyond the buffer length.
		{want: 4, rates: []float64{5000, 3000, 5000}, pending: []int64{0, 30000, 1}},
		// Two lagging partitions; the worst one dictates the result.
		{want: 4, rates: []float64{1000, 3000, 1000}, pending: []int64{0, 27000, 3000}},
	}
	for _, tc := range cases {
		got := scaler.desiredReplicas(context.TODO(), udf, tc.rates, tc.pending, bufferLengths, availableLengths)
		assert.Equal(t, tc.want, got)
	}
}

0 comments on commit 49a6156

Please sign in to comment.