Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryCompiler Batch Formula Compilation #5070

Merged
merged 21 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions Util/src/main/java/io/deephaven/util/CompletionStageFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import org.jetbrains.annotations.NotNull;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
Expand All @@ -21,31 +21,40 @@
*/
public interface CompletionStageFuture<T> extends Future<T>, CompletionStage<T> {

/**
* Create a new incomplete future.
*
* @param <T> The result type returned by this future's {@code join}
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
* @return a resolver for the future
*/
static <T> Resolver<T> make() {
return new CompletionStageFutureImpl<T>().new ResolverImpl();
}

/**
* Returns a new CompletionStageFuture that is already completed with the given value.
*
* @param value the value
* @param <U> the type of the value
* @return the completed CompletionStageFuture
* @see CompletableFuture#completedFuture(Object)
* @see java.util.concurrent.CompletableFuture#completedFuture(Object)
*/
static <U> CompletionStageFuture<U> completedFuture(U value) {
final CompletionStageFutureImpl.Resolver<U> resolver = CompletionStageFutureImpl.make();
final CompletionStageFutureImpl.Resolver<U> resolver = CompletionStageFuture.make();
resolver.complete(value);
return resolver.getFuture();
}

/**
* Returns a new CompletableFuture that is already completed exceptionally with the given exception.
* Returns a new CompletionStageFuture that is already completed exceptionally with the given exception.
*
* @param ex the exception
* @param <U> the type of the value
* @return the exceptionally completed CompletableFuture
* @since 9
* @see CompletableFuture#failedFuture(Throwable)
* @return the exceptionally completed CompletionStageFuture
* @see java.util.concurrent.CompletableFuture#failedFuture(Throwable)
*/
static <U> CompletionStageFuture<U> failedFuture(Throwable ex) {
final CompletionStageFutureImpl.Resolver<U> resolver = CompletionStageFutureImpl.make();
final CompletionStageFutureImpl.Resolver<U> resolver = CompletionStageFuture.make();
resolver.completeExceptionally(ex);
return resolver.getFuture();
}
Expand All @@ -56,25 +65,26 @@ interface Resolver<T> {
* If not already completed, sets the value returned by {@link #get()} and related methods to the given value.
*
* @param value the result value
* @return {@code true} if this invocation caused this SafeCompletableFuture to transition to a completed state,
* @return {@code true} if this invocation caused this CompletionStageFuture to transition to a completed state,
* else {@code false}
* @see java.util.concurrent.CompletableFuture#complete(Object)
*/
boolean complete(T value);

/**
* If not already completed, causes invocations of {@link #get()} and related methods to throw the given
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
* exception.
* exception wrapped in an {@link ExecutionException}.
*
* @param ex the exception
* @return {@code true} if this invocation caused this SafeCompletableFuture to transition to a completed state,
* @return {@code true} if this invocation caused this CompletionStageFuture to transition to a completed state,
* else {@code false}
* @see java.util.concurrent.CompletableFuture#completeExceptionally(Throwable)
*/
boolean completeExceptionally(@NotNull Throwable ex);

/**
* @return the underlying future to provide to the recipient
* @return the underlying future to provide to the recipient; implementations must ensure that this method
* always returns an identical result for a given Resolver instance
*/
CompletionStageFuture<T> getFuture();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,10 @@
@SuppressWarnings("unchecked")
public class CompletionStageFutureImpl<T> extends CompletableFuture<T> implements CompletionStageFuture<T> {

/**
* Create a new incomplete future.
*
* @param <T> The result type returned by this future's {@code join}
* @return a resolver for the future
*/
public static <T> Resolver<T> make() {
return new CompletionStageFutureImpl<T>().new ResolverImpl();
}

/**
* A resolver for this future implementation.
*/
private class ResolverImpl implements CompletionStageFuture.Resolver<T> {
class ResolverImpl implements CompletionStageFuture.Resolver<T> {
public boolean complete(final T value) {
return safelyComplete(value);
}
Expand Down
2 changes: 1 addition & 1 deletion Util/src/main/java/io/deephaven/util/MultiException.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.List;

/**
* An exception to use when a series of operations mus all be executed, but may all throw exceptions themselves. This
* An exception to use when a series of operations must all be executed, but may all throw exceptions themselves. This
* allows for retention of all exception data.
*/
public class MultiException extends Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY
// ****** Edit LazyCachingSupplier and run "./gradlew replicateCachingSupplier" to regenerate
// ****** Edit SoftCachingSupplier and run "./gradlew replicateCachingSupplier" to regenerate
//
// @formatter:off
package io.deephaven.util.datastructures;
Expand All @@ -18,7 +18,7 @@
*
* @param <OUTPUT_TYPE> the type of results supplied by this function
*/
public final class LazyCachingFunction<INPUT_TYPE, OUTPUT_TYPE> implements Function<INPUT_TYPE, OUTPUT_TYPE> {
public final class SoftCachingFunction<INPUT_TYPE, OUTPUT_TYPE> implements Function<INPUT_TYPE, OUTPUT_TYPE> {

private final Function<INPUT_TYPE, OUTPUT_TYPE> internalFunction;

Expand All @@ -29,7 +29,7 @@ public final class LazyCachingFunction<INPUT_TYPE, OUTPUT_TYPE> implements Funct
*
* @param internalFunction The {@link Function} to wrap. Must be safely repeatable and must not return {@code null}.
*/
public LazyCachingFunction(@NotNull final Function<INPUT_TYPE, OUTPUT_TYPE> internalFunction) {
public SoftCachingFunction(@NotNull final Function<INPUT_TYPE, OUTPUT_TYPE> internalFunction) {
this.internalFunction = internalFunction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*
* @param <OUTPUT_TYPE> the type of results supplied by this supplier
*/
public final class LazyCachingSupplier<OUTPUT_TYPE> implements Supplier<OUTPUT_TYPE> {
public final class SoftCachingSupplier<OUTPUT_TYPE> implements Supplier<OUTPUT_TYPE> {

private final Supplier<OUTPUT_TYPE> internalSupplier;

Expand All @@ -25,7 +25,7 @@ public final class LazyCachingSupplier<OUTPUT_TYPE> implements Supplier<OUTPUT_T
*
* @param internalSupplier The {@link Supplier} to wrap. Must be safely repeatable and must not return {@code null}.
*/
public LazyCachingSupplier(@NotNull final Supplier<OUTPUT_TYPE> internalSupplier) {
public SoftCachingSupplier(@NotNull final Supplier<OUTPUT_TYPE> internalSupplier) {
this.internalSupplier = internalSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.util.ByteUtils;
import io.deephaven.util.CompletionStageFuture;
import io.deephaven.util.CompletionStageFutureImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.text.StringEscapeUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -216,7 +215,7 @@ public void setParentClassLoader(final ClassLoader parentClassLoader) {
* @param request The compilation request
*/
public Class<?> compile(@NotNull final QueryCompilerRequest request) {
final CompletionStageFuture.Resolver<Class<?>> resolver = CompletionStageFutureImpl.make();
final CompletionStageFuture.Resolver<Class<?>> resolver = CompletionStageFuture.make();
compile(request, resolver);
try {
return resolver.getFuture().get();
Expand All @@ -227,14 +226,15 @@ public Class<?> compile(@NotNull final QueryCompilerRequest request) {
}
throw new UncheckedDeephavenException("Error while compiling class", cause);
} catch (InterruptedException e) {
throw new UncheckedDeephavenException("Interrupted while compile class", e);
throw new UncheckedDeephavenException("Interrupted while compiling class", e);
}
}

/**
* Compile a class.
*
* @param request The compilation request
* @param resolver The resolver to use for delivering compilation results
*/
public void compile(
@NotNull final QueryCompilerRequest request,
Expand All @@ -247,6 +247,7 @@ public void compile(
* Compiles all requests.
*
* @param requests The compilation requests
* @param resolvers The resolvers to use for delivering compilation results
*/
public void compile(
@NotNull final QueryCompilerRequest[] requests,
Expand Down Expand Up @@ -459,8 +460,8 @@ private String getClassPath() {
}

private static class CompilationState {
int next_pi;
boolean compiled;
int nextProbeIndex;
boolean complete;
String packageName;
String fqClassName;
}
Expand All @@ -481,29 +482,29 @@ private void compileHelper(
requests.get(ii).classBody().getBytes(StandardCharsets.UTF_8)));
}

int numCompiled = 0;
int numComplete = 0;
final CompilationState[] states = new CompilationState[requests.size()];
for (int ii = 0; ii < requests.size(); ++ii) {
states[ii] = new CompilationState();
}

/*
* @formatter:off
* 1. try to resolve CFs without compiling; retain next hash to try
* 1. try to resolve without compiling; retain next hash to try
* 2. compile all remaining with a single compilation task
* 3. goto step 1 if any are unresolved
* @formatter:on
*/

while (numCompiled < requests.size()) {
while (numComplete < requests.size()) {
for (int ii = 0; ii < requests.size(); ++ii) {
final CompilationState state = states[ii];
if (state.compiled) {
if (state.complete) {
continue;
}

while (true) {
final int pi = state.next_pi++;
final int pi = state.nextProbeIndex++;
final String packageNameSuffix = "c_" + basicHashText[ii]
+ (pi == 0 ? "" : ("p" + pi))
+ "v" + JAVA_CLASS_VERSION;
Expand All @@ -514,8 +515,8 @@ private void compileHelper(
+ request.packageNameRoot() + ", class name=" + request.className() + ", class body "
+ "hash=" + basicHashText[ii] + " - contact Deephaven support!");
resolvers.get(ii).completeExceptionally(err);
state.compiled = true;
++numCompiled;
state.complete = true;
++numComplete;
break;
}

Expand All @@ -533,22 +534,22 @@ private void compileHelper(

if (completeIfResultMatchesQueryCompilerRequest(state.packageName, request, resolvers.get(ii),
result)) {
state.compiled = true;
++numCompiled;
state.complete = true;
++numComplete;
break;
}
}
}

if (numCompiled == requests.size()) {
if (numComplete == requests.size()) {
return;
}

// Couldn't resolve at least one of the requests, so try a round of compilation.
final List<CompilationRequestAttempt> compilationRequestAttempts = new ArrayList<>();
for (int ii = 0; ii < requests.size(); ++ii) {
final CompilationState state = states[ii];
if (!state.compiled) {
if (!state.complete) {
final QueryCompilerRequest request = requests.get(ii);
compilationRequestAttempts.add(new CompilationRequestAttempt(
request,
Expand All @@ -558,22 +559,22 @@ private void compileHelper(
}
}

maybeCreateClass(compilationRequestAttempts);
maybeCreateClasses(compilationRequestAttempts);

// We could be running on a screwy filesystem that is slow (e.g. NFS). If we wrote a file and can't load it
// ... then give the filesystem some time. All requests should use the same deadline.
final long deadline = System.currentTimeMillis() + CODEGEN_TIMEOUT_MS - CODEGEN_LOOP_DELAY_MS;
for (int ii = 0; ii < requests.size(); ++ii) {
final CompilationState state = states[ii];
if (state.compiled) {
if (state.complete) {
continue;
}

final QueryCompilerRequest request = requests.get(ii);
final CompletionStageFuture.Resolver<Class<?>> resolver = resolvers.get(ii);
if (resolver.getFuture().isDone()) {
state.compiled = true;
++numCompiled;
state.complete = true;
++numComplete;
continue;
}

Expand All @@ -599,8 +600,8 @@ private void compileHelper(
}

if (completeIfResultMatchesQueryCompilerRequest(state.packageName, request, resolver, clazz)) {
state.compiled = true;
++numCompiled;
state.complete = true;
++numComplete;
}
}
}
Expand Down Expand Up @@ -796,7 +797,7 @@ public JavaSourceFromString makeSource() {
}
}

private void maybeCreateClass(
private void maybeCreateClasses(
@NotNull final List<CompilationRequestAttempt> requests) {
// Get the destination root directory (e.g. /tmp/workspace/cache/classes) and populate it with the package
// directories (e.g. io/deephaven/test) if they are not already there. This will be useful later.
Expand Down Expand Up @@ -841,8 +842,6 @@ private void maybeCreateClass(
int parallelismFactor = operationInitializer.parallelismFactor();

int requestsPerTask = Math.max(32, (requests.size() + parallelismFactor - 1) / parallelismFactor);
log.info().append("Compiling with parallelismFactor = ").append(parallelismFactor)
.append(" requestsPerTask = ").append(requestsPerTask).endl();
if (parallelismFactor == 1 || requestsPerTask >= requests.size()) {
maybeCreateClassHelper(compiler, fileManager, requests, rootPathAsString, tempDirAsString,
0, requests.size());
Expand Down Expand Up @@ -895,6 +894,9 @@ private void maybeCreateClassHelper(
final int startInclusive,
final int endExclusive) {
final List<CompilationRequestAttempt> toRetry = new ArrayList<>();
// If any of our requests fail to compile then the JavaCompiler will not write any class files at all. The
// non-failing requests will be retried in a second pass that is expected to succeed. This enables us to
// fulfill futures independent of each other; otherwise a single failure would taint all requests in a batch.
final boolean wantRetry = maybeCreateClassHelper2(compiler,
fileManager, requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive, toRetry);
if (!wantRetry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.CompletionStageFuture;
import io.deephaven.util.CompletionStageFutureImpl;
import io.deephaven.util.SafeCloseable;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -295,8 +294,8 @@ public void testMultiCompileWithFailure() throws ExecutionException, Interrupted
// noinspection unchecked
CompletionStageFuture.Resolver<Class<?>>[] resolvers =
(CompletionStageFuture.Resolver<Class<?>>[]) new CompletionStageFuture.Resolver[] {
CompletionStageFutureImpl.make(),
CompletionStageFutureImpl.make(),
CompletionStageFuture.make(),
CompletionStageFuture.make(),
};

try {
Expand Down
Loading
Loading