Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
igorbernstein2 committed Aug 29, 2024
1 parent 323e5a1 commit 17f4890
Showing 1 changed file with 127 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.AbstractApiFuture;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
Expand All @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -82,6 +84,7 @@
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

class BatcherImplTest {

Expand Down Expand Up @@ -249,6 +252,107 @@ public ApiFuture<List<Integer>> futureCall(
closeFuture.get();
}

@Test
void testCloseTimeout() throws ExecutionException, InterruptedException {
final String futureToStringMsg = "some descriptive message about this future";
MySettableApiFuture<List<Integer>> innerFuture = new MySettableApiFuture<>(futureToStringMsg);

UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
new UnaryCallable<LabeledIntList, List<Integer>>() {
@Override
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
return innerFuture;
}
};
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);

underTest.add(1);

Stopwatch stopwatch = Stopwatch.createStarted();

BatchingException closeException =
assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10)));

// resolve the future to allow batcher to close
innerFuture.set(ImmutableList.of(1));

assertThat(stopwatch.elapsed()).isAtMost(java.time.Duration.ofSeconds(1));
System.out.println();
assertThat(closeException)
.hasMessageThat()
.matches(".*Outstanding batches.*" + futureToStringMsg + ".*elements=1.*");
}

@Test
void testCloseTimeoutPreventsAdd() throws ExecutionException, InterruptedException {
final String futureToStringMsg = "some descriptive message about this future";
MySettableApiFuture<List<Integer>> innerFuture = new MySettableApiFuture<>(futureToStringMsg);

UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
new UnaryCallable<LabeledIntList, List<Integer>>() {
@Override
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
return innerFuture;
}
};
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);

underTest.add(1);

try {
underTest.close(Duration.ofMillis(10));
} catch (BatchingException ignored) {
// ignored
}

// Even though the close operation timed out, the batcher should be in a closed state
// and reject new additions
assertThrows(IllegalStateException.class, () -> underTest.add(2));

// resolve the future to allow batcher to close
innerFuture.set(ImmutableList.of(1));
}

@Test
void testCancelOutstanding() throws ExecutionException, InterruptedException {
SettableApiFuture<List<Integer>> innerFuture = SettableApiFuture.create();

UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
new UnaryCallable<LabeledIntList, List<Integer>>() {
@Override
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
return innerFuture;
}
};
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);

ApiFuture<Integer> elementF = underTest.add(1);

// Initial close will timeout
BatchingException firstCloseException =
assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10)));
assertThat(firstCloseException).hasMessageThat().contains("Timed out");

underTest.cancelOutstanding();

BatchingException finalCloseException =
assertThrows(BatchingException.class, () -> underTest.close(Duration.ofSeconds(1)));
assertThat(finalCloseException).hasMessageThat().contains("Batching finished");

// element future should resolve to a cancelled future
ExecutionException elementException = assertThrows(ExecutionException.class, elementF::get);
assertThat(elementException).hasCauseThat().isInstanceOf(CancellationException.class);
}

/** Verifies exception occurred at RPC is propagated to element results */
@Test
void testResultFailureAfterRPCFailure() throws Exception {
Expand Down Expand Up @@ -1102,4 +1206,27 @@ private BatcherImpl<Integer, Integer, LabeledIntList, List<Integer>> createDefau
EXECUTOR,
flowController);
}

private static class MySettableApiFuture<T> extends AbstractApiFuture<T> {
private final String desc;

MySettableApiFuture(String desc) {
this.desc = desc;
}

@Override
public boolean set(T value) {
return super.set(value);
}

@Override
public boolean setException(Throwable throwable) {
return super.setException(throwable);
}

@Override
public String toString() {
return desc;
}
}
}

0 comments on commit 17f4890

Please sign in to comment.