Skip to content

Commit

Permalink
Add better asynchronous impl for TableHandleFuture (#4802)
Browse files Browse the repository at this point in the history
This is in support of #4798. This adds some additional logic to BatchHandler to improve logging in exceptional cases. This also fixes some exceptional cases in TableServiceAsyncImpl. I _expected_ this fixes the intermittent test failures.
  • Loading branch information
devinrsmith authored Nov 9, 2023
1 parent 475b383 commit c2dd5f3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,14 @@ public void onCompleted() {
private static final class BatchHandler
implements StreamObserver<ExportedTableCreationResponse> {

private static final Logger log = LoggerFactory.getLogger(BatchHandler.class);

private final Map<Integer, State> newStates;
private final Set<State> handled;

private BatchHandler(Map<Integer, State> newStates) {
this.newStates = Objects.requireNonNull(newStates);
this.handled = new HashSet<>(newStates.size());
}

@Override
Expand All @@ -398,24 +402,41 @@ public void onNext(ExportedTableCreationResponse value) {
"Not expecting export creation responses for empty tickets");
}
final int exportId = ExportTicketHelper.ticketToExportId(value.getResultId().getTicket(), "export");
final State state = newStates.remove(exportId);
final State state = newStates.get(exportId);
if (state == null) {
throw new IllegalStateException("Unable to find state for creation response");
}
state.onCreationResponse(value);
if (!handled.add(state)) {
throw new IllegalStateException(
String.format("Server misbehaving, already received response for export id %d", exportId));
}
try {
state.onCreationResponse(value);
} catch (RuntimeException e) {
log.error("state.onCreationResponse had unexpected exception", e);
state.onCreationError(e);
}
}

@Override
public void onError(Throwable t) {
for (State state : newStates.values()) {
state.onCreationError(t);
try {
state.onCreationError(t);
} catch (RuntimeException e) {
log.error("state.onCreationError had unexpected exception, ignoring", e);
}
}
}

@Override
public void onCompleted() {
for (State state : newStates.values()) {
state.onCreationCompleted();
try {
state.onCreationCompleted();
} catch (RuntimeException e) {
log.error("state.onCreationCompleted had unexpected exception, ignoring", e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class TableServiceAsyncImpl {

Expand Down Expand Up @@ -59,72 +62,80 @@ static List<? extends TableHandleFuture> executeAsync(ExportService exportServic

private static class TableHandleAsyncImpl implements TableHandleFuture, Listener {
private final TableSpec tableSpec;
private final CompletableFuture<Export> exportFuture;
private final CompletableFuture<ExportedTableCreationResponse> etcrFuture;
private final CompletableFuture<TableHandle> future;
private TableHandle handle;
private Export export;

TableHandleAsyncImpl(TableSpec tableSpec) {
this.tableSpec = Objects.requireNonNull(tableSpec);
this.future = new CompletableFuture<>();
}

synchronized void init(Export export) {
this.export = Objects.requireNonNull(export);
// TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture
// this.future.whenComplete((tableHandle, throwable) -> {
// if (isCancelled()) {
// export.release();
// }
// });
maybeComplete();
exportFuture = new CompletableFuture<>();
etcrFuture = new CompletableFuture<>();
final CompletableFuture<TableHandle> internalFuture = CompletableFuture
.allOf(exportFuture, etcrFuture)
.thenCompose(this::complete);
// thenApply(Function.identity()) _may_ seem extraneous, but we need to ensure separation between the user's
// future and our internal state
future = internalFuture.thenApply(Function.identity());
future.whenComplete((tableHandle, throwable) -> {
// TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture
if (throwable instanceof CancellationException) {
// Would be better if we could immediately tell server of release, but currently we need to wait for
// etcr/export object.
internalFuture.thenAccept(TableHandle::close);
}
});
}

private void maybeComplete() {
if (handle == null || export == null) {
return;
}
handle.init(export);
if (!future.complete(handle)) {
// If we are unable to complete the future, it means the user cancelled it. It's only at this point in
// time we are able to let the server know that we don't need it anymore.
// TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture
handle.close();
}
handle = null;
export = null;
void init(Export export) {
// Note: we aren't expecting exceptional completions of exportFuture; we're using a future to make it easy
// to compose with our etcrFuture (which may or may not be completed before exportFuture).
// In exceptional cases where we _don't_ complete exportFuture (for example, the calling code has a runtime
// exception), we know we _haven't_ called io.deephaven.client.impl.ExportServiceRequest#send, so there
// isn't any possibility that we have left open a server-side export. And in those cases, this object isn't
// returned to the user and becomes garbage. The client-side cleanup will be handled in
// io.deephaven.client.impl.ExportServiceRequest#cleanupUnsent.
exportFuture.complete(Objects.requireNonNull(export));
}

// --------------------------

@Override
public void onNext(ExportedTableCreationResponse etcr) {
private CompletionStage<TableHandle> complete(Void ignore) {
final Export export = Objects.requireNonNull(exportFuture.getNow(null));
final ExportedTableCreationResponse etcr = Objects.requireNonNull(etcrFuture.getNow(null));
final TableHandle tableHandle = new TableHandle(tableSpec, null);
tableHandle.init(export);
final ResponseAdapter responseAdapter = tableHandle.responseAdapter();
responseAdapter.onNext(etcr);
responseAdapter.onCompleted();
final TableHandleException error = tableHandle.error().orElse(null);
if (error != null) {
future.completeExceptionally(error);
} else {
// It's possible that onNext comes before #init; either in the case where it was already cached from
// io.deephaven.client.impl.ExportService.export, or where the RPC comes in asynchronously. In either
// case, we need to store handle so it can potentially be completed here, or in init.
synchronized (this) {
handle = tableHandle;
maybeComplete();
}
// Only available in Java 9+
// return CompletableFuture.failedStage(error);
final CompletableFuture<TableHandle> f = new CompletableFuture<>();
f.completeExceptionally(error);
return f;
}
// Only available in Java 9+
// return CompletableFuture.completedStage(tableHandle);
return CompletableFuture.completedFuture(tableHandle);
}

// --------------------------

@Override
public void onNext(ExportedTableCreationResponse etcr) {
etcrFuture.complete(etcr);
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
etcrFuture.completeExceptionally(t);
}

@Override
public void onCompleted() {
if (!future.isDone()) {
future.completeExceptionally(new IllegalStateException("onCompleted without future.isDone()"));
if (!etcrFuture.isDone()) {
etcrFuture.completeExceptionally(new IllegalStateException("onCompleted without etcrFuture.isDone()"));
}
}

Expand Down

0 comments on commit c2dd5f3

Please sign in to comment.