Skip to content

Commit

Permalink
Merge branch 'hotfix-0.19.1' into release/v0.19.1
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Dec 12, 2022
2 parents 129c68e + 72ef0bc commit 9c50936
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class AuthorizingServerCallHandler<ReqT, RespT> implements ServerCallHandler<Req
private final MessageReceivedCallback<ReqT> messageReceivedCallback;

private final boolean mustHaveRequest;
private final String serviceName;
private final String methodName;
private final String fullMethodName;

public AuthorizingServerCallHandler(
final ServerCallHandler<ReqT, RespT> delegate,
Expand All @@ -69,9 +68,7 @@ public AuthorizingServerCallHandler(
this.callStartedCallback = callStartedCallback;
this.messageReceivedCallback = messageReceivedCallback;
mustHaveRequest = method.getMethodDescriptor().getType().clientSendsOneMessage();
serviceName = method.getMethodDescriptor().getServiceName();
methodName = method.getMethodDescriptor().getFullMethodName();

fullMethodName = method.getMethodDescriptor().getFullMethodName();
// validate that we have handlers for the methods we will try to invoke
if (!mustHaveRequest) {
Assert.neqNull(callStartedCallback, "callStartedCallback");
Expand Down Expand Up @@ -116,20 +113,16 @@ private boolean validateAuth(
Status.Code status = sre.getStatus().getCode();
switch (status) {
case UNAUTHENTICATED:
log.info().append(serviceName).append(".").append(methodName)
.append(": request unauthenticated").endl();
log.info().append(fullMethodName).append(": request unauthenticated").endl();
break;
case PERMISSION_DENIED:
log.info().append(serviceName).append(".").append(methodName)
.append(": request unauthorized").endl();
log.info().append(fullMethodName).append(": request unauthorized").endl();
break;
case RESOURCE_EXHAUSTED:
log.info().append(serviceName).append(".").append(methodName)
.append(": request throttled").endl();
log.info().append(fullMethodName).append(": request throttled").endl();
break;
default:
log.error().append(serviceName).append(".").append(methodName)
.append(": authorization failed: ").append(err).endl();
log.error().append(fullMethodName).append(": authorization failed: ").append(err).endl();
}

quietlyCloseCall(call, sre.getStatus(), sre.getTrailers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'io.deephaven'
version = '0.19.0'
version = '0.19.1'

if (!name.startsWith('deephaven-')) {
archivesBaseName = "deephaven-${name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,7 @@ private void prepareVectorization(MethodCallExpr n, Expression[] expressions, Py
if (pyCallableWrapper.isVectorized()) {
throw ex;
}
pyCallableWrapper.setVectorizable(false);
if (log.isDebugEnabled()) {
log.debug().append("Python function call ").append(n.toString()).append(" is not auto-vectorizable:")
.append(ex.getMessage()).endl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer {
/**
* The same reference as super.columnSource, but as a WritableColumnSource and maybe reinterpretted
*/
private final WritableColumnSource writableSource;
private final WritableColumnSource<?> writableSource;

/**
* The execution context the select column layer was constructed in
Expand Down Expand Up @@ -70,12 +70,12 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer {
private ChunkSource.WithPrev<Values> chunkSource;

SelectColumnLayer(RowSet parentRowSet, SelectAndViewAnalyzer inner, String name, SelectColumn sc,
WritableColumnSource ws, WritableColumnSource underlying,
WritableColumnSource<?> ws, WritableColumnSource<?> underlying,
String[] deps, ModifiedColumnSet mcsBuilder, boolean isRedirected,
boolean flattenedResult, boolean alreadyFlattenedSources) {
super(inner, name, sc, ws, underlying, deps, mcsBuilder);
this.parentRowSet = parentRowSet;
this.writableSource = (WritableColumnSource) ReinterpretUtils.maybeConvertToPrimitive(ws);
this.writableSource = (WritableColumnSource<?>) ReinterpretUtils.maybeConvertToPrimitive(ws);
this.isRedirected = isRedirected;
this.executionContext = ExecutionContext.getContextToRecord();

Expand All @@ -88,7 +88,8 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer {
// We can only parallelize this column if we are not redirected, our destination provides ensure previous, and
// the select column is stateless
canParallelizeThisColumn = !isRedirected
&& WritableSourceWithPrepareForParallelPopulation.supportsParallelPopulation(ws) && sc.isStateless();
&& WritableSourceWithPrepareForParallelPopulation.supportsParallelPopulation(writableSource)
&& sc.isStateless();

// If we were created on a systemic thread, we want to be sure to make sure that any updates are also
// applied systemically.
Expand Down Expand Up @@ -139,6 +140,11 @@ public void onAllRequiredColumnsCompleted() {
// If we have shifts, that makes everything nasty; so we do not want to deal with it
final boolean hasShifts = upstream.shifted().nonempty();

final boolean checkTableOperations =
UpdateGraphProcessor.DEFAULT.getCheckTableOperations()
&& !UpdateGraphProcessor.DEFAULT.sharedLock().isHeldByCurrentThread()
&& !UpdateGraphProcessor.DEFAULT.exclusiveLock().isHeldByCurrentThread();

if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts &&
((resultTypeIsTable && totalSize > 0)
|| totalSize > QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) {
Expand Down Expand Up @@ -180,12 +186,14 @@ public void onAllRequiredColumnsCompleted() {
jobScheduler.submit(
executionContext,
() -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper,
liveResultOwner, onCompletion, this::onError, updates),
liveResultOwner, onCompletion, this::onError, updates,
checkTableOperations),
SelectColumnLayer.this, this::onError);
} else {
jobScheduler.submit(
executionContext,
() -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onCompletion),
() -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onCompletion,
checkTableOperations),
SelectColumnLayer.this, this::onError);
}
}
Expand All @@ -195,17 +203,13 @@ public void onAllRequiredColumnsCompleted() {
private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableUpdate upstream,
final RowSet toClear, final UpdateHelper helper, @Nullable final LivenessNode liveResultOwner,
final SelectLayerCompletionHandler onCompletion, final Consumer<Exception> onError,
final List<TableUpdate> splitUpdates) {
final List<TableUpdate> splitUpdates, final boolean checkTableOperations) {
// we have to do removal and previous initialization before we can do any of the actual filling in multiple
// threads to avoid concurrency problems with our destination column sources
doEnsureCapacity();

prepareSourcesForParallelPopulation(upstream);

final boolean checkTableOperations =
UpdateGraphProcessor.DEFAULT.getCheckTableOperations()
&& !UpdateGraphProcessor.DEFAULT.sharedLock().isHeldByCurrentThread()
&& !UpdateGraphProcessor.DEFAULT.exclusiveLock().isHeldByCurrentThread();
final AtomicInteger divisions = new AtomicInteger(splitUpdates.size());

long destinationOffset = 0;
Expand All @@ -226,11 +230,16 @@ private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableU
}

private void doSerialApplyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper,
@Nullable final LivenessNode liveResultOwner, final SelectLayerCompletionHandler onCompletion) {
@Nullable final LivenessNode liveResultOwner, final SelectLayerCompletionHandler onCompletion,
final boolean checkTableOperations) {
doEnsureCapacity();
SystemicObjectTracker.executeSystemically(isSystemic,
() -> doApplyUpdate(upstream, helper, liveResultOwner, 0));

final boolean oldCheck = UpdateGraphProcessor.DEFAULT.setCheckTableOperations(checkTableOperations);
try {
SystemicObjectTracker.executeSystemically(isSystemic,
() -> doApplyUpdate(upstream, helper, liveResultOwner, 0));
} finally {
UpdateGraphProcessor.DEFAULT.setCheckTableOperations(oldCheck);
}
if (!isRedirected) {
clearObjectsAtThisLevel(toClear);
}
Expand Down Expand Up @@ -568,6 +577,6 @@ public LogOutput append(LogOutput logOutput) {

@Override
public boolean allowCrossColumnParallelization() {
return inner.allowCrossColumnParallelization();
return selectColumn.isStateless() && inner.allowCrossColumnParallelization();
}
}
2 changes: 1 addition & 1 deletion py/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ $ python3 -m examples.demo_asof_join
```
## Install
``` shell
$ pip3 install dist/pydeephaven-0.19.0-py3-none-any.whl
$ pip3 install dist/pydeephaven-0.19.1-py3-none-any.whl
```
## Quick start

Expand Down
2 changes: 1 addition & 1 deletion py/client/pydeephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@
from .constants import SortDirection, MatchRule
from .query import Query

__version__ = "0.19.0"
__version__ = "0.19.1"
2 changes: 1 addition & 1 deletion py/client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

setup(
name='pydeephaven',
version='0.19.0',
version='0.19.1',
description='The Deephaven Python Client',
long_description=README,
long_description_content_type="text/markdown",
Expand Down
2 changes: 1 addition & 1 deletion py/embedded-server/deephaven_server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
__version__ = "0.19.0"
__version__ = "0.19.1"

from .start_jvm import DEFAULT_JVM_PROPERTIES, DEFAULT_JVM_ARGS, start_jvm
from .server import Server
Expand Down
2 changes: 1 addition & 1 deletion py/server/deephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

__version__ = "0.19.0"
__version__ = "0.19.1"

from deephaven_internal import jvm
try:
Expand Down
10 changes: 7 additions & 3 deletions py/server/deephaven_internal/auto_completer/_completer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ def do_completion(

# run jedi
txt = self.get_doc(uri)
# The Script completer is static analysis only, so we should actually be feeding it a whole document at once.

completer = Script if self.__mode == Mode.SAFE else Interpreter
completer = (
# The Script completer is static analysis only, so we should actually be feeding it a whole document at once.
Script(txt)
if self.__mode == Mode.SAFE
else Interpreter(txt, [self.__scope])
)
completions = completer.complete(line, col)

completions = completer(txt, [self.__scope]).complete(line, col)
# for now, a simple sorting based on number of preceding _
# we may want to apply additional sorting to each list before combining
results: list = []
Expand Down
6 changes: 3 additions & 3 deletions server/jetty-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ be sure to first set `PYTHON_CONFIGURE_OPTS="--enabled-shared"`.
```shell
$ python -m pip install --upgrade pip # First upgrade pip
$ pip install wheel
$ export DEEPHAVEN_VERSION=0.19.0 # this should match the current version of your git repo
$ export DEEPHAVEN_VERSION=0.19.1 # this should match the current version of your git repo
$ cd py/jpy
$ export JAVA_HOME=/path/to/your/java/home # Customize this to fit your computer
$ python setup.py bdist_wheel
$ pip install dist/deephaven_jpy-0.19.0-cp39-cp39-linux_x86_64.whl # This will vary by version/platform
$ pip install dist/deephaven_jpy-0.19.1-cp39-cp39-linux_x86_64.whl # This will vary by version/platform
$ cd -
$ cd Integrations/python
$ python setup.py bdist_wheel
$ pip install dist/deephaven-0.19.0-py2.py3-none-any.whl
$ pip install dist/deephaven-0.19.1-py2.py3-none-any.whl
$ cd -
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,7 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
// just hang up too, browser will reconnect if interested
synchronized (responseObserver) {
responseObserver.onCompleted();
}
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.grpc.stub.StreamObserver;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -34,11 +36,12 @@ public class JavaAutoCompleteObserver extends SessionCloseableObserver<AutoCompl
implements StreamObserver<AutoCompleteRequest> {

private static final Logger log = LoggerFactory.getLogger(JavaAutoCompleteObserver.class);
private final CompletionParser parser;
/** Track parsers by their session state, to ensure each session has its own, singleton, parser */
private static final Map<SessionState, CompletionParser> parsers = Collections.synchronizedMap(new WeakHashMap<>());

private final Map<SessionState, CompletionParser> parsers = new ConcurrentHashMap<>();
private final CompletionParser parser;

private CompletionParser ensureParserForSession(SessionState session) {
private static CompletionParser ensureParserForSession(SessionState session) {
return parsers.computeIfAbsent(session, s -> {
CompletionParser parser = new CompletionParser();
s.addOnCloseCallback(() -> {
Expand Down Expand Up @@ -166,9 +169,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
// just hang up too, browser will reconnect if interested
synchronized (responseObserver) {
responseObserver.onCompleted();
}
// just hang up too, browser will reconnect if interested, and we'll maintain state if the session isn't gc'd
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
// just hang up too, browser will reconnect if interested
synchronized (responseObserver) {
responseObserver.onCompleted();
}
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import io.deephaven.proto.backplane.grpc.PartitionByRequest;
import io.deephaven.proto.backplane.grpc.PartitionByResponse;
import io.deephaven.proto.backplane.grpc.PartitionedTableServiceGrpc;
import io.deephaven.server.auth.AuthorizationProvider;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketResolverBase;
import io.deephaven.server.session.TicketRouter;
import io.grpc.stub.StreamObserver;

Expand All @@ -34,17 +36,20 @@ public class PartitionedTableServiceGrpcImpl extends PartitionedTableServiceGrpc
private final SessionService sessionService;
private final UpdateGraphProcessor updateGraphProcessor;
private final PartitionedTableServiceContextualAuthWiring authWiring;
private final TicketResolverBase.AuthTransformation authorizationTransformation;

@Inject
public PartitionedTableServiceGrpcImpl(
TicketRouter ticketRouter,
SessionService sessionService,
UpdateGraphProcessor updateGraphProcessor,
AuthorizationProvider authorizationProvider,
PartitionedTableServiceContextualAuthWiring authWiring) {
this.ticketRouter = ticketRouter;
this.sessionService = sessionService;
this.updateGraphProcessor = updateGraphProcessor;
this.authWiring = authWiring;
this.authorizationTransformation = authorizationProvider.getTicketTransformation();
}

@Override
Expand Down Expand Up @@ -86,16 +91,19 @@ public void merge(MergeRequest request, StreamObserver<ExportedTableCreationResp
.onError(responseObserver)
.submit(() -> {
authWiring.checkPermissionMerge(session.getAuthContext(), request,
Collections.singletonList((Table) partitionedTable.get()));
final Table merged;
Collections.singletonList(partitionedTable.get().table()));
Table merged;
if (partitionedTable.get().table().isRefreshing()) {
merged = updateGraphProcessor.sharedLock()
.computeLocked(partitionedTable.get()::merge);
} else {
merged = partitionedTable.get().merge();
}
merged = authorizationTransformation.transform(merged);
final ExportedTableCreationResponse response =
buildTableCreationResponse(request.getResultId(), merged);
safelyExecute(() -> {
responseObserver.onNext(buildTableCreationResponse(request.getResultId(), merged));
responseObserver.onNext(response);
responseObserver.onCompleted();
});
return merged;
Expand All @@ -118,10 +126,10 @@ public void getTable(GetTableRequest request, StreamObserver<ExportedTableCreati
.require(partitionedTable, keys)
.onError(responseObserver)
.submit(() -> {
authWiring.checkPermissionGetTable(session.getAuthContext(), request,
List.of((Table) partitionedTable.get(), keys.get()));
final Table table;
Table table;
Table keyTable = keys.get();
authWiring.checkPermissionGetTable(session.getAuthContext(), request,
List.of(partitionedTable.get().table(), keyTable));
if (!keyTable.isRefreshing()) {
long keyTableSize = keyTable.size();
if (keyTableSize != 1) {
Expand Down Expand Up @@ -159,8 +167,11 @@ public void getTable(GetTableRequest request, StreamObserver<ExportedTableCreati
.get(requestedRow.getRowSet().firstRowKey());
});
}
table = authorizationTransformation.transform(table);
final ExportedTableCreationResponse response =
buildTableCreationResponse(request.getResultId(), table);
safelyExecute(() -> {
responseObserver.onNext(buildTableCreationResponse(request.getResultId(), table));
responseObserver.onNext(response);
responseObserver.onCompleted();
});
return table;
Expand Down

0 comments on commit 9c50936

Please sign in to comment.