Skip to content

Commit

Permalink
RPC: add runtime support for request timeouts in metadata headers (gr…
Browse files Browse the repository at this point in the history
…pc-stub, futures, virtual threads) (#16)

* rpc grpc-stub: request timeouts support

* rpc futures: request timeouts support
  • Loading branch information
mostroverkhov authored Mar 11, 2024
1 parent 4507ab2 commit 6da56d0
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;

Expand All @@ -36,25 +39,42 @@ interface Factory<T extends MessageStreams> {
T withLifecycle(Closeable requester);
}

final class ResponseListener<T> implements BiConsumer<T, Throwable> {
private final Future<?> cancelHandle;
private final BiConsumer<? super T, ? super Throwable> listener;
class ResponseListener<T> implements BiConsumer<T, Throwable> {
final Future<?> cancelHandle;
final BiConsumer<? super T, ? super Throwable> listener;

private ResponseListener(
ResponseListener(
CompletionStage<?> response, @Nullable BiConsumer<? super T, ? super Throwable> listener) {
this.cancelHandle = response.toCompletableFuture();
this.listener = listener;
}

public static <T> ResponseListener<T> create(
CompletionStage<?> response, BiConsumer<? super T, ? super Throwable> listener) {
return new ResponseListener<T>(response, listener);
return new ResponseListener<>(response, listener);
}

public static <T> ResponseListener<T> create(
CompletionStage<?> response,
BiConsumer<? super T, ? super Throwable> listener,
@Nullable ScheduledExecutorService timeoutScheduler,
long timeoutMillis) {
return new ResponseTimeoutListener<T>(response, listener)
.scheduleTimeout(timeoutScheduler, timeoutMillis);
}

public static <T> ResponseListener<T> create(CompletionStage<?> response) {
return new ResponseListener<>(response, null);
}

public static <T> ResponseListener<T> create(
CompletionStage<?> response,
@Nullable ScheduledExecutorService timeoutScheduler,
long timeoutMillis) {
return new ResponseTimeoutListener<T>(response, null)
.scheduleTimeout(timeoutScheduler, timeoutMillis);
}

@Override
public void accept(T t, Throwable throwable) {
if (throwable instanceof CancellationException) {
Expand All @@ -67,6 +87,46 @@ public void accept(T t, Throwable throwable) {
}
}

final class ResponseTimeoutListener<T> extends ResponseListener<T> {
volatile ScheduledFuture<?> timeoutHandle;

ResponseTimeoutListener(
CompletionStage<?> response, @Nullable BiConsumer<? super T, ? super Throwable> listener) {
super(response, listener);
}

ResponseListener<T> scheduleTimeout(
@Nullable ScheduledExecutorService timeoutScheduler, long timeoutMillis) {
if (timeoutMillis > 0 && timeoutScheduler != null) {
timeoutHandle =
timeoutScheduler.schedule(
() -> {
cancelHandle.cancel(true);
},
timeoutMillis,
TimeUnit.MILLISECONDS);
}
return this;
}

@Override
public void accept(T t, Throwable throwable) {
ScheduledFuture<?> h = timeoutHandle;
if (h != null) {
h.cancel(true);
}
super.accept(t, throwable);
}
}

static ScheduledExecutorService timeoutScheduler(
MessageStreams messageStreams, long timeoutMillis) {
if (timeoutMillis <= 0) {
return null;
}
return messageStreams.scheduler().orElse(null);
}

@SuppressWarnings("all")
abstract class ServerFactory<T extends MessageStreams> implements RpcService.Factory<T> {
private final Object service;
Expand Down
Loading

0 comments on commit 6da56d0

Please sign in to comment.