Skip to content

Commit

Permalink
Minor tests stability improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Jan 25, 2024
1 parent 11808ea commit de07734
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

Expand All @@ -47,9 +47,9 @@
public class ConsumerMessageSenderTest {

public static final int ASYNC_TIMEOUT_MS = 2000;
private Subscription subscription = subscriptionWithTtl(10);
private final Subscription subscription = subscriptionWithTtl(10);

private Subscription subscriptionWith4xxRetry = subscriptionWithTtlAndClientErrorRetry(10);
private final Subscription subscriptionWith4xxRetry = subscriptionWithTtlAndClientErrorRetry(10);

@Mock
private MessageSender messageSender;
Expand Down Expand Up @@ -122,7 +122,7 @@ public void shouldHandleSuccessfulSending() {
}

@Test
public void shouldKeepTryingToSendMessageFailedSending() throws InterruptedException {
public void shouldKeepTryingToSendMessageFailedSending() {
// given
Message message = message();
doReturn(failure()).doReturn(failure()).doReturn(success()).when(messageSender).send(message);
Expand Down Expand Up @@ -154,7 +154,7 @@ public void shouldDiscardMessageWhenTTLIsExceeded() {
}

@Test
public void shouldNotKeepTryingToSendMessageFailedWithStatusCode4xx() throws InterruptedException {
public void shouldNotKeepTryingToSendMessageFailedWithStatusCode4xx() {
// given
Message message = message();
doReturn(failure(403)).doReturn(success()).when(messageSender).send(message);
Expand All @@ -170,7 +170,7 @@ public void shouldNotKeepTryingToSendMessageFailedWithStatusCode4xx() throws Int
}

@Test
public void shouldKeepTryingToSendMessageFailedWithStatusCode4xxForSubscriptionWith4xxRetry() throws InterruptedException {
public void shouldKeepTryingToSendMessageFailedWithStatusCode4xxForSubscriptionWith4xxRetry() {
// given
ConsumerMessageSender sender = consumerMessageSender(subscriptionWith4xxRetry);
Message message = message();
Expand Down Expand Up @@ -227,11 +227,11 @@ public void shouldBackoffRetriesWhenEndpointFails() throws InterruptedException
}

@Test
public void shouldNotRetryOnRetryAfterAboveTtl() throws InterruptedException {
public void shouldNotRetryOnRetryAfterAboveTtl() {
// given
int retrySeconds = subscription.getSerialSubscriptionPolicy().getMessageTtl();
Message message = message();
doReturn(backoff(retrySeconds)).when(messageSender).send(message);
doReturn(backoff(retrySeconds + 1)).when(messageSender).send(message);

// when
sender.sendAsync(message);
Expand Down Expand Up @@ -390,8 +390,8 @@ private ConsumerMessageSender consumerMessageSender(Subscription subscription) {
ConsumerMessageSender sender = new ConsumerMessageSender(
subscription,
messageSenderFactory,
Arrays.asList(successHandler),
Arrays.asList(errorHandler),
List.of(successHandler),
List.of(errorHandler),
rateLimiter,
Executors.newSingleThreadExecutor(),
() -> inflightSemaphore.release(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ private void removeGroups() {
logger.warn("Error during removing group: {}", group, e);
}
}

waitAtMost(adjust(Duration.ONE_MINUTE)).until(() ->
Assertions.assertThat(service.listGroups().size()).isEqualTo(0)
);
}

public void clearManagementData() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.test.web.reactive.server.WebTestClient;
import pl.allegro.tech.hermes.api.ConsumerGroup;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionMetrics;
Expand All @@ -25,6 +26,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static com.jayway.awaitility.Awaitility.waitAtMost;
import static com.jayway.awaitility.Duration.TEN_SECONDS;
Expand Down Expand Up @@ -222,6 +224,13 @@ public void shouldNotConsumeMessagesWhenSubscriptionIsSuspended() {
hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscriptionName);
hermes.api().suspendSubscription(topic, subscriptionName);
hermes.api().waitUntilSubscriptionSuspended(topic.getQualifiedName(), subscriptionName);
// wait until consumer group removed
waitAtMost(30, TimeUnit.SECONDS).until(() ->
hermes.api().getConsumerGroupsDescription(topic.getQualifiedName(), subscriptionName).expectBodyList(ConsumerGroup.class).returnResult().getResponseBody()
.get(0)
.getState()
.equals("Empty")
);
hermes.api().publishUntilSuccess(topic.getQualifiedName(), TestMessage.of("hello", "world").body());

// then
Expand Down

0 comments on commit de07734

Please sign in to comment.