Skip to content

Commit

Permalink
fix UT
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng committed Sep 13, 2024
1 parent 3d89669 commit 17145d4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class JobVertexScalerTest {

private static List<Collection<ShipStrategy>> adjustmentInputsProvider() {
return List.of(
List.of(),
List.of(ShipStrategy.HASH),
List.of(ShipStrategy.REBALANCE, ShipStrategy.HASH, ShipStrategy.RESCALE));
}
Expand Down Expand Up @@ -936,9 +935,8 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputSh
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
}

@ParameterizedTest
@MethodSource("sourceInputsProvider")
public void testNumPartitionsAdjustment(Collection<ShipStrategy> inputShipStrategies) {
@Test
public void testNumPartitionsAdjustment() {
final int minParallelism = 1;
final int maxParallelism = Integer.MAX_VALUE;
final var vertex = new JobVertexID();
Expand All @@ -948,7 +946,7 @@ public void testNumPartitionsAdjustment(Collection<ShipStrategy> inputShipStrate
JobVertexScaler.scale(
vertex,
6,
inputShipStrategies,
List.of(),
15,
128,
0.4,
Expand All @@ -961,117 +959,59 @@ public void testNumPartitionsAdjustment(Collection<ShipStrategy> inputShipStrate
JobVertexScaler.scale(
vertex,
7,
inputShipStrategies,
List.of(),
15,
128,
0.8,
minParallelism,
maxParallelism,
eventCollector,
context));
assertEquals(
11,
JobVertexScaler.scale(
vertex,
7,
inputShipStrategies,
21,
20,
2.1,
minParallelism,
maxParallelism,
eventCollector,
context));
assertEquals(
18,
JobVertexScaler.scale(
vertex,
24,
inputShipStrategies,
20,
List.of(),
35,
24,
0.8,
18,
30,
0.9,
minParallelism,
maxParallelism,
eventCollector,
context));

assertEquals(
18,
20,
JobVertexScaler.scale(
vertex,
24,
inputShipStrategies,
22,
List.of(),
35,
24,
0.8,
17,
30,
1.1,
20,
maxParallelism,
eventCollector,
context));

// numSourcePartition > upperBound
assertEquals(
100,
JobVertexScaler.scale(
vertex,
80,
inputShipStrategies,
List.of(),
200,
128,
1.4,
minParallelism,
maxParallelism,
eventCollector,
context));

assertEquals(
20,
JobVertexScaler.scale(
vertex,
15,
inputShipStrategies,
200,
128,
1.2,
minParallelism,
maxParallelism,
eventCollector,
context));

// minParallelism limited
assertEquals(
5,
JobVertexScaler.scale(
vertex,
6,
inputShipStrategies,
15,
128,
0.5,
4,
maxParallelism,
eventCollector,
context));

assertEquals(
16,
JobVertexScaler.scale(
vertex,
6,
inputShipStrategies,
15,
128,
0.5,
16,
maxParallelism,
eventCollector,
context));
}

@ParameterizedTest
@MethodSource("sourceInputsProvider")
public void testSendingScalingLimitedEvents(Collection<ShipStrategy> inputShipStrategies) {
@Test
public void testSendingScalingLimitedEvents() {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
Expand All @@ -1086,7 +1026,7 @@ public void testSendingScalingLimitedEvents(Collection<ShipStrategy> inputShipSt
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
inputShipStrategies,
List.of(),
evaluated,
history,
restartTime,
Expand All @@ -1099,14 +1039,7 @@ public void testSendingScalingLimitedEvents(Collection<ShipStrategy> inputShipSt
assertThat(partitionLimitedEvent.getMessage())
.isEqualTo(
String.format(
SCALE_LIMITED_MESSAGE_FORMAT,
jobVertexID,
20,
15,
String.format(
"numPartitions : %s,upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
+ "parallelismLowerLimit: %s.",
15, 200, 1)));
SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
}

private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,17 +475,17 @@ public void testMemoryTuning(boolean memoryTuningEnabled) throws Exception {
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
"0.652",
TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
"24320 kb",
"23040 kb",
TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
"24320 kb",
"23040 kb",
TaskManagerOptions.JVM_METASPACE.key(),
"360 mb",
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
"0.053",
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
"0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
"20400832696 bytes");
"20399521976 bytes");
} else {
assertEquals(context.getConfiguration(), capturedConfForMaxResources);
expected = Map.of();
Expand Down Expand Up @@ -752,7 +752,7 @@ public void testAdjustByMaxParallelism() throws Exception {
.containsAllEntriesOf(
Map.of(
"0bfd135746ac8efb3cce668b12e16d3a",
"8",
"7",
"869fb403873411306404e9f2e4438c0e",
"7",
"a6b7102b8d3e3a9564998c1ffeb5e2b7",
Expand Down

0 comments on commit 17145d4

Please sign in to comment.