diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 3ebcc2c5d0..e12d09dcef 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -40,6 +40,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.core.AbstractApiFuture; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -82,6 +84,7 @@ import org.junit.jupiter.api.Timeout; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.threeten.bp.Duration; class BatcherImplTest { @@ -249,6 +252,107 @@ public ApiFuture> futureCall( closeFuture.get(); } + @Test + void testCloseTimeout() throws ExecutionException, InterruptedException { + final String futureToStringMsg = "some descriptive message about this future"; + MySettableApiFuture> innerFuture = new MySettableApiFuture<>(futureToStringMsg); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + underTest.add(1); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + BatchingException closeException = + assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10))); + + // resolve the future to allow batcher to close + innerFuture.set(ImmutableList.of(1)); + + assertThat(stopwatch.elapsed()).isAtMost(java.time.Duration.ofSeconds(1)); + System.out.println(); + assertThat(closeException) + .hasMessageThat() + .matches(".*Outstanding batches.*" + futureToStringMsg + ".*elements=1.*"); + } + + @Test + void testCloseTimeoutPreventsAdd() throws ExecutionException, InterruptedException { + final String futureToStringMsg = "some descriptive message about this future"; + MySettableApiFuture> innerFuture = new MySettableApiFuture<>(futureToStringMsg); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + underTest.add(1); + + try { + underTest.close(Duration.ofMillis(10)); + } catch (BatchingException ignored) { + // ignored + } + + // Even though the close operation timed out, the batcher should be in a closed state + // and reject new additions + assertThrows(IllegalStateException.class, () -> underTest.add(2)); + + // resolve the future to allow batcher to close + innerFuture.set(ImmutableList.of(1)); + } + + @Test + void testCancelOutstanding() throws ExecutionException, InterruptedException { + SettableApiFuture> innerFuture = SettableApiFuture.create(); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + ApiFuture elementF = underTest.add(1); + + // Initial close will timeout + BatchingException firstCloseException = + assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10))); + assertThat(firstCloseException).hasMessageThat().contains("Timed out"); + + underTest.cancelOutstanding(); + + BatchingException finalCloseException = + assertThrows(BatchingException.class, () -> underTest.close(Duration.ofSeconds(1))); + assertThat(finalCloseException).hasMessageThat().contains("Batching finished"); + + // element future should resolve to a cancelled future + ExecutionException elementException = assertThrows(ExecutionException.class, elementF::get); + assertThat(elementException).hasCauseThat().isInstanceOf(CancellationException.class); + } + /** Verifies exception occurred at RPC is propagated to element results */ @Test void testResultFailureAfterRPCFailure() throws Exception { @@ -1102,4 +1206,27 @@ private BatcherImpl> createDefau EXECUTOR, flowController); } + + private static class MySettableApiFuture extends AbstractApiFuture { + private final String desc; + + MySettableApiFuture(String desc) { + this.desc = desc; + } + + @Override + public boolean set(T value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + public String toString() { + return desc; + } + } }