diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/consumers/EndpointAddressResolverConfiguration.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/consumers/EndpointAddressResolverConfiguration.java new file mode 100644 index 0000000000..676759d1bd --- /dev/null +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/consumers/EndpointAddressResolverConfiguration.java @@ -0,0 +1,19 @@ +package pl.allegro.tech.hermes.consumers; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Profile; +import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.EndpointAddressResolver; +import pl.allegro.tech.hermes.test.helper.endpoint.MultiUrlEndpointAddressResolver; + +@Configuration +public class EndpointAddressResolverConfiguration { + + @Bean + @Primary + @Profile("integration") + public EndpointAddressResolver testMultiUrlEndpointAddressResolver() { + return new MultiUrlEndpointAddressResolver(); + } +} diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscriber.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscriber.java index de25de839f..deccfb4969 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscriber.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscriber.java @@ -55,6 +55,10 @@ public void waitUntilAnyMessageReceived() { await().atMost(adjust(new Duration(DEFAULT_WAIT_TIME_IN_SEC, SECONDS))).until(() -> assertThat(receivedRequests.size()).isPositive()); } + public void waitUntilReceived(Duration duration, int numberOfExpectedMessages) { + await().atMost(adjust(duration)).until(() -> + assertThat(receivedRequests.size()).isEqualTo(numberOfExpectedMessages)); + } public void waitUntilRequestReceived(Consumer requestConsumer) { waitUntilAnyMessageReceived(); @@ -64,6 +68,16 @@ public void waitUntilRequestReceived(Consumer requestConsumer) { } } + public void waitUntilRequestsReceived(Consumer> requestsConsumer) { + await().atMost(adjust(new Duration(DEFAULT_WAIT_TIME_IN_SEC, SECONDS))).until( + () -> { + synchronized (receivedRequests) { + requestsConsumer.accept(receivedRequests); + } + } + ); + } + public void noMessagesReceived() { assertThat(receivedRequests).isEmpty(); } @@ -97,7 +111,7 @@ private LoggedRequest getFirstReceivedRequest() { } } - private LoggedRequest getLastReceivedRequest() { + public LoggedRequest getLastReceivedRequest() { synchronized (receivedRequests) { return Streams.findLast(receivedRequests.stream()).orElseThrow(NoSuchElementException::new); } diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscribersExtension.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscribersExtension.java index 6aeb56738a..0ec1b75196 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscribersExtension.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/subscriber/TestSubscribersExtension.java @@ -39,16 +39,25 @@ public TestSubscribersExtension() { } public TestSubscriber createSubscriber(String endpointPathSuffix) { + return createSubscriber(OK.getStatusCode(), endpointPathSuffix); + } + + public TestSubscriber createSubscriber() { + return createSubscriber(""); + } + + public TestSubscriber createSubscriber(int statusCode) { + return createSubscriber(statusCode, ""); + } + + public TestSubscriber createSubscriber(int statusCode, String endpointPathSuffix) { String path = createPath(endpointPathSuffix); - service.addStubMapping(post(urlPathEqualTo(path)).willReturn(aResponse().withStatus(OK.getStatusCode())).build()); + service.addStubMapping(post(urlPathEqualTo(path)).willReturn(aResponse().withStatus(statusCode)).build()); TestSubscriber subscriber = new TestSubscriber(createSubscriberURI(path)); subscribersPerPath.put(path, subscriber); return subscriber; } - public TestSubscriber createSubscriber() { - return createSubscriber(""); - } public TestSubscriber createSubscriberWithRetry(String message, int delay) { String path = createPath(""); @@ -84,7 +93,18 @@ public TestSubscriber createSubscriberWithRetry(String message, int delay) { private String createPath(String pathSuffix) { return "/subscriber-" + subscriberIndex.incrementAndGet() + pathSuffix; + } + + public interface SubscriberBehaviorProvider { + void provide(WireMockServer subscriber, String endpoint); + } + public TestSubscriber createSubscriber(SubscriberBehaviorProvider scenario) { + String path = createPath(""); + scenario.provide(service, path); + TestSubscriber subscriber = new TestSubscriber(createSubscriberURI(path)); + subscribersPerPath.put(path, subscriber); + return subscriber; } private URI createSubscriberURI(String path) { diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java new file mode 100644 index 0000000000..ee2dfaa935 --- /dev/null +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java @@ -0,0 +1,335 @@ +package pl.allegro.tech.hermes.integrationtests; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.awaitility.Duration; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; +import pl.allegro.tech.hermes.api.ContentType; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; +import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscriber; +import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscribersExtension; +import pl.allegro.tech.hermes.test.helper.avro.AvroUser; +import pl.allegro.tech.hermes.test.helper.message.TestMessage; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; +import static java.util.Arrays.stream; +import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy; +import static pl.allegro.tech.hermes.api.TopicWithSchema.topicWithSchema; +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; +import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName; + +public class BatchDeliveryTest { + + private ObjectMapper mapper = new ObjectMapper(); + + @RegisterExtension + public static final HermesExtension hermes = new HermesExtension(); + + @RegisterExtension + public static final TestSubscribersExtension subscribers = new TestSubscribersExtension(); + + private static final TestMessage[] SMALL_BATCH = TestMessage.simpleMessages(2); + + private static final TestMessage SINGLE_MESSAGE = TestMessage.simple(); + + @Test + public void shouldDeliverMessagesInBatch() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestSubscriber subscriber = subscribers.createSubscriber(); + + Subscription subscription = subscription(topic.getQualifiedName(), + "subscription", subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(2) + .withBatchTime(Integer.MAX_VALUE) + .withBatchVolume(1024) + .build() + ).build(); + + hermes.initHelper().createSubscription(subscription); + + // when + publishAll(topic.getQualifiedName(), SMALL_BATCH); + + // then + expectSingleBatch(subscriber, SMALL_BATCH); + } + + @Test + public void shouldDeliverBatchInGivenTimePeriod() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestSubscriber subscriber = subscribers.createSubscriber(); + + Subscription subscription = subscription(topic.getQualifiedName(), + "subscription", subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(100) + .withBatchTime(1) + .withBatchVolume(1024) + .build() + ).build(); + + hermes.initHelper().createSubscription(subscription); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + + // then + expectSingleBatch(subscriber, SINGLE_MESSAGE); + } + + @Test + public void shouldDeliverBatchInGivenVolume() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestSubscriber subscriber = subscribers.createSubscriber(); + + int batchVolumeThatFitsOneMessageOnly = 150; + + Subscription subscription = subscription(topic.getQualifiedName(), + "subscription", subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(100) + .withBatchTime(Integer.MAX_VALUE) + .withBatchVolume(batchVolumeThatFitsOneMessageOnly) + .build() + ).build(); + + hermes.initHelper().createSubscription(subscription); + + // when publishing more than buffer capacity + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + + // then we expect to receive batch that has desired batch volume (one message only) + expectSingleBatch(subscriber, SINGLE_MESSAGE); + } + + @Test + public void shouldDeliverAvroMessagesAsJsonBatch() { + // given + AvroUser user = new AvroUser("Bob", 50, "blue"); + + Topic topic = hermes.initHelper().createTopicWithSchema( + topicWithSchema(topicWithRandomName().build(), user.getSchemaAsString()) + ); + + TestSubscriber subscriber = subscribers.createSubscriber(); + + Subscription subscription = subscription(topic.getQualifiedName(), + "subscription", subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(2) + .withBatchTime(Integer.MAX_VALUE) + .withBatchVolume(1024) + .build() + ).build(); + + hermes.initHelper().createSubscription(subscription); + + TestMessage[] avroBatch = {user.asTestMessage(), user.asTestMessage()}; + + // when + publishAll(topic.getQualifiedName(), avroBatch); + + // then + expectSingleBatch(subscriber, avroBatch); + } + + + @Test + public void shouldPassSubscriptionHeaders() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestSubscriber subscriber = subscribers.createSubscriber(); + + BatchSubscriptionPolicy policy = buildBatchPolicy() + .withBatchSize(100) + .withBatchTime(1) + .withBatchVolume(1024) + .build(); + Subscription subscription = subscription(topic, "batchSubscription") + .withEndpoint(subscriber.getEndpoint()) + .withContentType(ContentType.JSON) + .withSubscriptionPolicy(policy) + .withHeader("MY-HEADER", "myHeaderValue") + .withHeader("MY-OTHER-HEADER", "myOtherHeaderValue") + .build(); + + hermes.initHelper().createSubscription(subscription); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + + // then + subscriber.waitUntilRequestReceived(request -> { + Assertions.assertThat(request.getHeader("MY-HEADER")).isEqualTo("myHeaderValue"); + Assertions.assertThat(request.getHeader("MY-OTHER-HEADER")).isEqualTo("myOtherHeaderValue"); + }); + } + + @Test + public void shouldAttachSubscriptionIdentityHeadersWhenItIsEnabled() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + TestSubscriber subscriber = subscribers.createSubscriber(); + + BatchSubscriptionPolicy policy = buildBatchPolicy() + .withBatchSize(100) + .withBatchTime(1) + .withBatchVolume(1024) + .build(); + Subscription subscription = subscription(topic, "batchSubscription") + .withEndpoint(subscriber.getEndpoint()) + .withContentType(ContentType.JSON) + .withSubscriptionPolicy(policy) + .withAttachingIdentityHeadersEnabled(true) + .build(); + + hermes.initHelper().createSubscription(subscription); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + + // then + subscriber.waitUntilRequestReceived(request -> { + Assertions.assertThat(request.getHeader("Hermes-Topic-Name")).isEqualTo(topic.getQualifiedName()); + Assertions.assertThat(request.getHeader("Hermes-Subscription-Name")).isEqualTo("batchSubscription"); + }); + } + + @Test + public void shouldNotAttachSubscriptionIdentityHeadersWhenItIsDisabled() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + TestSubscriber subscriber = subscribers.createSubscriber(); + + BatchSubscriptionPolicy policy = buildBatchPolicy() + .withBatchSize(100) + .withBatchTime(1) + .withBatchVolume(1024) + .build(); + + Subscription subscription = subscription(topic, "batchSubscription") + .withEndpoint(subscriber.getEndpoint()) + .withContentType(ContentType.JSON) + .withSubscriptionPolicy(policy) + .withAttachingIdentityHeadersEnabled(false) + .build(); + + hermes.initHelper().createSubscription(subscription); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + + // then + subscriber.waitUntilRequestReceived(request -> { + Assertions.assertThat(request.getHeader("Hermes-Topic-Name")).isNull(); + Assertions.assertThat(request.getHeader("Hermes-Subscription-Name")).isNull(); + }); + } + + @Test + public void shouldTimeoutRequestToSlowlyRespondingClient() { + //given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + // response chunk every 500ms, total 5s + int chunks = 10; + int totalResponseDuration = 5000; + + TestSubscriber subscriber = subscribers.createSubscriber((service, endpoint) -> { + service.addStubMapping( + post(urlEqualTo(endpoint)) + .inScenario("slowAndFast") + .whenScenarioStateIs(STARTED) + .willSetStateTo("slow") + .willReturn( + aResponse() + .withStatus(200) + .withBody("I am very slow!") + .withChunkedDribbleDelay(chunks, totalResponseDuration) + ).build() + ); + + service.addStubMapping( + post(urlEqualTo(endpoint)) + .inScenario("slowAndFast") + .whenScenarioStateIs("slow") + .willReturn( + aResponse() + .withStatus(200) + .withFixedDelay(0) + ).build() + ); + }); + + Subscription subscription = subscription(topic.getQualifiedName(), + "subscription", subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(1) + .withBatchTime(1) + .withBatchVolume(1024) + .withRequestTimeout(1000) + .build() + ).build(); + + hermes.initHelper().createSubscription(subscription); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), SINGLE_MESSAGE.body()); + + // then + // first request is retried because of timeout (with socket / idle timeout only, the request wouldn't be timed out because + // there are chunks of response every 500ms which is smaller than 1s timeout) + subscriber.waitUntilReceived(Duration.FIVE_SECONDS, 2); + Assertions.assertThat(subscriber.getLastReceivedRequest().getHeader("Hermes-Retry-Count")).isEqualTo("1"); + } + + private void publishAll(String topicQualifiedName, TestMessage... messages) { + stream(messages).forEach(message -> hermes.api().publishUntilSuccess(topicQualifiedName, message.body())); + } + + private void expectSingleBatch(TestSubscriber subscriber, TestMessage... expectedContents) { + subscriber.waitUntilRequestReceived(message -> { + List> batch = readBatch(message.getBodyAsString()); + Assertions.assertThat(batch).hasSize(expectedContents.length); + for (int i = 0; i < expectedContents.length; i++) { + Assertions.assertThat(batch.get(i).get("message")).isEqualTo(expectedContents[i].getContent()); + Assertions.assertThat((String) ((Map) batch.get(i).get("metadata")).get("id")).isNotEmpty(); + } + }); + } + + private BatchSubscriptionPolicy.Builder buildBatchPolicy() { + return batchSubscriptionPolicy() + .applyDefaults() + .withMessageTtl(100) + .withRequestTimeout(100) + .withMessageBackoff(10); + } + + @SuppressWarnings("unchecked") + private List> readBatch(String message) { + try { + return mapper.readValue(message, List.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchRetryPolicyTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchRetryPolicyTest.java new file mode 100644 index 0000000000..385d228248 --- /dev/null +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchRetryPolicyTest.java @@ -0,0 +1,234 @@ +package pl.allegro.tech.hermes.integrationtests; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import com.github.tomakehurst.wiremock.verification.LoggedRequest; +import com.jayway.awaitility.Duration; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.Test; +import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; +import pl.allegro.tech.hermes.api.ContentType; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; +import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscriber; +import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscribersExtension; +import pl.allegro.tech.hermes.test.helper.message.TestMessage; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.containing; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static org.apache.http.HttpStatus.SC_BAD_REQUEST; +import static org.apache.http.HttpStatus.SC_CREATED; +import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR; +import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy; +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; +import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName; + +public class BatchRetryPolicyTest { + + public static final String HEALTHY = "healthy"; + + String failedRequestBody = "{\"body\":\"failed\"}"; + String successfulRequestBody = "{\"body\":\"successful\"}"; + + ObjectMapper mapper = new ObjectMapper(); + + @RegisterExtension + public static final HermesExtension hermes = new HermesExtension(); + + @RegisterExtension + public static final TestSubscribersExtension subscribers = new TestSubscribersExtension(); + + @Test + public void shouldRetryUntilRequestSuccessfulAndSendRetryCounterInHeader() throws Throwable { + //given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + TestSubscriber subscriber = subscribers.createSubscriber((service, endpoint) -> { + service.addStubMapping(post(endpoint) + .inScenario(topic.getQualifiedName()) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse().withStatus(SC_INTERNAL_SERVER_ERROR)) + .willSetStateTo(HEALTHY) + .build() + ); + + service.addStubMapping(post(endpoint) + .inScenario(topic.getQualifiedName()) + .whenScenarioStateIs(HEALTHY) + .willReturn(aResponse().withStatus(SC_CREATED)).build()); + }); + + createSingleMessageBatchSubscription(topic, subscriber.getEndpoint()); + + //when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), TestMessage.simple().body()); + + //then + subscriber.waitUntilRequestsReceived(requests -> { + Assertions.assertThat(requests).hasSize(2); + Assertions.assertThat(requests.get(0).header("Hermes-Retry-Count").containsValue("0")).isTrue(); + Assertions.assertThat(requests.get(1).header("Hermes-Retry-Count").containsValue("1")).isTrue(); + }); + } + + @Test + public void shouldNotRetryIfRequestSuccessful() { + //given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestMessage message = TestMessage.simple(); + + TestSubscriber subscriber = subscribers.createSubscriber(); + + createSingleMessageBatchSubscription(topic, subscriber.getEndpoint()); + + //when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), message.body()); + + //then + subscriber.waitUntilReceived(Duration.FIVE_SECONDS, 1); + } + + @Test + public void shouldRetryUntilTtlExceeded() throws Throwable { + //given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + TestSubscriber subscriber = subscribers.createSubscriber((service, endpoint) -> { + service.addStubMapping((post(endpoint)) + .withRequestBody(containing("failed")) + .willReturn(aResponse().withStatus(SC_INTERNAL_SERVER_ERROR)).build()); + + service.addStubMapping((post(endpoint)) + .withRequestBody(containing("successful")) + .willReturn(aResponse().withStatus(SC_CREATED)).build()); + }); + + createSingleMessageBatchSubscription(topic, subscriber.getEndpoint(), 1, 10); + + //when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), failedRequestBody); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), successfulRequestBody); + + //then + subscriber.waitUntilRequestsReceived(requests -> + Assertions.assertThat(requests) + .extracting(LoggedRequest::getBodyAsString) + .extracting(this::readMessage) + .containsSequence(failedRequestBody, failedRequestBody, successfulRequestBody)); + } + + @Test + public void shouldRetryOnClientErrors() throws Throwable { + //given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + TestSubscriber subscriber = subscribers.createSubscriber((service, endpoint) -> { + service.addStubMapping(post(endpoint) + .inScenario(topic.getQualifiedName()) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse().withStatus(SC_BAD_REQUEST)) + .willSetStateTo(HEALTHY).build()); + + service.addStubMapping(post(endpoint) + .inScenario(topic.getQualifiedName()) + .whenScenarioStateIs(HEALTHY) + .willReturn(aResponse().withStatus(SC_CREATED)).build()); + }); + + createSingleMessageBatchSubscription(topic, subscriber.getEndpoint(), true); + + //when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), TestMessage.simple().body()); + + //then + subscriber.waitUntilRequestsReceived(requests -> Assertions.assertThat(requests.size()).isEqualTo(2)); + } + + @Test + public void shouldNotRetryOnClientErrors() throws Throwable { + //given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + + TestSubscriber subscriber = subscribers.createSubscriber((service, endpoint) -> { + + service.addStubMapping(post(endpoint) + .withRequestBody(containing("failed")) + .willReturn(aResponse().withStatus(SC_BAD_REQUEST)).build()); + + service.addStubMapping(post(endpoint) + .withRequestBody(containing("successful")) + .willReturn(aResponse().withStatus(SC_CREATED)).build()); + }); + + createSingleMessageBatchSubscription(topic, subscriber.getEndpoint()); + + //when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), failedRequestBody); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), successfulRequestBody); + + //then + subscriber.waitUntilRequestsReceived(requests -> + Assertions.assertThat(requests) + .extracting(LoggedRequest::getBodyAsString) + .extracting(this::readMessage) + .containsExactly(failedRequestBody, successfulRequestBody)); + } + + + private void createSingleMessageBatchSubscription(Topic topic, String endpoint, int messageTtl, int messageBackoff) { + createBatchSubscription( + topic, endpoint, messageTtl, messageBackoff, 1, 1, 200, false + ); + } + + private void createSingleMessageBatchSubscription(Topic topic, String endpoint) { + createSingleMessageBatchSubscription(topic, endpoint, false); + } + + private void createSingleMessageBatchSubscription(Topic topic, String endpoint, boolean retryOnClientErrors) { + createBatchSubscription(topic, endpoint, 1, 10, 1, 1, 500, retryOnClientErrors); + } + + public void createBatchSubscription(Topic topic, String endpoint, int messageTtl, int messageBackoff, int batchSize, int batchTime, + int batchVolume, boolean retryOnClientErrors) { + BatchSubscriptionPolicy policy = batchSubscriptionPolicy() + .applyDefaults() + .withMessageTtl(messageTtl) + .withMessageBackoff(messageBackoff) + .withBatchSize(batchSize) + .withBatchTime(batchTime) + .withBatchVolume(batchVolume) + .withClientErrorRetry(retryOnClientErrors) + .withRequestTimeout(500) + .build(); + + createBatchSubscription(topic, endpoint, policy); + } + + public void createBatchSubscription(Topic topic, String endpoint, BatchSubscriptionPolicy policy) { + Subscription subscription = subscription(topic, "batchSubscription") + .withEndpoint(endpoint) + .withContentType(ContentType.JSON) + .withSubscriptionPolicy(policy) + .build(); + + hermes.initHelper().createSubscription(subscription); + } + + private String readMessage(String body) { + try { + return mapper.writeValueAsString(((Map) mapper.readValue(body, List.class).get(0)).get("message")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BroadcastDeliveryTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BroadcastDeliveryTest.java new file mode 100644 index 0000000000..699d64d5d0 --- /dev/null +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BroadcastDeliveryTest.java @@ -0,0 +1,128 @@ +package pl.allegro.tech.hermes.integrationtests; + +import com.jayway.awaitility.Duration; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import pl.allegro.tech.hermes.api.ContentType; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionMetrics; +import pl.allegro.tech.hermes.api.SubscriptionMode; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; +import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscriber; +import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscribersExtension; +import pl.allegro.tech.hermes.test.helper.message.TestMessage; + +import java.util.List; +import java.util.stream.Stream; + +import static com.jayway.awaitility.Awaitility.waitAtMost; +import static com.jayway.awaitility.Duration.TEN_SECONDS; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static pl.allegro.tech.hermes.api.SubscriptionPolicy.Builder.subscriptionPolicy; +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; +import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName; + +public class BroadcastDeliveryTest { + + @RegisterExtension + public static final HermesExtension hermes = new HermesExtension(); + + @RegisterExtension + public static final TestSubscribersExtension subscribersFactory = new TestSubscribersExtension(); + + @Test + public void shouldPublishAndConsumeMessageByAllServices() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestMessage message = TestMessage.random(); + + List subscribers = succeedingSubscribers(4); + String endpointUrl = setUpSubscribersAndGetEndpoint(subscribers); + + hermes.initHelper().createSubscription( + broadcastSubscription(topic, "subscription", endpointUrl) + ); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), message.body()); + + // then + subscribers.forEach(s -> s.waitUntilReceived(message.body())); + } + + @Test + public void shouldPublishAndRetryOnlyForUndeliveredConsumers() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestMessage message = TestMessage.random(); + + List subscribers = succeedingSubscribers(3); + TestSubscriber retryingSubscriber = subscribersFactory.createSubscriberWithRetry(message.body(), 1); + subscribers.add(retryingSubscriber); + + String endpointUrl = setUpSubscribersAndGetEndpoint(subscribers); + + hermes.initHelper().createSubscription( + broadcastSubscription(topic, "subscription", endpointUrl) + ); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), message.body()); + + // then + subscribers.forEach(s -> s.waitUntilReceived(message.body())); + retryingSubscriber.waitUntilReceived(Duration.ONE_MINUTE, 2); + Assertions.assertThat(retryingSubscriber.getLastReceivedRequest().getHeader("Hermes-Retry-Count")).isEqualTo("1"); + } + + @Test + public void shouldNotRetryForBadRequestsFromConsumers() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestMessage message = TestMessage.random(); + + List subscribers = succeedingSubscribers(3); + subscribers.add(subscribersFactory.createSubscriber(400)); + + String endpointUrl = setUpSubscribersAndGetEndpoint(subscribers); + + + hermes.initHelper().createSubscription( + broadcastSubscription(topic, "subscription", endpointUrl) + ); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), message.body()); + + // then + subscribers.forEach(s -> s.waitUntilReceived(message.body())); + waitAtMost(TEN_SECONDS).until(() -> { + long discarded = hermes.api() + .getSubscriptionMetrics(topic.getQualifiedName(), "subscription") + .expectBody(SubscriptionMetrics.class).returnResult().getResponseBody().getDiscarded(); + assertThat(discarded).isEqualTo(1); + }); + } + + private List succeedingSubscribers(int subscribersCount) { + return Stream.generate(subscribersFactory::createSubscriber).limit(subscribersCount).collect(toList()); + } + + private Subscription broadcastSubscription(Topic topic, String subscriptionName, String endpoint) { + return subscription(topic, subscriptionName) + .withEndpoint(endpoint) + .withContentType(ContentType.JSON) + .withSubscriptionPolicy(subscriptionPolicy().applyDefaults().build()) + .withMode(SubscriptionMode.BROADCAST) + .build(); + } + + private String setUpSubscribersAndGetEndpoint(List subscribers) { + return subscribers.stream().map(TestSubscriber::getEndpoint).collect(joining(";")); + } + +} diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BatchDeliveryTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/BatchDeliveryTest.java deleted file mode 100644 index 7594e3a4d9..0000000000 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BatchDeliveryTest.java +++ /dev/null @@ -1,263 +0,0 @@ -package pl.allegro.tech.hermes.integration; - -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.ws.rs.core.Response; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; -import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; -import pl.allegro.tech.hermes.api.ContentType; -import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.integration.env.SharedServices; -import pl.allegro.tech.hermes.test.helper.avro.AvroUser; -import pl.allegro.tech.hermes.test.helper.endpoint.RemoteServiceEndpoint; -import pl.allegro.tech.hermes.test.helper.message.TestMessage; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static java.util.Arrays.stream; -import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy; -import static pl.allegro.tech.hermes.api.TopicWithSchema.topicWithSchema; -import static pl.allegro.tech.hermes.integration.test.HermesAssertions.assertThat; -import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; -import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic; - -public class BatchDeliveryTest extends IntegrationTest { - - private RemoteServiceEndpoint remoteService; - - private ObjectMapper mapper = new ObjectMapper(); - - @BeforeMethod - public void initializeAlways() { - this.remoteService = new RemoteServiceEndpoint(SharedServices.services().serviceMock()); - } - - private static final TestMessage[] SMALL_BATCH = TestMessage.simpleMessages(2); - - private static final TestMessage SINGLE_MESSAGE = TestMessage.simple(); - - @Test - public void shouldDeliverMessagesInBatch() throws IOException { - // given - Topic topic = operations.buildTopic("batchSizeTest", "topic"); - operations.createBatchSubscription(topic, remoteService.getUrl(), buildBatchPolicy() - .withBatchSize(2) - .withBatchTime(Integer.MAX_VALUE) - .withBatchVolume(1024) - .build()); - - remoteService.expectMessages(SMALL_BATCH); - - // when - stream(SMALL_BATCH).forEach(m -> publish(topic, m)); - - // then - expectSingleBatch(SMALL_BATCH); - } - - @Test - public void shouldDeliverBatchInGivenTimePeriod() throws IOException { - // given - Topic topic = operations.buildTopic("deliverBatchInGivenTimePeriod", "topic"); - operations.createBatchSubscription(topic, remoteService.getUrl(), buildBatchPolicy() - .withBatchSize(100) - .withBatchTime(1) - .withBatchVolume(1024) - .build()); - - remoteService.expectMessages(SINGLE_MESSAGE); - - // when - publish(topic, SINGLE_MESSAGE); - - // then - expectSingleBatch(SINGLE_MESSAGE); - } - - @Test - public void shouldDeliverBatchInGivenVolume() throws IOException, InterruptedException { - // given - Topic topic = operations.buildTopic("deliverBatchInGivenVolume", "topic"); - int batchVolumeThatFitsOneMessageOnly = 150; - operations.createBatchSubscription(topic, remoteService.getUrl(), buildBatchPolicy() - .withBatchSize(100) - .withBatchTime(Integer.MAX_VALUE) - .withBatchVolume(batchVolumeThatFitsOneMessageOnly) - .build()); - - remoteService.expectMessages(SINGLE_MESSAGE); - - // when publishing more than buffer capacity - publish(topic, SINGLE_MESSAGE); - publish(topic, SINGLE_MESSAGE); - - // then we expect to receive batch that has desired batch volume (one message only) - expectSingleBatch(SINGLE_MESSAGE); - } - - @Test - public void shouldDeliverAvroMessagesAsJsonBatch() { - // given - AvroUser user = new AvroUser("Bob", 50, "blue"); - Topic topic = topic("batch.avro.topic").build(); - operations.buildTopicWithSchema(topicWithSchema(topic, user.getSchemaAsString())); - - operations.createBatchSubscription(topic, remoteService.getUrl(), buildBatchPolicy() - .withBatchSize(2) - .withBatchTime(Integer.MAX_VALUE) - .withBatchVolume(1024) - .build()); - - TestMessage[] avroBatch = {user.asTestMessage(), user.asTestMessage()}; - remoteService.expectMessages(avroBatch); - - // when - stream(avroBatch).forEach(m -> publish(topic, user.asTestMessage())); - - // then - expectSingleBatch(avroBatch); - } - - @Test - public void shouldPassSubscriptionHeaders() { - // given - Topic topic = operations.buildTopic("deliverBatchWithSubscriptionHeaders", "topic"); - BatchSubscriptionPolicy policy = buildBatchPolicy() - .withBatchSize(100) - .withBatchTime(1) - .withBatchVolume(1024) - .build(); - Subscription subscription = subscription(topic, "batchSubscription") - .withEndpoint(remoteService.getUrl()) - .withContentType(ContentType.JSON) - .withSubscriptionPolicy(policy) - .withHeader("MY-HEADER", "myHeaderValue") - .withHeader("MY-OTHER-HEADER", "myOtherHeaderValue") - .build(); - operations.createSubscription(topic, subscription); - - remoteService.expectMessages(SINGLE_MESSAGE); - - // when - publish(topic, SINGLE_MESSAGE); - - // then - remoteService.waitUntilRequestReceived(request -> { - assertThat(request.getHeader("MY-HEADER")).isEqualTo("myHeaderValue"); - assertThat(request.getHeader("MY-OTHER-HEADER")).isEqualTo("myOtherHeaderValue"); - }); - } - - @Test - public void shouldAttachSubscriptionIdentityHeadersWhenItIsEnabled() { - // given - Topic topic = operations.buildTopic("deliverBatchWithSubscriptionIdentityHeaders", "topic"); - BatchSubscriptionPolicy policy = buildBatchPolicy() - .withBatchSize(100) - .withBatchTime(1) - .withBatchVolume(1024) - .build(); - Subscription subscription = subscription(topic, "batchSubscription") - .withEndpoint(remoteService.getUrl()) - .withContentType(ContentType.JSON) - .withSubscriptionPolicy(policy) - .withAttachingIdentityHeadersEnabled(true) - .build(); - operations.createSubscription(topic, subscription); - remoteService.expectMessages(SINGLE_MESSAGE); - - // when - publish(topic, SINGLE_MESSAGE); - - // then - remoteService.waitUntilRequestReceived(request -> { - assertThat(request.getHeader("Hermes-Topic-Name")).isEqualTo("deliverBatchWithSubscriptionIdentityHeaders.topic"); - assertThat(request.getHeader("Hermes-Subscription-Name")).isEqualTo("batchSubscription"); - }); - } - - @Test - public void shouldNotAttachSubscriptionIdentityHeadersWhenItIsDisabled() { - // given - Topic topic = operations.buildTopic("deliverBatchWithoutSubscriptionIdentityHeaders", "topic"); - BatchSubscriptionPolicy policy = buildBatchPolicy() - .withBatchSize(100) - .withBatchTime(1) - .withBatchVolume(1024) - .build(); - Subscription subscription = subscription(topic, "batchSubscription") - .withEndpoint(remoteService.getUrl()) - .withContentType(ContentType.JSON) - .withSubscriptionPolicy(policy) - .withAttachingIdentityHeadersEnabled(false) - .build(); - operations.createSubscription(topic, subscription); - remoteService.expectMessages(SINGLE_MESSAGE); - - // when - publish(topic, SINGLE_MESSAGE); - - // then - remoteService.waitUntilRequestReceived(request -> { - assertThat(request.getHeader("Hermes-Topic-Name")).isNull(); - assertThat(request.getHeader("Hermes-Subscription-Name")).isNull(); - }); - } - - @Test - public void shouldTimeoutRequestAsAWhole() { - //given - Topic topic = operations.buildTopic("timeoutTest", "topic"); - operations.createBatchSubscription(topic, remoteService.getUrl(), buildBatchPolicy() - .withBatchSize(1) - .withBatchTime(1) - .withBatchVolume(1024) - .withRequestTimeout(1000) - .build()); - - remoteService.slowThenFastMessage(SINGLE_MESSAGE.body(), 10, 5000); // response chunk every 500ms, total 5s - - // when - publish(topic, SINGLE_MESSAGE); - - // then - // first request is retried because of timeout (with socket / idle timeout only, the request wouldn't be timed out) - remoteService.waitUntilReceived(5, 2); - assertThat(remoteService.getLastReceivedRequest().getHeader("Hermes-Retry-Count")).isEqualTo("1"); - } - - private void publish(Topic topic, TestMessage m) { - assertThat(publisher.publish(topic.getQualifiedName(), m.body())).hasStatusFamily(Response.Status.Family.SUCCESSFUL); - } - - private void expectSingleBatch(TestMessage... expectedContents) { - remoteService.waitUntilReceived(message -> { - List> batch = readBatch(message); - assertThat(batch).hasSize(expectedContents.length); - for (int i = 0; i < expectedContents.length; i++) { - assertThat(batch.get(i).get("message")).isEqualTo(expectedContents[i].getContent()); - assertThat((String) ((Map) batch.get(i).get("metadata")).get("id")).isNotEmpty(); - } - }); - } - - private BatchSubscriptionPolicy.Builder buildBatchPolicy() { - return batchSubscriptionPolicy() - .applyDefaults() - .withMessageTtl(100) - .withRequestTimeout(100) - .withMessageBackoff(10); - } - - @SuppressWarnings("unchecked") - private List> readBatch(String message) { - try { - return mapper.readValue(message, List.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BatchRetryPolicyTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/BatchRetryPolicyTest.java deleted file mode 100644 index b4a1f21dd1..0000000000 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BatchRetryPolicyTest.java +++ /dev/null @@ -1,238 +0,0 @@ -package pl.allegro.tech.hermes.integration; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.tomakehurst.wiremock.WireMockServer; -import com.github.tomakehurst.wiremock.client.WireMock; -import com.github.tomakehurst.wiremock.matching.UrlPattern; -import com.github.tomakehurst.wiremock.stubbing.Scenario; -import com.github.tomakehurst.wiremock.verification.LoggedRequest; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.test.helper.message.TestMessage; -import pl.allegro.tech.hermes.test.helper.util.Ports; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.containing; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import static jakarta.ws.rs.core.Response.Status.CREATED; -import static java.util.Comparator.comparingLong; -import static org.apache.http.HttpStatus.SC_BAD_REQUEST; -import static org.apache.http.HttpStatus.SC_CREATED; -import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR; -import static pl.allegro.tech.hermes.integration.test.HermesAssertions.assertThat; -import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.randomTopic; - -public class BatchRetryPolicyTest extends IntegrationTest { - - public static final String HEALTHY = "healthy"; - - private WireMockServer remoteServer; - private WireMock wireMock; - - String failedRequestBody = "{\"body\":\"failed\"}"; - String successfulRequestBody = "{\"body\":\"successful\"}"; - - ObjectMapper mapper = new ObjectMapper(); - - @BeforeMethod - public void beforeMethod() { - wireMock.resetScenarios(); - } - - @BeforeClass - public void beforeClass() { - remoteServer = new WireMockServer(Ports.nextAvailable()); - remoteServer.start(); - - wireMock = new WireMock("localhost", remoteServer.port()); - } - - @AfterClass - public void afterClass() { - remoteServer.stop(); - } - - @Test - public void shouldRetryUntilRequestSuccessfulAndSendRetryCounterInHeader() throws Throwable { - //given - Topic topic = operations.buildTopic(randomTopic("group", "retryUntilRequestSuccessful").build()); - createSingleMessageBatchSubscription(topic); - - wireMock.register(post(topicUrl(topic)) - .inScenario(topic.getQualifiedName()) - .whenScenarioStateIs(Scenario.STARTED) - .willReturn(aResponse().withStatus(SC_INTERNAL_SERVER_ERROR)) - .willSetStateTo(HEALTHY)); - - wireMock.register(post(topicUrl(topic)) - .inScenario(topic.getQualifiedName()) - .whenScenarioStateIs(HEALTHY) - .willReturn(aResponse().withStatus(SC_CREATED))); - - //when - publish(topic, TestMessage.simple()); - - //then - wait.until(() -> { - List requests = recordedRequests(topic); - assertThat(requests).hasSize(2); - assertThat(requests.get(0).header("Hermes-Retry-Count").containsValue("0")).isTrue(); - assertThat(requests.get(1).header("Hermes-Retry-Count").containsValue("1")).isTrue(); - }); - } - - @Test - public void shouldNotRetryIfRequestSuccessful() throws Throwable { - //given - Topic topic = operations.buildTopic(randomTopic("group", "notRetryIfRequestSuccessful").build()); - createSingleMessageBatchSubscription(topic); - - wireMock.register(post(topicUrl(topic)).willReturn(aResponse().withStatus(SC_CREATED))); - - //when - publish(topic, TestMessage.simple()); - - //then - wait.until(() -> assertThat(recordedRequests(topic)).hasSize(1)); - } - - @Test - public void shouldRetryUntilTtlExceeded() throws Throwable { - //given - Topic topic = operations.buildTopic(randomTopic("group", "retryUntilTtlExceeded").build()); - createSingleMessageBatchSubscription(topic, 1, 10); - - wireMock.register(post(topicUrl(topic)) - .withRequestBody(containing("failed")) - .willReturn(aResponse().withStatus(SC_INTERNAL_SERVER_ERROR))); - - wireMock.register(post(topicUrl(topic)) - .withRequestBody(containing("successful")) - .willReturn(aResponse().withStatus(SC_CREATED))); - - - //when - publishRequestThatIsExpectedToFail(topic); - publishRequestThatIsExpectedToSucceed(topic); - - //then - wait.until(() -> - assertThat(recordedRequests(topic)) - .extracting(LoggedRequest::getBodyAsString) - .extracting(this::readMessage) - .containsSequence(failedRequestBody, failedRequestBody, successfulRequestBody)); - } - - private void publishRequestThatIsExpectedToSucceed(Topic topic) { - assertThat(publisher.publish(topic.getQualifiedName(), successfulRequestBody)).hasStatus(CREATED); - } - - @Test - public void shouldRetryOnClientErrors() throws Throwable { - //given - Topic topic = operations.buildTopic(randomTopic("group", "retryOnClientErrors").build()); - createSingleMessageBatchSubscription(topic, true); - - wireMock.register(post(topicUrl(topic)) - .inScenario(topic.getQualifiedName()) - .whenScenarioStateIs(Scenario.STARTED) - .willReturn(aResponse().withStatus(SC_BAD_REQUEST)) - .willSetStateTo(HEALTHY)); - - wireMock.register(post(topicUrl(topic)) - .inScenario(topic.getQualifiedName()) - .whenScenarioStateIs(HEALTHY) - .willReturn(aResponse().withStatus(SC_CREATED))); - - //when - publish(topic, TestMessage.simple()); - - //then - wait.until(() -> assertThat(recordedRequests(topic)).hasSize(2)); - } - - @Test - public void shouldNotRetryOnClientErrors() throws Throwable { - //given - Topic topic = operations.buildTopic(randomTopic("group", "notRetryOnClientErrors").build()); - createSingleMessageBatchSubscription(topic, false); - - wireMock.register(post(topicUrl(topic)) - .withRequestBody(containing("failed")) - .willReturn(aResponse().withStatus(SC_BAD_REQUEST))); - - wireMock.register(post(topicUrl(topic)) - .withRequestBody(containing("successful")) - .willReturn(aResponse().withStatus(SC_CREATED))); - - //when - publishRequestThatIsExpectedToFail(topic); - publishRequestThatIsExpectedToSucceed(topic); - - //then - wait.until(() -> - assertThat(recordedRequests(topic)) - .extracting(LoggedRequest::getBodyAsString) - .extracting(this::readMessage) - .containsExactly(failedRequestBody, successfulRequestBody)); - } - - private void publishRequestThatIsExpectedToFail(Topic topic) { - assertThat(publisher.publish(topic.getQualifiedName(), failedRequestBody)).hasStatus(CREATED); - wait.until(() -> assertThat(recordedRequests(topic).size()).isPositive()); - } - - private UrlPattern topicUrl(Topic topic) { - return topicUrl(topic.getName().getName()); - } - - private UrlPattern topicUrl(String topicName) { - return urlEqualTo("/" + topicName); - } - - private List recordedRequests(Topic topic) { - List requests = wireMock.find(postRequestedFor(topicUrl(topic))); - requests.sort(comparingLong(req -> req.getLoggedDate().getTime())); - return requests; - } - - private void publish(Topic topic, TestMessage m) { - assertThat(publisher.publish(topic.getQualifiedName(), m.body())).hasStatus(CREATED); - } - - private void createSingleMessageBatchSubscription(Topic topic) { - createSingleMessageBatchSubscription(topic, false); - } - - private void createSingleMessageBatchSubscription(Topic topic, int messageTtl, int messageBackoff) { - operations.createBatchSubscription( - topic, subscriptionEndpoint(topic.getName().getName()), messageTtl, messageBackoff, 1, 1, 200, false - ); - } - - private void createSingleMessageBatchSubscription(Topic topic, boolean retryOnClientErrors) { - operations.createBatchSubscription(topic, subscriptionEndpoint(topic.getName().getName()), 1, 10, 1, 1, 500, retryOnClientErrors); - } - - private String subscriptionEndpoint(String topicName) { - return "http://localhost:" + remoteServer.port() + "/" + topicName; - } - - private String readMessage(String body) { - try { - return mapper.writeValueAsString(((Map) mapper.readValue(body, List.class).get(0)).get("message")); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - -} diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BroadcastDeliveryTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/BroadcastDeliveryTest.java deleted file mode 100644 index ffdab66220..0000000000 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BroadcastDeliveryTest.java +++ /dev/null @@ -1,105 +0,0 @@ -package pl.allegro.tech.hermes.integration; - -import com.github.tomakehurst.wiremock.WireMockServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.Test; -import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.test.helper.endpoint.RemoteServiceEndpoint; -import pl.allegro.tech.hermes.test.helper.message.TestMessage; -import pl.allegro.tech.hermes.test.helper.util.Ports; - -import java.util.List; -import java.util.stream.Stream; - -import static java.util.stream.Collectors.joining; -import static java.util.stream.Collectors.toList; -import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.randomTopic; - -public class BroadcastDeliveryTest extends IntegrationTest { - private static final Logger logger = LoggerFactory.getLogger(BroadcastDeliveryTest.class); - - private List remoteServices; - private RemoteServiceEndpoint firstRemoteService; - - @AfterMethod - public void cleanup() { - this.remoteServices.forEach(service -> { - try { - service.stop(); - } catch (Exception ex) { - logger.warn("Failed to stop remote service.", ex); - } - }); - } - - @Test - public void shouldPublishAndConsumeMessageByAllServices() { - // given - String endpointUrl = setUpServicesAndGetEndpoint(); - Topic topic = operations.buildTopic(randomTopic("publishAndConsumeGroup", "broadcastTopic").build()); - operations.createBroadcastSubscription(topic, "broadcastSubscription", endpointUrl); - - TestMessage message = TestMessage.random(); - remoteServices.forEach(remoteService -> remoteService.expectMessages(message.body())); - - // when - publisher.publish(topic.getQualifiedName(), message.body()); - - // then - remoteServices.forEach(RemoteServiceEndpoint::waitUntilReceived); - } - - @Test - public void shouldPublishAndRetryOnlyForUndeliveredConsumers() { - // given - String endpointUrl = setUpServicesAndGetEndpoint(); - Topic topic = operations.buildTopic(randomTopic("publishAndConsumeGroup", "broadcastTopic2").build()); - operations.createBroadcastSubscription(topic, "broadcastSubscription2", endpointUrl); - - TestMessage message = TestMessage.random(); - firstRemoteService.retryMessage(message.body(), 1); - remoteServices.stream().skip(1).forEach(remoteService -> remoteService.expectMessages(message.body())); - - // when - publisher.publish(topic.getQualifiedName(), message.body()); - - // then - remoteServices.stream().skip(1).forEach(RemoteServiceEndpoint::waitUntilReceived); - firstRemoteService.waitUntilReceived(60, 2); - } - - @Test - public void shouldNotRetryForBadRequestsFromConsumers() { - // given - String endpointUrl = setUpServicesAndGetEndpoint(); - Topic topic = operations.buildTopic(randomTopic("publishAndConsumeGroup", "broadcastTopic3").build()); - operations.createBroadcastSubscription(topic, "broadcastSubscription3", endpointUrl); - - TestMessage message = TestMessage.random(); - firstRemoteService.setReturnedStatusCode(400); - remoteServices.forEach(remoteService -> remoteService.expectMessages(message.body())); - - // when - publisher.publish(topic.getQualifiedName(), message.body()); - - // then - remoteServices.forEach(service -> service.waitUntilReceived(5)); - } - - private RemoteServiceEndpoint createRemoteServiceEndpoint() { - WireMockServer service = new WireMockServer(Ports.nextAvailable()); - service.start(); - return new RemoteServiceEndpoint(service); - } - - - private String setUpServicesAndGetEndpoint() { - remoteServices = Stream.generate(this::createRemoteServiceEndpoint).limit(4).collect(toList()); - firstRemoteService = remoteServices.get(0); - - return this.remoteServices.stream().map(RemoteServiceEndpoint::getUrl).map(Object::toString).collect(joining(";")); - } - -}