Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add java client TableServiceAsync #4756

Merged
merged 20 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public interface ExceptionHandler {
* The reference queue from the most recent initialization.
*/
private volatile ReferenceQueue<?> referenceQueue;
private Thread cleanerThread;

/**
* The cleaner thread from the most recent initialization, guarded by the lock on {@code this}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandle.TableHandleException;
import io.deephaven.client.impl.TableHandleManager;
import io.deephaven.client.impl.TableServiceAsync;
import io.deephaven.client.impl.TableServiceAsync.TableHandleFuture;
import io.deephaven.client.impl.TableServices;
import io.deephaven.client.impl.TableService;
import io.deephaven.client.impl.TableService.TableHandleFuture;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.TableSpec;
import org.junit.Ignore;
import org.junit.Test;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -43,13 +41,13 @@ public void distinctBatchManagerIsNotStateful() throws TableHandleException, Int

@Test
public void distinctTableServicesIsNotStateful() throws TableHandleException, InterruptedException {
checkState(session.tableServices(), session.tableServices(), NOT_STATEFUL);
checkState(session.newStatefulTableService(), session.newStatefulTableService(), NOT_STATEFUL);
}

@Test
public void distinctTableServiceAsyncsIsNotStateful()
throws InterruptedException, ExecutionException, TimeoutException {
checkAsyncState(session.tableServices(), session.tableServices(), NOT_STATEFUL);
checkAsyncState(session.newStatefulTableService(), session.newStatefulTableService(), NOT_STATEFUL);
}

// this is currently broken; serial clients *can't* reliably execute the same non-trivial TableSpec DAG
Expand All @@ -61,21 +59,36 @@ public void singleSerialManagerIsStateful() throws TableHandleException, Interru
}

@Test
public void singleBatchManagerIsStateful() throws TableHandleException, InterruptedException {
public void singleBatchManagerNotIsStateful() throws TableHandleException, InterruptedException {
final TableHandleManager manager = session.batch();
checkState(manager, manager, STATEFUL);
checkState(manager, manager, NOT_STATEFUL);
}

@Test
public void singleTableServiceIsStateful() throws TableHandleException, InterruptedException {
final TableServices tableServices = session.tableServices();
checkState(tableServices, tableServices, STATEFUL);
public void newStatefulTableServiceIsStateful() throws TableHandleException, InterruptedException {
final TableService ts = session.newStatefulTableService();
checkState(ts, ts, STATEFUL);
}

@Test
public void singleTableServiceAsyncIsStateful() throws InterruptedException, ExecutionException, TimeoutException {
final TableServices tableServices = session.tableServices();
checkAsyncState(tableServices, tableServices, STATEFUL);
public void newStatefulTableServiceAsyncIsStateful()
throws InterruptedException, ExecutionException, TimeoutException {
final TableService ts = session.newStatefulTableService();
checkAsyncState(ts, ts, STATEFUL);
}

@Test
public void newStatefulTableServiceBatchIsStateful() throws TableHandleException, InterruptedException {
final TableService ts = session.newStatefulTableService();
checkState(ts.batch(), ts.batch(), STATEFUL);
}

// this is currently broken; serial clients *can't* reliably execute the same non-trivial TableSpec DAG
@Ignore
@Test
public void newStatefulTableServiceSerialIsStateful() throws TableHandleException, InterruptedException {
final TableService ts = session.newStatefulTableService();
checkState(ts.serial(), ts.serial(), STATEFUL);
}

static void checkState(TableHandleManager m1, TableHandleManager m2, boolean expectEquals)
Expand All @@ -84,7 +97,7 @@ static void checkState(TableHandleManager m1, TableHandleManager m2, boolean exp
checkExecuteState(m1, m2, expectEquals);
}

static void checkAsyncState(TableServiceAsync a1, TableServiceAsync a2, boolean expectEquals)
static void checkAsyncState(TableService a1, TableService a2, boolean expectEquals)
throws InterruptedException, ExecutionException, TimeoutException {
checkExecuteAsyncState(a1, a2, expectEquals);
}
Expand Down Expand Up @@ -117,12 +130,35 @@ private static void checkExecuteState(TableHandleManager m1, TableHandleManager
}
}

private static void checkExecuteAsyncState(TableServiceAsync a1, TableServiceAsync a2, boolean expectEquals)
private static void checkExecuteAsyncState(TableService a1, TableService a2, boolean expectEquals)
throws InterruptedException, ExecutionException, TimeoutException {
checkExecuteAsyncStateOneAtTime(a1, a2, expectEquals);
checkExecuteAsyncStateAtSameTime(a1, a2, expectEquals);
}

private static void checkExecuteAsyncStateOneAtTime(TableService a1, TableService a2, boolean expectEquals)
throws InterruptedException, ExecutionException, TimeoutException {
final TableSpec q = tableSpec();
try (
final TableHandle h1 = a1.executeAsync(q).getOrCancel(Duration.ofSeconds(5));
final TableHandle h2 = a2.executeAsync(q).getOrCancel(Duration.ofSeconds(5))) {
assertThat(h1.exportId().toString().equals(h2.exportId().toString())).isEqualTo(expectEquals);
try (
final TableHandle h3 = h1.updateView("K=ii");
final TableHandle h4 = h2.updateView("K=ii")) {
assertThat(h3.exportId().toString().equals(h4.exportId().toString())).isEqualTo(expectEquals);
}
}
}

private static void checkExecuteAsyncStateAtSameTime(TableService a1, TableService a2, boolean expectEquals)
throws InterruptedException, ExecutionException, TimeoutException {
final TableSpec q = tableSpec();
final TableHandleFuture f1 = a1.executeAsync(q);
final TableHandleFuture f2 = a2.executeAsync(q);
try (
final TableHandle h1 = TableHandleFuture.get(a1.executeAsync(q), Duration.ofSeconds(5));
final TableHandle h2 = TableHandleFuture.get(a2.executeAsync(q), Duration.ofSeconds(5))) {
final TableHandle h1 = f1.getOrCancel(Duration.ofSeconds(5));
final TableHandle h2 = f2.getOrCancel(Duration.ofSeconds(5))) {
assertThat(h1.exportId().toString().equals(h2.exportId().toString())).isEqualTo(expectEquals);
try (
final TableHandle h3 = h1.updateView("K=ii");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandle.TableHandleException;
import io.deephaven.client.impl.TableServiceAsync.TableHandleFuture;
import io.deephaven.client.impl.TableService;
import io.deephaven.qst.table.TableSpec;
import org.junit.Test;

Expand Down Expand Up @@ -44,7 +44,7 @@ public void serial() throws TableHandleException, InterruptedException {

@Test(timeout = 10000)
public void async() throws ExecutionException, InterruptedException {
try (final TableHandle handle = TableHandleFuture.get(session.executeAsync(table))) {
try (final TableHandle handle = session.executeAsync(table).getOrCancel()) {
assertThat(handle.isSuccessful()).isTrue();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.deephaven.client.impl;

import io.deephaven.client.DeephavenSessionTestBase;
import io.deephaven.client.impl.TableServiceAsync.TableHandleFuture;
import io.deephaven.client.impl.TableService.TableHandleFuture;
import io.deephaven.qst.table.TableSpec;
import org.junit.Test;

Expand All @@ -16,7 +16,7 @@

import static org.assertj.core.api.Assertions.assertThat;

public class TableServicesAsyncTest extends DeephavenSessionTestBase {
public class TableServiceAsyncTest extends DeephavenSessionTestBase {

private static final Duration GETTIME = Duration.ofSeconds(15);
private static final int CHAIN_OPS = 50;
Expand All @@ -26,23 +26,23 @@ public class TableServicesAsyncTest extends DeephavenSessionTestBase {
public void longChainAsyncExportOnlyLast() throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final TableSpec longChainLast = longChain.get(longChain.size() - 1);
try (final TableHandle handle = get(session.tableServices().executeAsync(longChainLast))) {
try (final TableHandle handle = get(session.newStatefulTableService().executeAsync(longChainLast))) {
checkSucceeded(handle);
}
}

@Test(timeout = 20000)
public void longChainAsyncExportAll() throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final List<? extends TableHandleFuture> futures = session.tableServices().executeAsync(longChain);
final List<? extends TableHandleFuture> futures = session.newStatefulTableService().executeAsync(longChain);
try {
for (final TableHandleFuture future : futures) {
try (final TableHandle handle = get(future)) {
checkSucceeded(handle);
}
}
} catch (final Throwable t) {
TableHandleFuture.cancelOrClose(futures, true);
TableService.TableHandleFuture.cancelOrClose(futures, true);
throw t;
}
}
Expand All @@ -51,9 +51,9 @@ public void longChainAsyncExportAll() throws ExecutionException, InterruptedExce
public void longChainAsyncExportAllCancelAllButLast()
throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final List<? extends TableHandleFuture> futures = session.tableServices().executeAsync(longChain);
final List<? extends TableHandleFuture> futures = session.newStatefulTableService().executeAsync(longChain);
// Cancel or close all but the last one
TableHandleFuture.cancelOrClose(futures.subList(0, futures.size() - 1), true);
TableService.TableHandleFuture.cancelOrClose(futures.subList(0, futures.size() - 1), true);
try (final TableHandle lastHandle = get(futures.get(futures.size() - 1))) {
checkSucceeded(lastHandle);
}
Expand All @@ -64,10 +64,10 @@ public void immediatelyCompletedFromCachedTableServices()
throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final TableSpec longChainLast = longChain.get(longChain.size() - 1);
final TableServices tableServices = session.tableServices();
try (final TableHandle ignored = get(tableServices.executeAsync(longChainLast))) {
final TableService tableService = session.newStatefulTableService();
try (final TableHandle ignored = get(tableService.executeAsync(longChainLast))) {
for (int i = 0; i < 1000; ++i) {
try (final TableHandle handle = get(tableServices.executeAsync(longChainLast))) {
try (final TableHandle handle = get(tableService.executeAsync(longChainLast))) {
checkSucceeded(handle);
}
}
Expand All @@ -76,7 +76,7 @@ public void immediatelyCompletedFromCachedTableServices()

private static TableHandle get(TableHandleFuture future)
throws ExecutionException, InterruptedException, TimeoutException {
return TableHandleFuture.get(future, GETTIME);
return future.getOrCancel(GETTIME);
}

private static void checkSucceeded(TableHandle x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.deephaven.api.filter.Filter;
import io.deephaven.client.impl.Session;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableServiceAsync.TableHandleFuture;
import io.deephaven.qst.table.TableSpec;
import picocli.CommandLine;
import picocli.CommandLine.ArgGroup;
Expand Down Expand Up @@ -39,7 +38,7 @@ enum Type {
protected void execute(Session session) throws Exception {
final Filter filter = type == Type.AND ? Filter.and(Filter.from(filters)) : Filter.or(Filter.from(filters));
final TableSpec filtered = ticket.ticketId().table().where(filter);
try (final TableHandle handle = TableHandleFuture.get(session.tableServices().executeAsync(filtered))) {
try (final TableHandle handle = session.newStatefulTableService().executeAsync(filtered).getOrCancel()) {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
session.publish("filter_table_results", handle).get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected void execute(Session session) throws Exception {
final TableSpec rPlusOne = r.view("PlusOne=R + 1");
final TableSpec rMinusOne = r.view("PlusOne=R - 1");
final TableHandleManager manager = mode == null
? session.tableServices()
? session.newStatefulTableService()
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
: mode.batch
? session.batch()
: session.serial();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.client.impl;

import io.deephaven.client.impl.ExportRequest.Listener;
import io.deephaven.client.impl.ExportStates.State;
import io.deephaven.qst.table.TableSpec;

import java.util.Objects;
Expand Down Expand Up @@ -61,6 +62,10 @@ ExportStates exportStates() {
return state.exportStates();
}

State state() {
return state;
}

/**
* The table spec.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -142,6 +142,8 @@ private ExportServiceRequest exportRequestImpl(ExportsRequest requests) {
};
}
return new ExportServiceRequest() {
boolean sent;
boolean closed;

@Override
public List<Export> exports() {
Expand All @@ -150,27 +152,62 @@ public List<Export> exports() {

@Override
public void send() {
if (closed || sent) {
return;
}
sent = true;
// After the user has called send, all handling of state needs to be handled by the respective
// io.deephaven.client.impl.ExportRequest.listener
send.run();

}

@Override
public void close() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the potential for state leak if the user calls close without calling send. Do we need to clean those up? What about on errors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I've added some new logic to handle the case where the user does not call send. After sent, any errors are the handled by respective io.deephaven.client.impl.ExportRequest.listener.

lock.unlock();
if (closed) {
return;
}
closed = true;
try {
if (!sent) {
cleanupUnsent();
}
} finally {
lock.unlock();
}
}

private void cleanupUnsent() {
for (Export result : results) {
final State state = result.state();
if (newStates.containsKey(state.exportId())) {
// On brand new states that we didn't even send, we can simply remove them. We aren't
// leaking anything, but we have incremented our export id creator state.
removeImpl(state);
continue;
}
result.release();
}
}
};
}

private void release(State state) {

private void remove(State state) {
lock.lock();
try {
if (!exports.remove(state.table(), state)) {
throw new IllegalStateException("Unable to remove state");
}
removeImpl(state);
} finally {
lock.unlock();
}
}

private void removeImpl(State state) {
if (!exports.remove(state.table(), state)) {
throw new IllegalStateException("Unable to remove state");
}
}

private Optional<State> lookup(TableSpec table) {
return Optional.ofNullable(exports.get(table));
}
Expand Down Expand Up @@ -224,7 +261,7 @@ class State {
State(TableSpec table, int exportId) {
this.table = Objects.requireNonNull(table);
this.exportId = exportId;
this.children = new LinkedHashSet<>();
this.children = new CopyOnWriteArraySet<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand we want this to avoid CME's on remove during an iteration. Can we build a list to remove and do a removeAll instead? I'm worried about adding COW data structures with user-facing adds.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I more thoroughly traced exactly the part that was causing concurrent mod, and it was actually the mitigation; now that the Batch / ETCR race has been fixed, I can get rid of the mitigation and revert this change.

}

Session session() {
Expand Down Expand Up @@ -258,7 +295,7 @@ synchronized void release(Export export) {
throw new IllegalStateException("Unable to remove child");
}
if (children.isEmpty()) {
ExportStates.this.release(this);
ExportStates.this.remove(this);
released = true;
sessionStub.release(
ReleaseRequest.newBuilder().setId(ExportTicketHelper.wrapExportIdInTicket(exportId)).build(),
Expand Down
Loading
Loading