Skip to content

Commit

Permalink
test: fix flakes in HttpJsonDirectServerStreamingCallableTest (#2432)
Browse files Browse the repository at this point in the history
The flakes seem to stem from parallel execution and the resulting race conditions around static member variables, particularly the `mockService`. Attempting to fix this by using a separate `mockService` for each test.

Fixes: #1905.
Fixes: #2107.
Fixes: #1876.
Fixes: #2083.
Fixes: #1587.
Fixes: #1684.
  • Loading branch information
meltsufin authored Feb 6, 2024
1 parent dc2b9ff commit 523d6b6
Showing 1 changed file with 27 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -107,9 +106,7 @@ public class HttpJsonDirectServerStreamingCallableTest {
.setType(MethodType.SERVER_STREAMING)
.build();

private static final MockHttpService MOCK_SERVICE =
new MockHttpService(
Collections.singletonList(METHOD_SERVER_STREAMING_RECOGNIZE), "google.com:443");
private MockHttpService mockService;

private static final Color DEFAULT_REQUEST = Color.newBuilder().setRed(0.5f).build();
private static final Color ASYNC_REQUEST = DEFAULT_REQUEST.toBuilder().setGreen(1000).build();
Expand All @@ -120,22 +117,25 @@ public class HttpJsonDirectServerStreamingCallableTest {
Money.newBuilder().setCurrencyCode("UAH").setUnits(255).build();
private static final int AWAIT_TERMINATION_SECONDS = 10;

private static ServerStreamingCallSettings<Color, Money> streamingCallSettings;
private static ServerStreamingCallable<Color, Money> streamingCallable;
private ServerStreamingCallSettings<Color, Money> streamingCallSettings;
private ServerStreamingCallable<Color, Money> streamingCallable;

private static ManagedHttpJsonChannel channel;
private static ClientContext clientContext;
private static ExecutorService executorService;
private ManagedHttpJsonChannel channel;
private ClientContext clientContext;
private ExecutorService executorService;

@BeforeClass
public static void initialize() throws IOException {
@Before
public void initialize() throws IOException {
mockService =
new MockHttpService(
Collections.singletonList(METHOD_SERVER_STREAMING_RECOGNIZE), "google.com:443");
executorService = Executors.newFixedThreadPool(2);
channel =
new ManagedHttpJsonInterceptorChannel(
ManagedHttpJsonChannel.newBuilder()
.setEndpoint("google.com:443")
.setExecutor(executorService)
.setHttpTransport(MOCK_SERVICE)
.setHttpTransport(mockService)
.build(),
new HttpJsonHeaderInterceptor(Collections.singletonMap("header-key", "headerValue")));
EndpointContext endpointContext = Mockito.mock(EndpointContext.class);
Expand All @@ -158,25 +158,23 @@ public static void initialize() throws IOException {
HttpJsonCallSettings.create(METHOD_SERVER_STREAMING_RECOGNIZE),
streamingCallSettings,
clientContext);

mockService.reset();
}

@AfterClass
public static void destroy() throws InterruptedException {
@After
public void destroy() throws InterruptedException {
executorService.shutdown();
channel.shutdown();

executorService.awaitTermination(AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
channel.awaitTermination(AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
}

@After
public void tearDown() throws InterruptedException {
MOCK_SERVICE.reset();
mockService.reset();
}

@Test
public void testBadContext() {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
// Create a local callable with a bad context
ServerStreamingCallable<Color, Money> streamingCallable =
HttpJsonCallableFactory.createServerStreamingCallable(
Expand All @@ -202,22 +200,18 @@ public void testBadContext() {

@Test
public void testServerStreamingStart() throws InterruptedException {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(true, latch);

streamingCallable.call(DEFAULT_REQUEST, moneyObserver);

Truth.assertThat(moneyObserver.controller).isNotNull();
// wait for the task to complete, otherwise it may interfere with other tests, since they share
// the same MockService and unfinished request in this test may start reading messages
// designated for other tests.
Truth.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
}

@Test
public void testServerStreaming() throws InterruptedException {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE});
CountDownLatch latch = new CountDownLatch(3);
MoneyObserver moneyObserver = new MoneyObserver(true, latch);

Expand All @@ -231,7 +225,7 @@ public void testServerStreaming() throws InterruptedException {

@Test
public void testManualFlowControl() throws Exception {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
CountDownLatch latch = new CountDownLatch(2);
MoneyObserver moneyObserver = new MoneyObserver(false, latch);

Expand All @@ -251,7 +245,7 @@ public void testManualFlowControl() throws Exception {

@Test
public void testCancelClientCall() throws Exception {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(false, latch);

Expand All @@ -267,7 +261,7 @@ public void testCancelClientCall() throws Exception {

@Test
public void testOnResponseError() throws Throwable {
MOCK_SERVICE.addException(404, new RuntimeException("some error"));
mockService.addException(404, new RuntimeException("some error"));

CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(true, latch);
Expand All @@ -292,7 +286,7 @@ public void testOnResponseError() throws Throwable {

@Test
public void testObserverErrorCancelsCall() throws Throwable {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
final RuntimeException expectedCause = new RuntimeException("some error");
final SettableApiFuture<Throwable> actualErrorF = SettableApiFuture.create();

Expand Down Expand Up @@ -332,7 +326,7 @@ protected void onCompleteImpl() {

@Test
public void testBlockingServerStreaming() {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
Color request = Color.newBuilder().setRed(0.5f).build();
ServerStream<Money> response = streamingCallable.call(request);
List<Money> responseData = Lists.newArrayList(response);
Expand All @@ -344,7 +338,7 @@ public void testBlockingServerStreaming() {
// This test ensures that the server-side streaming does not exceed the timeout value
@Test
public void testDeadlineExceededServerStreaming() throws InterruptedException {
MOCK_SERVICE.addResponse(
mockService.addResponse(
new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE}, java.time.Duration.ofSeconds(5));
Color request = Color.newBuilder().setRed(0.5f).build();
CountDownLatch latch = new CountDownLatch(1);
Expand Down

0 comments on commit 523d6b6

Please sign in to comment.