From c2dd5f37b6d55dd1d7ca7014b9dac98e9059274d Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Thu, 9 Nov 2023 14:43:16 -0800 Subject: [PATCH] Add better asynchronous impl for TableHandleFuture (#4802) This is in support of #4798. This adds some additional logic to BatchHandler to improve logging in exceptional cases. This also fixes some exceptional cases in TableServiceAsyncImpl. I _expected_ this fixes the intermittent test failures. --- .../deephaven/client/impl/ExportStates.java | 29 +++++- .../client/impl/TableServiceAsyncImpl.java | 93 +++++++++++-------- 2 files changed, 77 insertions(+), 45 deletions(-) diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java b/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java index d9d11f1fc4e..18456c55424 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java @@ -378,10 +378,14 @@ public void onCompleted() { private static final class BatchHandler implements StreamObserver { + private static final Logger log = LoggerFactory.getLogger(BatchHandler.class); + private final Map newStates; + private final Set handled; private BatchHandler(Map newStates) { this.newStates = Objects.requireNonNull(newStates); + this.handled = new HashSet<>(newStates.size()); } @Override @@ -398,24 +402,41 @@ public void onNext(ExportedTableCreationResponse value) { "Not expecting export creation responses for empty tickets"); } final int exportId = ExportTicketHelper.ticketToExportId(value.getResultId().getTicket(), "export"); - final State state = newStates.remove(exportId); + final State state = newStates.get(exportId); if (state == null) { throw new IllegalStateException("Unable to find state for creation response"); } - state.onCreationResponse(value); + if (!handled.add(state)) { + throw new IllegalStateException( + String.format("Server misbehaving, already received response for export id %d", exportId)); + } + try { + state.onCreationResponse(value); + } catch (RuntimeException e) { + log.error("state.onCreationResponse had unexpected exception", e); + state.onCreationError(e); + } } @Override public void onError(Throwable t) { for (State state : newStates.values()) { - state.onCreationError(t); + try { + state.onCreationError(t); + } catch (RuntimeException e) { + log.error("state.onCreationError had unexpected exception, ignoring", e); + } } } @Override public void onCompleted() { for (State state : newStates.values()) { - state.onCreationCompleted(); + try { + state.onCreationCompleted(); + } catch (RuntimeException e) { + log.error("state.onCreationCompleted had unexpected exception, ignoring", e); + } } } } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java index 230f76b9068..524750d3b26 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java @@ -13,10 +13,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; final class TableServiceAsyncImpl { @@ -59,72 +62,80 @@ static List executeAsync(ExportService exportServic private static class TableHandleAsyncImpl implements TableHandleFuture, Listener { private final TableSpec tableSpec; + private final CompletableFuture exportFuture; + private final CompletableFuture etcrFuture; private final CompletableFuture future; - private TableHandle handle; - private Export export; TableHandleAsyncImpl(TableSpec tableSpec) { this.tableSpec = Objects.requireNonNull(tableSpec); - this.future = new CompletableFuture<>(); - } - - synchronized void init(Export export) { - this.export = Objects.requireNonNull(export); - // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture - // this.future.whenComplete((tableHandle, throwable) -> { - // if (isCancelled()) { - // export.release(); - // } - // }); - maybeComplete(); + exportFuture = new CompletableFuture<>(); + etcrFuture = new CompletableFuture<>(); + final CompletableFuture internalFuture = CompletableFuture + .allOf(exportFuture, etcrFuture) + .thenCompose(this::complete); + // thenApply(Function.identity()) _may_ seem extraneous, but we need to ensure separation between the user's + // future and our internal state + future = internalFuture.thenApply(Function.identity()); + future.whenComplete((tableHandle, throwable) -> { + // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture + if (throwable instanceof CancellationException) { + // Would be better if we could immediately tell server of release, but currently we need to wait for + // etcr/export object. + internalFuture.thenAccept(TableHandle::close); + } + }); } - private void maybeComplete() { - if (handle == null || export == null) { - return; - } - handle.init(export); - if (!future.complete(handle)) { - // If we are unable to complete the future, it means the user cancelled it. It's only at this point in - // time we are able to let the server know that we don't need it anymore. - // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture - handle.close(); - } - handle = null; - export = null; + void init(Export export) { + // Note: we aren't expecting exceptional completions of exportFuture; we're using a future to make it easy + // to compose with our etcrFuture (which may or may not be completed before exportFuture). + // In exceptional cases where we _don't_ complete exportFuture (for example, the calling code has a runtime + // exception), we know we _haven't_ called io.deephaven.client.impl.ExportServiceRequest#send, so there + // isn't any possibility that we have left open a server-side export. And in those cases, this object isn't + // returned to the user and becomes garbage. The client-side cleanup will be handled in + // io.deephaven.client.impl.ExportServiceRequest#cleanupUnsent. + exportFuture.complete(Objects.requireNonNull(export)); } // -------------------------- - @Override - public void onNext(ExportedTableCreationResponse etcr) { + private CompletionStage complete(Void ignore) { + final Export export = Objects.requireNonNull(exportFuture.getNow(null)); + final ExportedTableCreationResponse etcr = Objects.requireNonNull(etcrFuture.getNow(null)); final TableHandle tableHandle = new TableHandle(tableSpec, null); + tableHandle.init(export); final ResponseAdapter responseAdapter = tableHandle.responseAdapter(); responseAdapter.onNext(etcr); responseAdapter.onCompleted(); final TableHandleException error = tableHandle.error().orElse(null); if (error != null) { - future.completeExceptionally(error); - } else { - // It's possible that onNext comes before #init; either in the case where it was already cached from - // io.deephaven.client.impl.ExportService.export, or where the RPC comes in asynchronously. In either - // case, we need to store handle so it can potentially be completed here, or in init. - synchronized (this) { - handle = tableHandle; - maybeComplete(); - } + // Only available in Java 9+ + // return CompletableFuture.failedStage(error); + final CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(error); + return f; } + // Only available in Java 9+ + // return CompletableFuture.completedStage(tableHandle); + return CompletableFuture.completedFuture(tableHandle); + } + + // -------------------------- + + @Override + public void onNext(ExportedTableCreationResponse etcr) { + etcrFuture.complete(etcr); } @Override public void onError(Throwable t) { - future.completeExceptionally(t); + etcrFuture.completeExceptionally(t); } @Override public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally(new IllegalStateException("onCompleted without future.isDone()")); + if (!etcrFuture.isDone()) { + etcrFuture.completeExceptionally(new IllegalStateException("onCompleted without etcrFuture.isDone()")); } }