Skip to content

Commit

Permalink
Our UsePreviousValues implementations generally need to care about sa…
Browse files Browse the repository at this point in the history
…tisfaction, rather than just last notification step (#3983)

* Our UsePreviousValues implementations generally need to care about satisfaction, rather than just last notification step

* Overhaul concurrent safety for all NotificationQueue.Dependency.satisfied implementations

* Add ClockInconsistencyException for satisfied to throw, enabling fail-fast for snapshots and error detection for update propagation

* Port PeriodicUpdateGraph unit test support for sources-satisfied control, port TestConcurrentInstantiation fixes and new test, and fix other ExportTableUpdateListenerTest
  • Loading branch information
rcaudy authored and devinrsmith committed Jun 14, 2023
1 parent 2d80df2 commit a88c202
Show file tree
Hide file tree
Showing 17 changed files with 462 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.PoisonedUpdateGraph;
import io.deephaven.engine.exceptions.UpdateGraphConflictException;
import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.exceptions.NotSortableException;
Expand Down Expand Up @@ -45,6 +45,7 @@
import java.io.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -84,6 +85,10 @@ public abstract class BaseTable<IMPL_TYPE extends BaseTable<IMPL_TYPE>> extends
throw new UnsupportedOperationException("EMPTY_CHILDREN does not support adds");
}, Collections.emptyList());

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<BaseTable> LAST_SATISFIED_STEP_UPDATER =
AtomicLongFieldUpdater.newUpdater(BaseTable.class, "lastSatisfiedStep");

/**
* This table's definition.
*/
Expand All @@ -99,6 +104,11 @@ public abstract class BaseTable<IMPL_TYPE extends BaseTable<IMPL_TYPE>> extends
*/
protected final UpdateGraph updateGraph;

/**
* The last clock step when this table dispatched a notification.
*/
private volatile long lastNotificationStep;

// Fields for DynamicNode implementation and update propagation support
private volatile boolean refreshing;
@SuppressWarnings({"FieldMayBeFinal", "unused"}) // Set via ensureField with CONDITION_UPDATER
Expand All @@ -108,8 +118,8 @@ public abstract class BaseTable<IMPL_TYPE extends BaseTable<IMPL_TYPE>> extends
@SuppressWarnings("FieldMayBeFinal") // Set via ensureField with CHILD_LISTENER_REFERENCES_UPDATER
private volatile SimpleReferenceManager<TableUpdateListener, ? extends SimpleReference<TableUpdateListener>> childListenerReferences =
EMPTY_CHILD_LISTENER_REFERENCES;
private volatile long lastNotificationStep;
private volatile long lastSatisfiedStep;
@SuppressWarnings("FieldMayBeFinal")
private volatile long lastSatisfiedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile boolean isFailed;

/**
Expand Down Expand Up @@ -461,13 +471,17 @@ public boolean satisfied(final long step) {
return true;
}

StepUpdater.checkForOlderStep(step, lastSatisfiedStep);
StepUpdater.checkForOlderStep(step, lastNotificationStep);

final Collection<Object> localParents = parents;
// If we have no parents whatsoever then we are a source, and have no dependency chain other than the UGP
// itself
if (localParents.isEmpty()) {
if (updateGraph.satisfied(step)) {
updateGraph.logDependencies().append("Root node satisfied ").append(this)
.endl();
StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);
return true;
}
return false;
Expand All @@ -492,7 +506,7 @@ public boolean satisfied(final long step) {
.append("All parents dependencies satisfied for ").append(this)
.endl();

lastSatisfiedStep = step;
StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);

return true;
}
Expand Down Expand Up @@ -1251,7 +1265,7 @@ public static void initializeWithSnapshot(
}

public interface SwapListenerFactory<T extends SwapListener> {
T newListener(BaseTable sourceTable);
T newListener(BaseTable<?> sourceTable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.NotificationQueue.Dependency;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import static io.deephaven.engine.table.iterators.ChunkedColumnIterator.DEFAULT_CHUNK_SIZE;

/**
Expand Down Expand Up @@ -51,6 +54,9 @@ public static void install(
new ConstituentDependency(resultUpdatedDependency, result.getRowSet(), dependencyColumns));
}

private static final AtomicLongFieldUpdater<ConstituentDependency> LAST_SATISFIED_STEP_UPDATER =
AtomicLongFieldUpdater.newUpdater(ConstituentDependency.class, "lastSatisfiedStep");

/**
* A {@link Dependency dependency} used to determine if the result table is done updating for this cycle. See
* {@link #install(Table, Dependency)} for more information.
Expand All @@ -59,7 +65,10 @@ public static void install(
private final RowSet resultRows;
private final ColumnSource<? extends Dependency>[] dependencyColumns;

private volatile long lastSatisfiedStep;
@SuppressWarnings("FieldMayBeFinal")
private volatile long lastSatisfiedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;

private long lastQueriedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private long firstUnsatisfiedRowPosition = 0;

private ConstituentDependency(
Expand All @@ -83,6 +92,7 @@ public LogOutput append(@NotNull final LogOutput logOutput) {

@Override
public boolean satisfied(final long step) {
StepUpdater.checkForOlderStep(step, lastSatisfiedStep);
if (lastSatisfiedStep == step) {
return true;
}
Expand All @@ -95,13 +105,20 @@ public boolean satisfied(final long step) {
}
// Now that we know the result is updated (or won't be), it's safe to look at current contents.
if (resultRows.isEmpty()) {
lastSatisfiedStep = step;
StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);
return true;
}
synchronized (this) {
StepUpdater.checkForOlderStep(step, lastSatisfiedStep);
StepUpdater.checkForOlderStep(step, lastQueriedStep);
if (lastSatisfiedStep == step) {
return true;
}
if (lastQueriedStep != step) {
// Re-initialize for this cycle
lastQueriedStep = step;
firstUnsatisfiedRowPosition = 0;
}
final int chunkSize = Math.toIntExact(Math.min(DEFAULT_CHUNK_SIZE, resultRows.size()));
final int numColumns = dependencyColumns.length;
final ChunkSource.GetContext[] contexts = new ChunkSource.GetContext[numColumns];
Expand Down Expand Up @@ -142,10 +159,9 @@ public boolean satisfied(final long step) {
"resultRows.size()");
getUpdateGraph().logDependencies()
.append("All constituent dependencies satisfied for ").append(this)
.append(", step=").append(step)
.endl();
lastSatisfiedStep = step;
firstUnsatisfiedRowPosition = 0; // Re-initialize for next cycle

StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.exceptions.UncheckedTableException;
import io.deephaven.engine.table.TableListener;
Expand Down Expand Up @@ -50,7 +51,9 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact
.getInstance()
.getBooleanWithDefault("InstrumentedTableListenerBase.verboseLogging", false);

@SuppressWarnings("FieldMayBeFinal")
private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
@SuppressWarnings("FieldMayBeFinal")
private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;

InstrumentedTableListenerBase(@Nullable String description, boolean terminalListener) {
Expand Down Expand Up @@ -97,6 +100,9 @@ public boolean canExecute(final long step) {

@Override
public boolean satisfied(final long step) {
StepUpdater.checkForOlderStep(step, lastCompletedStep);
StepUpdater.checkForOlderStep(step, lastEnqueuedStep);

// Check and see if we've already been completed.
if (lastCompletedStep == step) {
getUpdateGraph().logDependencies()
Expand Down Expand Up @@ -148,8 +154,7 @@ public boolean satisfied(final long step) {

// Mark this node as completed. All our dependencies have been satisfied, but we are not enqueued, so we can
// never actually execute.
final long oldLastCompletedStep = LAST_COMPLETED_STEP_UPDATER.getAndSet(this, step);
Assert.lt(oldLastCompletedStep, "oldLastCompletedStep", step, "step");
StepUpdater.tryUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, this, step);
return true;
}

Expand All @@ -161,7 +166,9 @@ public void onFailure(Throwable originalException, Entry sourceEntry) {

protected abstract void onFailureInternal(Throwable originalException, Entry sourceEntry);

protected final void onFailureInternalWithDependent(final BaseTable<?> dependent, final Throwable originalException,
protected final void onFailureInternalWithDependent(
final BaseTable<?> dependent,
final Throwable originalException,
final Entry sourceEntry) {
dependent.notifyListenersOnError(originalException, sourceEntry);

Expand Down Expand Up @@ -234,9 +241,8 @@ protected abstract class NotificationBase extends AbstractNotification implement
+ ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep);
}

final long oldLastEnqueuedStep =
LAST_ENQUEUED_STEP_UPDATER.getAndSet(InstrumentedTableListenerBase.this, currentStep);
Assert.lt(oldLastEnqueuedStep, "oldLastEnqueuedStep", currentStep, "currentStep");
StepUpdater.forceUpdateRecordedStep(
LAST_ENQUEUED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep);
}

@Override
Expand Down Expand Up @@ -321,9 +327,8 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
onFailure(e, entry);
} finally {
entry.onUpdateEnd();
final long oldLastCompletedStep =
LAST_COMPLETED_STEP_UPDATER.getAndSet(InstrumentedTableListenerBase.this, currentStep);
Assert.lt(oldLastCompletedStep, "oldLastCompletedStep", currentStep, "currentStep");
StepUpdater.forceUpdateRecordedStep(
LAST_COMPLETED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier;
import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.AbstractNotification;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
Expand All @@ -28,6 +29,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand All @@ -42,6 +44,9 @@
public abstract class MergedListener extends LivenessArtifact implements NotificationQueue.Dependency {
private static final Logger log = LoggerFactory.getLogger(MergedListener.class);

private static final AtomicLongFieldUpdater<MergedListener> LAST_COMPLETED_STEP_UPDATER =
AtomicLongFieldUpdater.newUpdater(MergedListener.class, "lastCompletedStep");

private final UpdateGraph updateGraph;
private final Iterable<? extends ListenerRecorder> recorders;
private final Iterable<NotificationQueue.Dependency> dependencies;
Expand All @@ -50,9 +55,11 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
private final PerformanceEntry entry;
private final String logPrefix;

private long notificationStep = -1;
private volatile long queuedNotificationStep = -1;
private volatile long lastCompletedStep;
@SuppressWarnings("FieldMayBeFinal")
private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;

private long notificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private Throwable upstreamError;
private TableListener.Entry errorSourceEntry;

Expand Down Expand Up @@ -100,7 +107,7 @@ private void notifyInternal(@Nullable final Throwable upstreamError,
if (notificationStep == currentStep) {
// noinspection ConstantConditions
throw Assert.statementNeverExecuted(
"MergedListener was fired before both all listener records completed: listener="
"MergedListener was fired before all listener recorders were satisfied: listener="
+ System.identityHashCode(this) + ", currentStep=" + currentStep);
}

Expand All @@ -111,16 +118,16 @@ private void notifyInternal(@Nullable final Throwable upstreamError,

// We've already got something in the notification queue that has not yet been executed for the current
// step.
if (queuedNotificationStep == currentStep) {
if (lastEnqueuedStep == currentStep) {
return;
}

// Otherwise we should have already flushed that notification.
Assert.assertion(queuedNotificationStep == notificationStep,
"queuedNotificationStep == notificationStep", queuedNotificationStep, "queuedNotificationStep",
Assert.assertion(lastEnqueuedStep == notificationStep,
"queuedNotificationStep == notificationStep", lastEnqueuedStep, "queuedNotificationStep",
notificationStep, "notificationStep", currentStep, "currentStep", this, "MergedListener");

queuedNotificationStep = currentStep;
lastEnqueuedStep = currentStep;
}

getUpdateGraph().addNotification(new MergedNotification());
Expand Down Expand Up @@ -208,6 +215,9 @@ protected boolean canExecute(final long step) {

@Override
public boolean satisfied(final long step) {
StepUpdater.checkForOlderStep(step, lastCompletedStep);
StepUpdater.checkForOlderStep(step, lastEnqueuedStep);

// Check and see if we've already been completed.
if (lastCompletedStep == step) {
getUpdateGraph().logDependencies()
Expand All @@ -217,7 +227,7 @@ public boolean satisfied(final long step) {

// This notification could be enqueued during the course of canExecute, but checking if we're enqueued is a very
// cheap check that may let us avoid recursively checking all the dependencies.
if (queuedNotificationStep == step) {
if (lastEnqueuedStep == step) {
getUpdateGraph().logDependencies()
.append("Enqueued notification for ").append(this).append(", step=").append(step).endl();
return false;
Expand All @@ -242,7 +252,7 @@ public boolean satisfied(final long step) {

// We check the queued notification step again after the dependency check. It is possible that something
// enqueued us while we were evaluating the dependencies, and we must not miss that race.
if (queuedNotificationStep == step) {
if (lastEnqueuedStep == step) {
getUpdateGraph().logDependencies()
.append("Enqueued notification during dependency check for ").append(this)
.append(", step=").append(step)
Expand All @@ -253,13 +263,13 @@ public boolean satisfied(final long step) {
getUpdateGraph().logDependencies()
.append("Dependencies satisfied for ").append(this)
.append(", lastCompleted=").append(lastCompletedStep)
.append(", lastQueued=").append(queuedNotificationStep)
.append(", lastQueued=").append(lastEnqueuedStep)
.append(", step=").append(step)
.endl();

// Mark this node as completed. All our dependencies have been satisfied, but we are not enqueued, so we can
// never actually execute.
lastCompletedStep = step;
StepUpdater.tryUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, this, step);
return true;
}

Expand All @@ -283,11 +293,11 @@ public MergedNotification() {
public void run() {
final long currentStep = getUpdateGraph().clock().currentStep();
try {
if (queuedNotificationStep != currentStep) {
if (lastEnqueuedStep != currentStep) {
// noinspection ConstantConditions
throw Assert.statementNeverExecuted("Notification step mismatch: listener="
+ System.identityHashCode(MergedListener.this) + ": queuedNotificationStep="
+ queuedNotificationStep + ", step=" + currentStep);
+ lastEnqueuedStep + ", step=" + currentStep);
}

if (upstreamError != null) {
Expand All @@ -312,13 +322,13 @@ public void run() {
entry.onUpdateStart(added, removed, modified, shifted);
try {
synchronized (MergedListener.this) {
if (notificationStep == queuedNotificationStep) {
if (notificationStep == lastEnqueuedStep) {
// noinspection ConstantConditions
throw Assert.statementNeverExecuted("Multiple notifications in the same step: listener="
+ System.identityHashCode(MergedListener.this) + ", queuedNotificationStep="
+ queuedNotificationStep);
+ lastEnqueuedStep);
}
notificationStep = queuedNotificationStep;
notificationStep = lastEnqueuedStep;
}
process();
getUpdateGraph().logDependencies()
Expand All @@ -330,7 +340,7 @@ public void run() {
} catch (Exception updateException) {
handleUncaughtException(updateException);
} finally {
lastCompletedStep = currentStep;
StepUpdater.forceUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, MergedListener.this, currentStep);
releaseFromRecorders();
}
}
Expand Down
Loading

0 comments on commit a88c202

Please sign in to comment.