Skip to content

Commit

Permalink
fix test case
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng committed Sep 10, 2024
1 parent 52e0e8a commit 214eb82
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public enum ScalingMetric {
PARALLELISM,
RECOMMENDED_PARALLELISM,
MAX_PARALLELISM,
NUM_PARTITIONS,
SCALE_UP_RATE_THRESHOLD,
SCALE_DOWN_RATE_THRESHOLD,
EXPECTED_PROCESSING_RATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,21 +233,16 @@ private void setDefaultMetrics(
}

@Test
public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
public void testKafkaPulsarNumPartitions() throws Exception {
setDefaultMetrics(metricsCollector);
metricsCollector.updateMetrics(context, stateStore);

var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
metricsCollector.setClock(clock);

var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);

assertEquals(720, collectedMetrics.getJobTopology().get(source1).getMaxParallelism());
assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());

clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
metricsCollector.setClock(clock);

metricsCollector.setMetricNames(
Map.of(
source1,
Expand All @@ -260,8 +255,7 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
"1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));

collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getMaxParallelism());
assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumPartitions());

metricsCollector.setMetricNames(
Map.of(
Expand All @@ -280,7 +274,7 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
"0.Source__pulsar_source[1].PulsarConsumer"
+ ".persistent_//public/default/testTopic-partition-4.m962n.numMsgsReceived")));
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getNumPartitions());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(MAX_PARALLELISM));
metrics.put(ScalingMetric.NUM_PARTITIONS, EvaluatedScalingMetric.of(0));
metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
metrics.put(
Expand Down

0 comments on commit 214eb82

Please sign in to comment.