From a88c202451f6949ceb412b18f57f6c494265b8e3 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Wed, 14 Jun 2023 09:06:30 -0400 Subject: [PATCH] Our UsePreviousValues implementations generally need to care about satisfaction, rather than just last notification step (#3983) * Our UsePreviousValues implementations generally need to care about satisfaction, rather than just last notification step * Overhaul concurrent safety for all NotificationQueue.Dependency.satisfied implementations * Add ClockInconsistencyException for satisfied to throw, enabling fail-fast for snapshots and error detection for update propagation * Port PeriodicUpdateGraph unit test support for sources-satisfied control, port TestConcurrentInstantiation fixes and new test, and fix other ExportTableUpdateListenerTest --- .../engine/table/impl/BaseTable.java | 24 ++- .../table/impl/ConstituentDependency.java | 26 ++- .../impl/InstrumentedTableListenerBase.java | 23 ++- .../engine/table/impl/MergedListener.java | 46 ++++-- .../engine/table/impl/SourceTable.java | 2 +- .../engine/table/impl/SwapListener.java | 36 ++-- .../engine/table/impl/SwapListenerEx.java | 78 ++++----- .../engine/table/impl/WhereListener.java | 6 +- .../table/impl/remote/ConstructSnapshot.java | 95 ++++++----- .../engine/table/impl/util/StepUpdater.java | 67 ++++++++ .../updategraph/impl/PeriodicUpdateGraph.java | 59 ++++++- .../impl/TestConcurrentInstantiation.java | 156 +++++++++++------- .../ClockInconsistencyException.java | 22 +++ .../engine/updategraph/LogicalClockImpl.java | 6 + .../engine/updategraph/NotificationQueue.java | 4 +- .../barrage/BarrageMessageProducer.java | 13 +- .../table/ExportTableUpdateListenerTest.java | 3 +- 17 files changed, 462 insertions(+), 204 deletions(-) create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/util/StepUpdater.java create mode 100644 engine/updategraph/src/main/java/io/deephaven/engine/updategraph/ClockInconsistencyException.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java index c2a3d87bdb8..11f3452db39 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java @@ -13,8 +13,8 @@ import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.context.PoisonedUpdateGraph; import io.deephaven.engine.exceptions.UpdateGraphConflictException; +import io.deephaven.engine.table.impl.util.StepUpdater; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.exceptions.NotSortableException; @@ -45,6 +45,7 @@ import java.io.*; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Condition; import java.util.function.BiConsumer; @@ -84,6 +85,10 @@ public abstract class BaseTable> extends throw new UnsupportedOperationException("EMPTY_CHILDREN does not support adds"); }, Collections.emptyList()); + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater LAST_SATISFIED_STEP_UPDATER = + AtomicLongFieldUpdater.newUpdater(BaseTable.class, "lastSatisfiedStep"); + /** * This table's definition. */ @@ -99,6 +104,11 @@ public abstract class BaseTable> extends */ protected final UpdateGraph updateGraph; + /** + * The last clock step when this table dispatched a notification. + */ + private volatile long lastNotificationStep; + // Fields for DynamicNode implementation and update propagation support private volatile boolean refreshing; @SuppressWarnings({"FieldMayBeFinal", "unused"}) // Set via ensureField with CONDITION_UPDATER @@ -108,8 +118,8 @@ public abstract class BaseTable> extends @SuppressWarnings("FieldMayBeFinal") // Set via ensureField with CHILD_LISTENER_REFERENCES_UPDATER private volatile SimpleReferenceManager> childListenerReferences = EMPTY_CHILD_LISTENER_REFERENCES; - private volatile long lastNotificationStep; - private volatile long lastSatisfiedStep; + @SuppressWarnings("FieldMayBeFinal") + private volatile long lastSatisfiedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private volatile boolean isFailed; /** @@ -461,6 +471,9 @@ public boolean satisfied(final long step) { return true; } + StepUpdater.checkForOlderStep(step, lastSatisfiedStep); + StepUpdater.checkForOlderStep(step, lastNotificationStep); + final Collection localParents = parents; // If we have no parents whatsoever then we are a source, and have no dependency chain other than the UGP // itself @@ -468,6 +481,7 @@ public boolean satisfied(final long step) { if (updateGraph.satisfied(step)) { updateGraph.logDependencies().append("Root node satisfied ").append(this) .endl(); + StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step); return true; } return false; @@ -492,7 +506,7 @@ public boolean satisfied(final long step) { .append("All parents dependencies satisfied for ").append(this) .endl(); - lastSatisfiedStep = step; + StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step); return true; } @@ -1251,7 +1265,7 @@ public static void initializeWithSnapshot( } public interface SwapListenerFactory { - T newListener(BaseTable sourceTable); + T newListener(BaseTable sourceTable); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ConstituentDependency.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ConstituentDependency.java index 941b625621e..e559a260dc7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ConstituentDependency.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ConstituentDependency.java @@ -13,12 +13,15 @@ import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.SharedContext; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.util.StepUpdater; import io.deephaven.engine.updategraph.NotificationQueue.Dependency; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.SafeCloseableArray; import org.jetbrains.annotations.NotNull; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + import static io.deephaven.engine.table.iterators.ChunkedColumnIterator.DEFAULT_CHUNK_SIZE; /** @@ -51,6 +54,9 @@ public static void install( new ConstituentDependency(resultUpdatedDependency, result.getRowSet(), dependencyColumns)); } + private static final AtomicLongFieldUpdater LAST_SATISFIED_STEP_UPDATER = + AtomicLongFieldUpdater.newUpdater(ConstituentDependency.class, "lastSatisfiedStep"); + /** * A {@link Dependency dependency} used to determine if the result table is done updating for this cycle. See * {@link #install(Table, Dependency)} for more information. @@ -59,7 +65,10 @@ public static void install( private final RowSet resultRows; private final ColumnSource[] dependencyColumns; - private volatile long lastSatisfiedStep; + @SuppressWarnings("FieldMayBeFinal") + private volatile long lastSatisfiedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + + private long lastQueriedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private long firstUnsatisfiedRowPosition = 0; private ConstituentDependency( @@ -83,6 +92,7 @@ public LogOutput append(@NotNull final LogOutput logOutput) { @Override public boolean satisfied(final long step) { + StepUpdater.checkForOlderStep(step, lastSatisfiedStep); if (lastSatisfiedStep == step) { return true; } @@ -95,13 +105,20 @@ public boolean satisfied(final long step) { } // Now that we know the result is updated (or won't be), it's safe to look at current contents. if (resultRows.isEmpty()) { - lastSatisfiedStep = step; + StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step); return true; } synchronized (this) { + StepUpdater.checkForOlderStep(step, lastSatisfiedStep); + StepUpdater.checkForOlderStep(step, lastQueriedStep); if (lastSatisfiedStep == step) { return true; } + if (lastQueriedStep != step) { + // Re-initialize for this cycle + lastQueriedStep = step; + firstUnsatisfiedRowPosition = 0; + } final int chunkSize = Math.toIntExact(Math.min(DEFAULT_CHUNK_SIZE, resultRows.size())); final int numColumns = dependencyColumns.length; final ChunkSource.GetContext[] contexts = new ChunkSource.GetContext[numColumns]; @@ -142,10 +159,9 @@ public boolean satisfied(final long step) { "resultRows.size()"); getUpdateGraph().logDependencies() .append("All constituent dependencies satisfied for ").append(this) + .append(", step=").append(step) .endl(); - lastSatisfiedStep = step; - firstUnsatisfiedRowPosition = 0; // Re-initialize for next cycle - + StepUpdater.tryUpdateRecordedStep(LAST_SATISFIED_STEP_UPDATER, this, step); return true; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java index db00935fab5..d09698f558b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java @@ -8,6 +8,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.impl.util.StepUpdater; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.exceptions.UncheckedTableException; import io.deephaven.engine.table.TableListener; @@ -50,7 +51,9 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact .getInstance() .getBooleanWithDefault("InstrumentedTableListenerBase.verboseLogging", false); + @SuppressWarnings("FieldMayBeFinal") private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + @SuppressWarnings("FieldMayBeFinal") private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; InstrumentedTableListenerBase(@Nullable String description, boolean terminalListener) { @@ -97,6 +100,9 @@ public boolean canExecute(final long step) { @Override public boolean satisfied(final long step) { + StepUpdater.checkForOlderStep(step, lastCompletedStep); + StepUpdater.checkForOlderStep(step, lastEnqueuedStep); + // Check and see if we've already been completed. if (lastCompletedStep == step) { getUpdateGraph().logDependencies() @@ -148,8 +154,7 @@ public boolean satisfied(final long step) { // Mark this node as completed. All our dependencies have been satisfied, but we are not enqueued, so we can // never actually execute. - final long oldLastCompletedStep = LAST_COMPLETED_STEP_UPDATER.getAndSet(this, step); - Assert.lt(oldLastCompletedStep, "oldLastCompletedStep", step, "step"); + StepUpdater.tryUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, this, step); return true; } @@ -161,7 +166,9 @@ public void onFailure(Throwable originalException, Entry sourceEntry) { protected abstract void onFailureInternal(Throwable originalException, Entry sourceEntry); - protected final void onFailureInternalWithDependent(final BaseTable dependent, final Throwable originalException, + protected final void onFailureInternalWithDependent( + final BaseTable dependent, + final Throwable originalException, final Entry sourceEntry) { dependent.notifyListenersOnError(originalException, sourceEntry); @@ -234,9 +241,8 @@ protected abstract class NotificationBase extends AbstractNotification implement + ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep); } - final long oldLastEnqueuedStep = - LAST_ENQUEUED_STEP_UPDATER.getAndSet(InstrumentedTableListenerBase.this, currentStep); - Assert.lt(oldLastEnqueuedStep, "oldLastEnqueuedStep", currentStep, "currentStep"); + StepUpdater.forceUpdateRecordedStep( + LAST_ENQUEUED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep); } @Override @@ -321,9 +327,8 @@ private void doRunInternal(final Runnable invokeOnUpdate) { onFailure(e, entry); } finally { entry.onUpdateEnd(); - final long oldLastCompletedStep = - LAST_COMPLETED_STEP_UPDATER.getAndSet(InstrumentedTableListenerBase.this, currentStep); - Assert.lt(oldLastCompletedStep, "oldLastCompletedStep", currentStep, "currentStep"); + StepUpdater.forceUpdateRecordedStep( + LAST_COMPLETED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java index 2a70f735825..a64df89fd5d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java @@ -13,6 +13,7 @@ import io.deephaven.engine.table.impl.perf.PerformanceEntry; import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker; import io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier; +import io.deephaven.engine.table.impl.util.StepUpdater; import io.deephaven.engine.updategraph.AbstractNotification; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; @@ -28,6 +29,7 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -42,6 +44,9 @@ public abstract class MergedListener extends LivenessArtifact implements NotificationQueue.Dependency { private static final Logger log = LoggerFactory.getLogger(MergedListener.class); + private static final AtomicLongFieldUpdater LAST_COMPLETED_STEP_UPDATER = + AtomicLongFieldUpdater.newUpdater(MergedListener.class, "lastCompletedStep"); + private final UpdateGraph updateGraph; private final Iterable recorders; private final Iterable dependencies; @@ -50,9 +55,11 @@ public abstract class MergedListener extends LivenessArtifact implements Notific private final PerformanceEntry entry; private final String logPrefix; - private long notificationStep = -1; - private volatile long queuedNotificationStep = -1; - private volatile long lastCompletedStep; + @SuppressWarnings("FieldMayBeFinal") + private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + + private long notificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private Throwable upstreamError; private TableListener.Entry errorSourceEntry; @@ -100,7 +107,7 @@ private void notifyInternal(@Nullable final Throwable upstreamError, if (notificationStep == currentStep) { // noinspection ConstantConditions throw Assert.statementNeverExecuted( - "MergedListener was fired before both all listener records completed: listener=" + "MergedListener was fired before all listener recorders were satisfied: listener=" + System.identityHashCode(this) + ", currentStep=" + currentStep); } @@ -111,16 +118,16 @@ private void notifyInternal(@Nullable final Throwable upstreamError, // We've already got something in the notification queue that has not yet been executed for the current // step. - if (queuedNotificationStep == currentStep) { + if (lastEnqueuedStep == currentStep) { return; } // Otherwise we should have already flushed that notification. - Assert.assertion(queuedNotificationStep == notificationStep, - "queuedNotificationStep == notificationStep", queuedNotificationStep, "queuedNotificationStep", + Assert.assertion(lastEnqueuedStep == notificationStep, + "queuedNotificationStep == notificationStep", lastEnqueuedStep, "queuedNotificationStep", notificationStep, "notificationStep", currentStep, "currentStep", this, "MergedListener"); - queuedNotificationStep = currentStep; + lastEnqueuedStep = currentStep; } getUpdateGraph().addNotification(new MergedNotification()); @@ -208,6 +215,9 @@ protected boolean canExecute(final long step) { @Override public boolean satisfied(final long step) { + StepUpdater.checkForOlderStep(step, lastCompletedStep); + StepUpdater.checkForOlderStep(step, lastEnqueuedStep); + // Check and see if we've already been completed. if (lastCompletedStep == step) { getUpdateGraph().logDependencies() @@ -217,7 +227,7 @@ public boolean satisfied(final long step) { // This notification could be enqueued during the course of canExecute, but checking if we're enqueued is a very // cheap check that may let us avoid recursively checking all the dependencies. - if (queuedNotificationStep == step) { + if (lastEnqueuedStep == step) { getUpdateGraph().logDependencies() .append("Enqueued notification for ").append(this).append(", step=").append(step).endl(); return false; @@ -242,7 +252,7 @@ public boolean satisfied(final long step) { // We check the queued notification step again after the dependency check. It is possible that something // enqueued us while we were evaluating the dependencies, and we must not miss that race. - if (queuedNotificationStep == step) { + if (lastEnqueuedStep == step) { getUpdateGraph().logDependencies() .append("Enqueued notification during dependency check for ").append(this) .append(", step=").append(step) @@ -253,13 +263,13 @@ public boolean satisfied(final long step) { getUpdateGraph().logDependencies() .append("Dependencies satisfied for ").append(this) .append(", lastCompleted=").append(lastCompletedStep) - .append(", lastQueued=").append(queuedNotificationStep) + .append(", lastQueued=").append(lastEnqueuedStep) .append(", step=").append(step) .endl(); // Mark this node as completed. All our dependencies have been satisfied, but we are not enqueued, so we can // never actually execute. - lastCompletedStep = step; + StepUpdater.tryUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, this, step); return true; } @@ -283,11 +293,11 @@ public MergedNotification() { public void run() { final long currentStep = getUpdateGraph().clock().currentStep(); try { - if (queuedNotificationStep != currentStep) { + if (lastEnqueuedStep != currentStep) { // noinspection ConstantConditions throw Assert.statementNeverExecuted("Notification step mismatch: listener=" + System.identityHashCode(MergedListener.this) + ": queuedNotificationStep=" - + queuedNotificationStep + ", step=" + currentStep); + + lastEnqueuedStep + ", step=" + currentStep); } if (upstreamError != null) { @@ -312,13 +322,13 @@ public void run() { entry.onUpdateStart(added, removed, modified, shifted); try { synchronized (MergedListener.this) { - if (notificationStep == queuedNotificationStep) { + if (notificationStep == lastEnqueuedStep) { // noinspection ConstantConditions throw Assert.statementNeverExecuted("Multiple notifications in the same step: listener=" + System.identityHashCode(MergedListener.this) + ", queuedNotificationStep=" - + queuedNotificationStep); + + lastEnqueuedStep); } - notificationStep = queuedNotificationStep; + notificationStep = lastEnqueuedStep; } process(); getUpdateGraph().logDependencies() @@ -330,7 +340,7 @@ public void run() { } catch (Exception updateException) { handleUncaughtException(updateException); } finally { - lastCompletedStep = currentStep; + StepUpdater.forceUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, MergedListener.this, currentStep); releaseFromRecorders(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index c403a8e358d..a5a7a30913b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -240,7 +240,7 @@ protected final QueryTable doCoalesce() { initialize(); final SwapListener swapListener = - createSwapListenerIfRefreshing((final BaseTable parent) -> new SwapListener(parent) { + createSwapListenerIfRefreshing((final BaseTable parent) -> new SwapListener(parent) { @Override public void destroy() { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java index 8cf43d01da8..47179f499af 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java @@ -61,7 +61,7 @@ public class SwapListener extends LivenessArtifact implements TableUpdateListene /** * The sourceTable, used to get the lastNotificationTime. */ - final BaseTable sourceTable; + final BaseTable sourceTable; /** * {@link WeakSimpleReference} to {@code this}, for capturing notifications from {@code sourceTable} before @@ -75,7 +75,7 @@ public class SwapListener extends LivenessArtifact implements TableUpdateListene private final SwappableDelegatingReference referenceForSource = new SwappableDelegatingReference<>(initialDelegate); - public SwapListener(final BaseTable sourceTable) { + public SwapListener(final BaseTable sourceTable) { this.sourceTable = sourceTable; } @@ -90,25 +90,37 @@ public ConstructSnapshot.SnapshotControl makeSnapshotControl() { /** * Starts a snapshot. * - * @param clockCycle the clockCycle we are starting a snapshot on + * @param beforeClockValue the logical clock value we are starting a snapshot on * @return true if we should use previous values, false if we should use current values. */ - protected synchronized boolean start(final long clockCycle) { + protected synchronized Boolean start(final long beforeClockValue) { lastNotificationStep = sourceTable.getLastNotificationStep(); success = false; - final long currentStep = LogicalClock.getStep(clockCycle); - final boolean updatedOnThisCycle = currentStep == lastNotificationStep; - final boolean updating = LogicalClock.getState(clockCycle) == LogicalClock.State.Updating; + + final long beforeStep = LogicalClock.getStep(beforeClockValue); + final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); + + final boolean idle = beforeState == LogicalClock.State.Idle; + final boolean updatedOnThisStep = beforeStep == lastNotificationStep; + final boolean satisfied; + try { + satisfied = idle || updatedOnThisStep || sourceTable.satisfied(beforeStep); + } catch (ClockInconsistencyException e) { + return null; + } + final boolean usePrev = !satisfied; + if (DEBUG) { log.info().append("SwapListener {source=").append(System.identityHashCode(sourceTable)) .append(", swap=").append(System.identityHashCode(this)) - .append("} Start: currentStep=").append(currentStep) - .append(", last=").append(lastNotificationStep) - .append(", updating=").append(updating) - .append(", updatedOnThisCycle=").append(updatedOnThisCycle) + .append("} Start: beforeStep=").append(beforeStep) + .append(", beforeState=").append(beforeState.name()) + .append(", lastNotificationStep=").append(lastNotificationStep) + .append(", satisfied=").append(satisfied) + .append(", usePrev=").append(usePrev) .endl(); } - return updating && !updatedOnThisCycle; + return usePrev; } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java index eaed0416a18..52e648fa83d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java @@ -3,6 +3,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.TableUpdateListener; import io.deephaven.engine.table.impl.remote.ConstructSnapshot; +import io.deephaven.engine.updategraph.ClockInconsistencyException; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.updategraph.WaitNotification; import io.deephaven.internal.log.LoggerFactory; @@ -22,7 +23,7 @@ public final class SwapListenerEx extends SwapListener { private long extraLastNotificationStep; - public SwapListenerEx(@NotNull final BaseTable sourceTable, @NotNull final NotificationStepSource extra) { + public SwapListenerEx(@NotNull final BaseTable sourceTable, @NotNull final NotificationStepSource extra) { super(sourceTable); this.extra = extra; } @@ -45,51 +46,52 @@ public synchronized Boolean startWithExtra(final long beforeClockValue) { final long beforeStep = LogicalClock.getStep(beforeClockValue); final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); - final Boolean result; - if (beforeState == LogicalClock.State.Idle) { - result = false; + final boolean idle = beforeState == LogicalClock.State.Idle; + final boolean sourceUpdatedOnThisStep = lastNotificationStep == beforeStep; + final boolean sourceSatisfied; + final boolean extraUpdatedOnThisStep = extraLastNotificationStep == beforeStep; + final boolean extraSatisfied; + + try { + sourceSatisfied = idle || sourceUpdatedOnThisStep || sourceTable.satisfied(beforeStep); + extraSatisfied = idle || extraUpdatedOnThisStep || extra.satisfied(beforeStep); + } catch (ClockInconsistencyException e) { + return null; + } + + final Boolean usePrev; + if (sourceSatisfied == extraSatisfied) { + usePrev = !sourceSatisfied; + } else if (sourceSatisfied) { + WaitNotification.waitForSatisfaction(beforeStep, extra); + extraLastNotificationStep = extra.getLastNotificationStep(); + final long postWaitStep = ExecutionContext.getContext().getUpdateGraph().clock().currentStep(); + usePrev = postWaitStep == beforeStep ? false : null; } else { - final boolean sourceUpdatedOnThisCycle = lastNotificationStep == beforeStep; - final boolean extraUpdatedOnThisCycle = extraLastNotificationStep == beforeStep; - - if (sourceUpdatedOnThisCycle) { - if (extraUpdatedOnThisCycle || extra.satisfied(beforeStep)) { - result = false; - } else { - WaitNotification.waitForSatisfaction(beforeStep, extra); - extraLastNotificationStep = extra.getLastNotificationStep(); - result = ExecutionContext.getContext().getUpdateGraph().clock().currentStep() == beforeStep ? false - : null; - } - } else if (extraUpdatedOnThisCycle) { - if (sourceTable.satisfied(beforeStep)) { - result = false; - } else { - WaitNotification.waitForSatisfaction(beforeStep, sourceTable); - lastNotificationStep = sourceTable.getLastNotificationStep(); - result = ExecutionContext.getContext().getUpdateGraph().clock().currentStep() == beforeStep ? false - : null; - } - } else { - result = true; - } + WaitNotification.waitForSatisfaction(beforeStep, sourceTable); + lastNotificationStep = sourceTable.getLastNotificationStep(); + final long postWaitStep = ExecutionContext.getContext().getUpdateGraph().clock().currentStep(); + usePrev = postWaitStep == beforeStep ? false : null; } + if (DEBUG) { - log.info().append("SwapListenerEx start() source=") - .append(System.identityHashCode(sourceTable)) - .append(". swap=") - .append(System.identityHashCode(this)) - .append(", start={").append(beforeStep).append(",").append(beforeState.toString()) - .append("}, last=").append(lastNotificationStep) - .append(", extraLast=").append(extraLastNotificationStep) - .append(", result=").append(result) + log.info().append("SwapListenerEx {source=").append(System.identityHashCode(sourceTable)) + .append(", extra=").append(System.identityHashCode(extra)) + .append(", swap=").append(System.identityHashCode(this)) + .append("} Start: beforeStep=").append(beforeStep) + .append(", beforeState=").append(beforeState.name()) + .append(", sourceLastNotificationStep=").append(lastNotificationStep) + .append(", sourceSatisfied=").append(sourceSatisfied) + .append(", extraLastNotificationStep=").append(extraLastNotificationStep) + .append(", extraSatisfied=").append(extraSatisfied) + .append(", usePrev=").append(usePrev) .endl(); } - return result; + return usePrev; } @Override - public boolean start(final long beforeClockValue) { + public Boolean start(final long beforeClockValue) { throw new UnsupportedOperationException("Use startWithExtra"); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index b7dd52d4fe5..100ee7f98da 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -29,6 +29,7 @@ * source table, but a refreshing filter in which case our listener recorder is null. */ class WhereListener extends MergedListener { + private final QueryTable sourceTable; private final QueryTable.FilteredTable result; private final WritableRowSet currentMapping; @@ -36,11 +37,12 @@ class WhereListener extends MergedListener { private final ModifiedColumnSet filterColumns; private final ListenerRecorder recorder; private final long minimumThreadSize; - private long initialNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; - private long finalNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private final boolean permitParallelization; private final int segmentCount; + private volatile long initialNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + private volatile long finalNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + WhereListener( final Logger log, final QueryTable sourceTable, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index d1e67b404b8..5d27dffc46e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -10,8 +10,7 @@ import io.deephaven.configuration.Configuration; import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.updategraph.LogicalClock; -import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.updategraph.*; import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.SharedContext; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; @@ -22,8 +21,6 @@ import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.updategraph.NotificationQueue; -import io.deephaven.engine.updategraph.WaitNotification; import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.engine.liveness.LivenessManager; import io.deephaven.engine.liveness.LivenessScope; @@ -781,7 +778,8 @@ public boolean snapshotConsistent(long currentClockValue, boolean usingPreviousV * @param snapshotCompletedConsistently The {@link SnapshotCompletedConsistently} to use, or null to use * {@code * snapshotConsistent} */ - public static SnapshotControl makeSnapshotControl(@NotNull final UsePreviousValues usePreviousValues, + public static SnapshotControl makeSnapshotControl( + @NotNull final UsePreviousValues usePreviousValues, @NotNull final SnapshotConsistent snapshotConsistent, @Nullable final SnapshotCompletedConsistently snapshotCompletedConsistently) { return snapshotCompletedConsistently == null @@ -894,9 +892,17 @@ private NotificationAwareSingleSourceSnapshotControl(@NotNull final Notification @Override public Boolean usePreviousValues(final long beforeClockValue) { - // noinspection AutoBoxing - return LogicalClock.getState(beforeClockValue) == LogicalClock.State.Updating && - source.getLastNotificationStep() != LogicalClock.getStep(beforeClockValue); + final long beforeStep = LogicalClock.getStep(beforeClockValue); + final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); + + try { + // noinspection AutoBoxing + return beforeState == LogicalClock.State.Updating + && source.getLastNotificationStep() != beforeStep + && !source.satisfied(beforeStep); + } catch (ClockInconsistencyException e) { + return null; + } } @Override @@ -926,9 +932,17 @@ private NotificationObliviousSingleSourceSnapshotControl(@NotNull final Notifica @Override public Boolean usePreviousValues(final long beforeClockValue) { - // noinspection AutoBoxing - return LogicalClock.getState(beforeClockValue) == LogicalClock.State.Updating && - source.getLastNotificationStep() != LogicalClock.getStep(beforeClockValue); + final long beforeStep = LogicalClock.getStep(beforeClockValue); + final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); + + try { + // noinspection AutoBoxing + return beforeState == LogicalClock.State.Updating + && source.getLastNotificationStep() != beforeStep + && !source.satisfied(beforeStep); + } catch (ClockInconsistencyException e) { + return null; + } } @Override @@ -957,22 +971,22 @@ public Boolean usePreviousValues(final long beforeClockValue) { return false; } final long beforeStep = LogicalClock.getStep(beforeClockValue); - final NotificationStepSource[] notYetNotified = Stream.of(sources) - .filter((final NotificationStepSource source) -> source.getLastNotificationStep() != beforeStep) - .toArray(NotificationStepSource[]::new); - if (notYetNotified.length == sources.length) { + final NotificationStepSource[] notYetSatisfied; + try { + notYetSatisfied = Stream.of(sources) + .filter((final NotificationStepSource source) -> source.getLastNotificationStep() != beforeStep + && !source.satisfied(beforeStep)) + .toArray(NotificationStepSource[]::new); + } catch (ClockInconsistencyException e) { + return null; + } + if (notYetSatisfied.length == sources.length) { return true; } - if (notYetNotified.length > 0) { - final NotificationStepSource[] notYetSatisfied = Stream.of(sources) - .filter((final NotificationQueue.Dependency dep) -> !dep.satisfied(beforeStep)) - .toArray(NotificationStepSource[]::new); - if (notYetSatisfied.length > 0 - && !WaitNotification.waitForSatisfaction(beforeStep, notYetSatisfied)) { - if (ExecutionContext.getContext().getUpdateGraph().clock().currentStep() != beforeStep) { - // If we missed a step change, we've already failed, request a do-over. - return null; - } + if (notYetSatisfied.length > 0 && !WaitNotification.waitForSatisfaction(beforeStep, notYetSatisfied)) { + if (ExecutionContext.getContext().getUpdateGraph().clock().currentStep() != beforeStep) { + // If we missed a step change, we've already failed, request a do-over. + return null; } } return false; @@ -1012,22 +1026,22 @@ public Boolean usePreviousValues(final long beforeClockValue) { return false; } final long beforeStep = LogicalClock.getStep(beforeClockValue); - final NotificationStepSource[] notYetNotified = Stream.of(sources) - .filter((final NotificationStepSource source) -> source.getLastNotificationStep() != beforeStep) - .toArray(NotificationStepSource[]::new); - if (notYetNotified.length == sources.length) { + final NotificationStepSource[] notYetSatisfied; + try { + notYetSatisfied = Stream.of(sources) + .filter((final NotificationStepSource source) -> source.getLastNotificationStep() != beforeStep + && !source.satisfied(beforeStep)) + .toArray(NotificationStepSource[]::new); + } catch (ClockInconsistencyException e) { + return null; + } + if (notYetSatisfied.length == sources.length) { return true; } - if (notYetNotified.length > 0) { - final NotificationStepSource[] notYetSatisfied = Stream.of(sources) - .filter((final NotificationQueue.Dependency dep) -> !dep.satisfied(beforeStep)) - .toArray(NotificationStepSource[]::new); - if (notYetSatisfied.length > 0 - && !WaitNotification.waitForSatisfaction(beforeStep, notYetSatisfied)) { - if (ExecutionContext.getContext().getUpdateGraph().clock().currentStep() != beforeStep) { - // If we missed a step change, we've already failed, request a do-over. - return null; - } + if (notYetSatisfied.length > 0 && !WaitNotification.waitForSatisfaction(beforeStep, notYetSatisfied)) { + if (ExecutionContext.getContext().getUpdateGraph().clock().currentStep() != beforeStep) { + // If we missed a step change, we've already failed, request a do-over. + return null; } } return false; @@ -1163,8 +1177,7 @@ public static long callDataSnapshotFunction(@NotNull final LogOutputAppendable l } if (snapshotSuccessful) { if (functionSuccessful) { - step = usePrev ? LogicalClock.getStep(beforeClockValue) - 1 - : LogicalClock.getStep(beforeClockValue); + step = LogicalClock.getStep(beforeClockValue) - (usePrev ? 1 : 0); snapshotLivenessScope.transferTo(initialLivenessManager); } break; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/StepUpdater.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/StepUpdater.java new file mode 100644 index 00000000000..e8fe11b93af --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/StepUpdater.java @@ -0,0 +1,67 @@ +package io.deephaven.engine.table.impl.util; + +import io.deephaven.base.verify.Assert; +import io.deephaven.engine.updategraph.ClockInconsistencyException; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +/** + * Tool for maintaining recorded clock steps. + */ +public class StepUpdater { + + /** + * Validated that {@code requestedStep} is greater than or equal to {@code recordedStep}. + * + * @param requestedStep The requested step, e.g. as an argument to + * {@link io.deephaven.engine.updategraph.NotificationQueue.Dependency#satisfied(long) Dependency.satisfied} + * @param recordedStep The highest recorded step + * @throws ClockInconsistencyException if {@code requestedStep < recordedStep} + */ + public static void checkForOlderStep(final long requestedStep, final long recordedStep) { + if (requestedStep < recordedStep) { + throw new ClockInconsistencyException(String.format( + "Requested step %s is less than highest recorded step %s", requestedStep, recordedStep)); + } + } + + /** + * Update a recorded step field to be at least {@code step}. + * + * @param recordedStepUpdater An updater for the recorded step field + * @param instance The instance to update + * @param step The target step value to record + * @param The type of {@code instance} expected by {@code recordedStepUpdater} + * @throws ClockInconsistencyException if {@code step < recordedStepUpdater.get(instance)} + */ + public static void tryUpdateRecordedStep( + @NotNull final AtomicLongFieldUpdater recordedStepUpdater, + @NotNull final T instance, + final long step) { + long oldRecordedStep; + while ((oldRecordedStep = recordedStepUpdater.get(instance)) < step) { + if (recordedStepUpdater.compareAndSet(instance, oldRecordedStep, step)) { + return; + } + } + checkForOlderStep(step, oldRecordedStep); + } + + /** + * Update a recorded step field to be exactly {@code step}. Validate that the previous recorded step was less than + * {@code step}. + * + * @param recordedStepUpdater An updater for the recorded step field + * @param instance The instance to update + * @param step The target step value to record + * @param The type of {@code instance} expected by {@code recordedStepUpdater} + */ + public static void forceUpdateRecordedStep( + @NotNull final AtomicLongFieldUpdater recordedStepUpdater, + @NotNull final T instance, + final long step) { + final long oldRecordedStep = recordedStepUpdater.getAndSet(instance, step); + Assert.lt(oldRecordedStep, "oldRecordedStep", step, "step"); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 23032f9a68d..05e0ad58d17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -14,6 +14,7 @@ import io.deephaven.engine.liveness.LivenessManager; import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; +import io.deephaven.engine.table.impl.util.StepUpdater; import io.deephaven.engine.updategraph.*; import io.deephaven.engine.util.reference.CleanupReferenceProcessorInstance; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; @@ -708,6 +709,7 @@ public boolean maybeAddNotification(@NotNull final Notification notification, fi @Override public boolean satisfied(final long step) { + StepUpdater.checkForOlderStep(step, sourcesLastSatisfiedStep); return sourcesLastSatisfiedStep == step; } @@ -838,20 +840,33 @@ public void resetForUnitTests(boolean after, /** * Begin the next {@link LogicalClockImpl#startUpdateCycle() update cycle} while in {@link #enableUnitTestMode() - * unit-test} mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread. + * unit-test} mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread. This + * overload is the same as {@code startCycleForUnitTests(true)}. */ @TestUseOnly public void startCycleForUnitTests() { + startCycleForUnitTests(true); + } + + /** + * Begin the next {@link LogicalClockImpl#startUpdateCycle() update cycle} while in {@link #enableUnitTestMode() + * unit-test} mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread. + * + * @param sourcesSatisfied Whether sources should be marked as satisfied by this invocation; if {@code false}, the + * caller must control source satisfaction using {@link #markSourcesRefreshedForUnitTests()}. + */ + @TestUseOnly + public void startCycleForUnitTests(final boolean sourcesSatisfied) { Assert.assertion(unitTestMode, "unitTestMode"); try { - unitTestRefreshThreadPool.submit(this::startCycleForUnitTestsInternal).get(); + unitTestRefreshThreadPool.submit(() -> startCycleForUnitTestsInternal(sourcesSatisfied)).get(); } catch (InterruptedException | ExecutionException e) { throw new UncheckedDeephavenException(e); } } @TestUseOnly - private void startCycleForUnitTestsInternal() { + private void startCycleForUnitTestsInternal(final boolean sourcesSatisfied) { // noinspection AutoBoxing isUpdateThread.set(true); exclusiveLock().lock(); @@ -861,6 +876,20 @@ private void startCycleForUnitTestsInternal() { LivenessScopeStack.push(refreshScope); logicalClock.startUpdateCycle(); + if (sourcesSatisfied) { + markSourcesRefreshedForUnitTests(); + } + } + + /** + * Record that sources have been satisfied within a unit test cycle. + */ + @TestUseOnly + public void markSourcesRefreshedForUnitTests() { + Assert.assertion(unitTestMode, "unitTestMode"); + if (sourcesLastSatisfiedStep >= logicalClock.currentStep()) { + throw new IllegalStateException("Already marked sources as satisfied!"); + } sourcesLastSatisfiedStep = logicalClock.currentStep(); } @@ -872,6 +901,8 @@ private void startCycleForUnitTestsInternal() { @TestUseOnly public void completeCycleForUnitTests() { Assert.assertion(unitTestMode, "unitTestMode"); + Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(), + "logicalClock.currentStep()"); try { unitTestRefreshThreadPool.submit(this::completeCycleForUnitTestsInternal).get(); } catch (InterruptedException | ExecutionException e) { @@ -895,16 +926,32 @@ private void completeCycleForUnitTestsInternal() { } } + /** + * Execute the given runnable wrapped with {@link #startCycleForUnitTests()} and + * {@link #completeCycleForUnitTests()}. Note that the runnable is run on the current thread. This is equivalent to + * {@code runWithinUnitTestCycle(runnable, true)}. + * + * @param runnable The runnable to execute + */ + @TestUseOnly + public void runWithinUnitTestCycle(@NotNull final ThrowingRunnable runnable) throws T { + runWithinUnitTestCycle(runnable, true); + } + /** * Execute the given runnable wrapped with {@link #startCycleForUnitTests()} and * {@link #completeCycleForUnitTests()}. Note that the runnable is run on the current thread. * - * @param runnable the runnable to execute. + * @param runnable The runnable to execute + * @param sourcesSatisfied Whether sources should be marked as satisfied by this invocation; if {@code false}, the + * caller must control source satisfaction using {@link #markSourcesRefreshedForUnitTests()}. */ @TestUseOnly - public void runWithinUnitTestCycle(ThrowingRunnable runnable) + public void runWithinUnitTestCycle( + @NotNull final ThrowingRunnable runnable, + final boolean sourcesSatisfied) throws T { - startCycleForUnitTests(); + startCycleForUnitTests(sourcesSatisfied); try { runnable.run(); } finally { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java index 2124fe02840..a1471769ebd 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java @@ -60,6 +60,7 @@ public class TestConcurrentInstantiation extends QueryTableTestBase { private ExecutorService pool; private ExecutorService dualPool; + private ControlledUpdateGraph updateGraph; @Override public void setUp() throws Exception { @@ -76,6 +77,7 @@ public void setUp() throws Exception { }; pool = Executors.newFixedThreadPool(1, threadFactory); dualPool = Executors.newFixedThreadPool(2, threadFactory); + updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); } @Override @@ -95,7 +97,8 @@ public void testTreeTableFilter() throws ExecutionException, InterruptedExceptio () -> (QueryTable) treed.getSource().apply(new TreeTableFilter.Operator((TreeTableImpl) treed, WhereFilterFactory.getExpressions("Sentinel in 4, 6, 9, 11, 12, 13, 14, 15"))); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); + final Table rawSorted = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TableTools.show(rawSorted); @@ -110,17 +113,18 @@ public void testTreeTableFilter() throws ExecutionException, InterruptedExceptio assertTableEquals(rawSorted, table2); source.notifyListeners(i(10), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Future future3 = pool.submit(callable); assertTableEquals(rawSorted, table2); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); final Table table3 = future3.get(TIMEOUT_LENGTH, TIMEOUT_UNIT); assertTableEquals(rawSorted, table2); assertTableEquals(table2, table3); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); TstUtils.addToTable(source, i(11), col("Sentinel", 12), @@ -132,7 +136,8 @@ public void testTreeTableFilter() throws ExecutionException, InterruptedExceptio assertTableEquals(table3, table4); source.notifyListeners(i(11), i(), i()); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.markSourcesRefreshedForUnitTests(); + updateGraph.completeCycleForUnitTests(); assertArrayEquals( new int[] {1, 2, 3, 4, 6, 9, 10, 11, 12}, @@ -148,7 +153,7 @@ public void testFlatten() throws ExecutionException, InterruptedException, Timeo final Table tableStart = TstUtils.testRefreshingTable(i(2, 4, 6).toTracking(), col("x", 1, 2, 3), col("y", "a", "b", "c")); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table flat = pool.submit(table::flatten).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -163,13 +168,14 @@ public void testFlatten() throws ExecutionException, InterruptedException, Timeo TstUtils.assertTableEquals(prevTable(flat2), tableStart); table.notifyListeners(i(3), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table flat3 = pool.submit(table::flatten).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(prevTable(flat), tableStart); TstUtils.assertTableEquals(prevTable(flat2), tableStart); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(table, flat); TstUtils.assertTableEquals(table, flat2); @@ -187,7 +193,7 @@ public void testUpdateView() throws ExecutionException, InterruptedException, Ti final Callable
callable = () -> table.updateView("z=x*4"); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(); final Table updateView1 = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -207,7 +213,7 @@ public void testUpdateView() throws ExecutionException, InterruptedException, Ti TstUtils.assertTableEquals(tableStart, prevTable(updateView1)); TstUtils.assertTableEquals(tableStart, prevTable(updateView2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableUpdate, updateView1); TstUtils.assertTableEquals(tableUpdate, updateView2); @@ -224,7 +230,7 @@ public void testView() throws ExecutionException, InterruptedException, TimeoutE final Callable
callable = () -> table.view("y", "z=x*4"); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(); final Table updateView1 = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -244,7 +250,7 @@ public void testView() throws ExecutionException, InterruptedException, TimeoutE TstUtils.assertTableEquals(tableStart, prevTable(updateView1)); TstUtils.assertTableEquals(tableStart, prevTable(updateView2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableUpdate, updateView1); TstUtils.assertTableEquals(tableUpdate, updateView2); @@ -262,7 +268,7 @@ public void testDropColumns() throws ExecutionException, InterruptedException, T final Callable
callable = () -> table.dropColumns("z"); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(); final Table dropColumns1 = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -282,7 +288,7 @@ public void testDropColumns() throws ExecutionException, InterruptedException, T TstUtils.assertTableEquals(tableStart, prevTable(dropColumns1)); TstUtils.assertTableEquals(tableStart, prevTable(dropColumns2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableUpdate, dropColumns1); TstUtils.assertTableEquals(tableUpdate, dropColumns2); @@ -298,7 +304,7 @@ public void testWhere() throws ExecutionException, InterruptedException, Timeout final Table tableUpdate = TstUtils.testRefreshingTable(i(2, 3, 6).toTracking(), col("x", 1, 4, 3), col("y", "a", "d", "c"), col("z", true, true, true)); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table filter1 = pool.submit(() -> table.where("z")).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -312,13 +318,14 @@ public void testWhere() throws ExecutionException, InterruptedException, Timeout TstUtils.assertTableEquals(tableStart, prevTable(filter2)); table.notifyListeners(i(3), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table filter3 = pool.submit(() -> table.where("z")).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(tableStart, prevTable(filter1)); TstUtils.assertTableEquals(tableStart, prevTable(filter2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableUpdate, filter1); TstUtils.assertTableEquals(tableUpdate, filter2); @@ -333,7 +340,7 @@ public void testWhere2() throws ExecutionException, InterruptedException, Timeou final Table testUpdate = TstUtils.testRefreshingTable(i(3, 6).toTracking(), col("x", 4, 3), col("y", "d", "c"), col("z", true, true)); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table filter1 = pool.submit(() -> table.where("z")).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -347,13 +354,14 @@ public void testWhere2() throws ExecutionException, InterruptedException, Timeou TstUtils.assertTableEquals(tableStart, prevTable(filter2)); table.notifyListeners(i(3), i(), i(2)); + updateGraph.markSourcesRefreshedForUnitTests(); final Table filter3 = pool.submit(() -> table.where("z")).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(tableStart, prevTable(filter1)); TstUtils.assertTableEquals(tableStart, prevTable(filter2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); showWithRowSet(table); showWithRowSet(filter1); @@ -375,10 +383,10 @@ public void testWhereDynamic() throws ExecutionException, InterruptedException, col("x", 4, 3), col("y", "d", "c"), col("z", true, true)); final QueryTable whereTable = TstUtils.testRefreshingTable(i(0).toTracking(), col("z", true)); - final DynamicWhereFilter filter = ExecutionContext.getContext().getUpdateGraph().exclusiveLock().computeLocked( + final DynamicWhereFilter filter = updateGraph.exclusiveLock().computeLocked( () -> new DynamicWhereFilter(whereTable, true, MatchPairFactory.getExpressions("z"))); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Future
future1 = dualPool.submit(() -> table.where(filter)); try { @@ -392,8 +400,9 @@ public void testWhereDynamic() throws ExecutionException, InterruptedException, assertTableEquals(tableStart, prevTable(filter2)); table.notifyListeners(i(3), i(), i(2)); + updateGraph.markSourcesRefreshedForUnitTests(); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); final Table filter1 = future1.get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(testUpdate, filter1); @@ -408,7 +417,7 @@ public void testSort() throws ExecutionException, InterruptedException, TimeoutE final Table tableUpdate = TstUtils.testRefreshingTable(i(1, 2, 3, 4).toTracking(), col("x", 4, 3, 2, 1), col("y", "d", "c", "b", "a")); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table sort1 = pool.submit(() -> table.sortDescending("x")).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -422,13 +431,14 @@ public void testSort() throws ExecutionException, InterruptedException, TimeoutE TstUtils.assertTableEquals(tableStart, prevTable(sort2)); table.notifyListeners(i(3), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table sort3 = pool.submit(() -> table.sortDescending("x")).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(tableStart, prevTable(sort1)); TstUtils.assertTableEquals(tableStart, prevTable(sort2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableUpdate, sort1); TstUtils.assertTableEquals(tableUpdate, sort2); @@ -447,7 +457,7 @@ public void testReverse() throws ExecutionException, InterruptedException, Timeo final Table tableUpdate3 = TstUtils.testRefreshingTable(i(1, 2, 3, 4, 5, 6).toTracking(), col("x", 6, 5, 4, 3, 2, 1), col("y", "f", "e", "d", "c", "b", "a")); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table reverse1 = pool.submit(table::reverse).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -461,19 +471,19 @@ public void testReverse() throws ExecutionException, InterruptedException, Timeo TstUtils.assertTableEquals(tableStart, prevTable(reverse2)); table.notifyListeners(i(8), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table reverse3 = pool.submit(table::reverse).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(tableStart, prevTable(reverse1)); TstUtils.assertTableEquals(tableStart, prevTable(reverse2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableUpdate, reverse1); TstUtils.assertTableEquals(tableUpdate, reverse2); TstUtils.assertTableEquals(tableUpdate, reverse3); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> { TstUtils.addToTable(table, i(10000), col("x", 5), col("y", "e")); table.notifyListeners(i(10000), i(), i()); @@ -502,7 +512,6 @@ public void testSortOfPartitionBy() throws ExecutionException, InterruptedExcept col("x", 1, 2, 3), col("y", "a", "a", "a")); final PartitionedTable pt = table.partitionBy("y"); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.startCycleForUnitTests(); TstUtils.addToTable(table, i(3), col("x", 4), col("y", "d")); @@ -542,8 +551,7 @@ public void testChain() throws ExecutionException, InterruptedException, Timeout final Callable
callable = () -> table.updateView("u=x*4").where("z").sortDescending("x"); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); - updateGraph.startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table chain1 = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -557,6 +565,7 @@ public void testChain() throws ExecutionException, InterruptedException, Timeout TstUtils.assertTableEquals(tableStart, prevTable(chain2)); table.notifyListeners(i(3), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table chain3 = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -651,7 +660,7 @@ private void testIterative(List> transformations, int see splitCallables.add(new Pair<>(firstHalf, secondHalf)); } - final Table standard = ExecutionContext.getContext().getUpdateGraph().exclusiveLock().computeLocked(() -> { + final Table standard = updateGraph.exclusiveLock().computeLocked(() -> { try { return complete.call(); } catch (Exception e) { @@ -699,7 +708,7 @@ private void testIterative(List> transformations, int see beforeStartFirstHalf.add(pool.submit(splitCallable.first).get(TIMEOUT_LENGTH, TIMEOUT_UNIT)); } - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); if (beforeUpdate) { // before we update the underlying data @@ -776,6 +785,7 @@ private void testIterative(List> transformations, int see } table.notifyListeners(updates[0], updates[1], updates[2]); + updateGraph.markSourcesRefreshedForUnitTests(); if (beforeAndAfterNotify) { final List
beforeAndAfterNotifySplitResults = new ArrayList<>(splitCallables.size()); @@ -854,7 +864,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { dynamicTable.addUpdateListener(listener); } lastResultSize = results.size(); - ExecutionContext.getContext().getUpdateGraph().cast() + updateGraph .completeCycleForUnitTests(); if (beforeStartAndAfterCycle) { @@ -872,7 +882,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { for (int splitIndex = 0; splitIndex < splitCallables.size(); ++splitIndex) { final int fSplitIndex = splitIndex; - final Table splitResult = ExecutionContext.getContext().getUpdateGraph().exclusiveLock() + final Table splitResult = updateGraph.exclusiveLock() .computeLocked(() -> splitCallables.get(fSplitIndex).second .apply(beforeStartFirstHalf.get(fSplitIndex))) .withAttributes(Map.of( @@ -899,7 +909,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { for (int splitIndex = 0; splitIndex < splitCallables.size(); ++splitIndex) { final int fSplitIndex = splitIndex; - final Table splitResult = ExecutionContext.getContext().getUpdateGraph().exclusiveLock() + final Table splitResult = updateGraph.exclusiveLock() .computeLocked(() -> splitCallables.get(fSplitIndex).second .apply(beforeUpdateFirstHalf.get(fSplitIndex))) .withAttributes(Map.of( @@ -955,7 +965,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { for (int splitIndex = 0; splitIndex < splitCallables.size(); ++splitIndex) { final int fSplitIndex = splitIndex; - final Table splitResult = ExecutionContext.getContext().getUpdateGraph().exclusiveLock() + final Table splitResult = updateGraph.exclusiveLock() .computeLocked(() -> splitCallables.get(fSplitIndex).second .apply(beforeCycleFirstHalf.get(fSplitIndex))) .withAttributes(Map.of( @@ -1003,7 +1013,7 @@ public void testSelectDistinct() throws ExecutionException, InterruptedException final Table expected2 = newTable(col("y", "a", "d", "b", "c")); final Table expected2outOfOrder = newTable(col("y", "a", "b", "c", "d")); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Callable
callable = () -> table.selectDistinct("y"); @@ -1021,6 +1031,7 @@ public void testSelectDistinct() throws ExecutionException, InterruptedException TstUtils.assertTableEquals(expected1, prevTable(distinct2)); table.notifyListeners(i(3), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table distinct3 = pool.submit(callable).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -1029,7 +1040,7 @@ public void testSelectDistinct() throws ExecutionException, InterruptedException TstUtils.assertTableEquals(expected2, distinct3); TstUtils.assertTableEquals(expected2, prevTable(distinct3)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(expected2outOfOrder, distinct1); TstUtils.assertTableEquals(expected2outOfOrder, distinct2); @@ -1084,7 +1095,7 @@ public void testSelectDistinctReset() throws ExecutionException, InterruptedExce final Table slowed = table.updateView("z=barrierFunction.apply(y)"); final Table expected1 = newTable(col("z", "a", "b")); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Callable
callable = () -> slowed.selectDistinct("z"); @@ -1094,10 +1105,11 @@ public void testSelectDistinctReset() throws ExecutionException, InterruptedExce System.out.println("Removing rows"); removeRows(table, i(8)); table.notifyListeners(i(), i(8), i()); + updateGraph.markSourcesRefreshedForUnitTests(); barrierFunction.sleepDuration = 0; - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); final Table distinct1 = future1.get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(expected1, distinct1); @@ -1218,12 +1230,12 @@ private void testByConcurrent(Function function, boolean hasKeys, // We only care about the silent version of this table, as it's just a vessel to tick and ensure that the // resultant table // is computed using the appropriate version. - final Table expected1 = ExecutionContext.getContext().getUpdateGraph().exclusiveLock().computeLocked( + final Table expected1 = updateGraph.exclusiveLock().computeLocked( () -> function.apply(table.silent()).select()); - final Table expected2 = ExecutionContext.getContext().getUpdateGraph().exclusiveLock() + final Table expected2 = updateGraph.exclusiveLock() .computeLocked(() -> function.apply(table2)); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Future
future1 = pool.submit(callable); final Table result1; @@ -1272,13 +1284,12 @@ private void testByConcurrent(Function function, boolean hasKeys, TstUtils.assertTableEquals(expected1, prevResult2a, TableDiff.DiffItems.DoublesExact); table.notifyListeners(i(5, 9), i(), allowModifications ? i(8) : i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Future
future3 = pool.submit(callable); if (withReset) { - while (((QueryTable) slowed).getLastNotificationStep() != ExecutionContext.getContext().getUpdateGraph() - .clock().currentStep()) { - ExecutionContext.getContext().getUpdateGraph().cast() - .flushOneNotificationForUnitTests(); + while (((QueryTable) slowed).getLastNotificationStep() != updateGraph.clock().currentStep()) { + updateGraph.flushOneNotificationForUnitTests(); } } final Table result3 = future3.get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -1299,7 +1310,7 @@ private void testByConcurrent(Function function, boolean hasKeys, TableTools.show(expected2); TstUtils.assertTableEquals(expected2, result3, TableDiff.DiffItems.DoublesExact); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); if (hasKeys) { TstUtils.assertTableEquals(expected2.sort("KeyColumn"), result1.sort("KeyColumn"), @@ -1344,12 +1355,12 @@ private void testPartitionByConcurrent(boolean withReset) throws Exception { // We only care about the silent version of this table, as it's just a vessel to tick and ensure that the // resultant table // is computed using the appropriate version. - final Table expected1 = ExecutionContext.getContext().getUpdateGraph().exclusiveLock().computeLocked( + final Table expected1 = updateGraph.exclusiveLock().computeLocked( () -> table.silent().partitionBy("KeyColumn").merge().select()); - final Table expected2 = ExecutionContext.getContext().getUpdateGraph().exclusiveLock().computeLocked( + final Table expected2 = updateGraph.exclusiveLock().computeLocked( () -> table2.silent().partitionBy("KeyColumn").merge().select()); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Future future1 = pool.submit(callable); final PartitionedTable result1; @@ -1391,13 +1402,12 @@ private void testPartitionByConcurrent(boolean withReset) throws Exception { TableTools.show(expected1); table.notifyListeners(i(5, 9), i(), i(8)); + updateGraph.markSourcesRefreshedForUnitTests(); final Future future3 = pool.submit(callable); if (withReset) { - while (((QueryTable) slowed).getLastNotificationStep() != ExecutionContext.getContext().getUpdateGraph() - .clock().currentStep()) { - ExecutionContext.getContext().getUpdateGraph().cast() - .flushOneNotificationForUnitTests(); + while (((QueryTable) slowed).getLastNotificationStep() != updateGraph.clock().currentStep()) { + updateGraph.flushOneNotificationForUnitTests(); } } final PartitionedTable result3 = future3.get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -1416,7 +1426,7 @@ private void testPartitionByConcurrent(boolean withReset) throws Exception { assertNull(result3c); TstUtils.assertTableEquals(expected2.where("KeyColumn = `d`"), result3d); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(expected2, result1.merge()); TstUtils.assertTableEquals(expected2, result2.merge()); @@ -1536,10 +1546,10 @@ public void testConstructSnapshotException() throws ExecutionException, Interrup SleepUtil.sleep(100); // add a row to the table - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(); TstUtils.addToTable(table, i(10), col("y", "e")); table.notifyListeners(i(10), i(), i()); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); // now get the answer final String[] answer = future.get(5000, TimeUnit.MILLISECONDS); @@ -1556,7 +1566,7 @@ public void testStaticSnapshot() throws ExecutionException, InterruptedException TableTools.newTable(col("x", 1, 4, 2, 3), col("y", "a", "d", "b", "c"), col("z", true, true, false, true)); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + updateGraph.startCycleForUnitTests(false); final Table snap1 = pool.submit(() -> table.snapshot()).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); @@ -1570,13 +1580,14 @@ public void testStaticSnapshot() throws ExecutionException, InterruptedException TstUtils.assertTableEquals(tableStart, prevTable(snap2)); table.notifyListeners(i(3), i(), i()); + updateGraph.markSourcesRefreshedForUnitTests(); final Table snap3 = pool.submit(() -> table.snapshot()).get(TIMEOUT_LENGTH, TIMEOUT_UNIT); TstUtils.assertTableEquals(tableStart, prevTable(snap1)); TstUtils.assertTableEquals(tableStart, prevTable(snap2)); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + updateGraph.completeCycleForUnitTests(); TstUtils.assertTableEquals(tableStart, snap1); TstUtils.assertTableEquals(tableStart, snap2); @@ -1600,7 +1611,6 @@ public void testSnapshotLiveness() { TstUtils.assertTableEquals(snap, base); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> { final TableUpdate downstream1 = new TableUpdateImpl(i(1), i(), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY); @@ -1617,4 +1627,34 @@ public void testSnapshotLiveness() { }); TstUtils.assertTableEquals(snap, base); } + + public void testSourceDependencyWithoutListener() { + final QueryTable rootTable = TstUtils.testRefreshingTable(i(10).toTracking(), intCol("Sentinel", 10)); + final QueryTable tickTable = TstUtils.testRefreshingTable(i(0).toTracking(), intCol("Ticking", 1)); + + final ExecutionContext executionContext = ExecutionContext.getContext(); + + final InstrumentedTableUpdateListenerAdapter adapter = + new InstrumentedTableUpdateListenerAdapter(tickTable, true) { + @Override + public void onUpdate(@NotNull final TableUpdate upstream) { + final Table x; + try (final SafeCloseable ignored = executionContext.open()) { + x = rootTable.updateView("X=Sentinel * 2"); + } + TableTools.showWithRowSet(x); + } + + @Override + public boolean canExecute(long step) { + return rootTable.satisfied(step) && super.canExecute(step); + } + }; + tickTable.addUpdateListener(adapter); + + updateGraph.runWithinUnitTestCycle(() -> { + addToTable(tickTable, i(1), intCol("Ticking", 2)); + tickTable.notifyListeners(i(1), i(), i()); + }); + } } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/ClockInconsistencyException.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/ClockInconsistencyException.java new file mode 100644 index 00000000000..e84d43d2fcd --- /dev/null +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/ClockInconsistencyException.java @@ -0,0 +1,22 @@ +package io.deephaven.engine.updategraph; + +import io.deephaven.UncheckedDeephavenException; +import org.jetbrains.annotations.NotNull; + +/** + * Runtime exception thrown by update processing code that observes evidence of clock inconsistencies. For example, + * {@link NotificationQueue.Dependency#satisfied(long) dependency satisfaction} may throw an instance of this class if + * the requested clock step is lower than the last satisfied clock step. In practice, this may identify bugs or improper + * update graph mixing in update processing, or allow concurrent snapshots to fail fast if they've already missed a + * clock change. + */ +public class ClockInconsistencyException extends UncheckedDeephavenException { + + public ClockInconsistencyException(@NotNull final String message) { + super(message); + } + + public ClockInconsistencyException(@NotNull final String message, @NotNull final Throwable cause) { + super(message, cause); + } +} diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/LogicalClockImpl.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/LogicalClockImpl.java index 74c50d1892f..c396ad3cc18 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/LogicalClockImpl.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/LogicalClockImpl.java @@ -7,6 +7,7 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.util.annotations.TestUseOnly; +import io.deephaven.util.process.ProcessEnvironment; import java.util.concurrent.atomic.AtomicLong; @@ -47,6 +48,9 @@ public final long currentValue() { */ public final long startUpdateCycle() { final long beforeValue = currentValue.get(); + if (beforeValue == Long.MAX_VALUE) { + ProcessEnvironment.get().getFatalErrorReporter().report("Maximum logical clock cycles exceeded"); + } Assert.eq(LogicalClock.getState(beforeValue), "getState(beforeValue)", State.Idle); final long afterValue = currentValue.incrementAndGet(); Assert.eq(afterValue, "currentValue.incrementAndGet()", beforeValue + 1, "beforeValue + 1"); @@ -60,6 +64,8 @@ public final long startUpdateCycle() { */ public final void completeUpdateCycle() { final long value = currentValue.get(); + // If we try to exceed our maximum clock value, it will be on an Idle to Updating transition, since + // Long.MAX_VALUE & STATE_MASK == 1, which means that the maximum value occurs upon reaching an Idle phase. Assert.eq(LogicalClock.getState(value), "getState(value)", State.Updating); Assert.eq(currentValue.incrementAndGet(), "currentValue.incrementAndGet()", value + 1, "value + 1"); } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/NotificationQueue.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/NotificationQueue.java index b10f5b5c170..0e308ae1bb7 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/NotificationQueue.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/NotificationQueue.java @@ -60,6 +60,9 @@ interface Dependency extends LogOutputAppendable { * * @param step The step for which we are testing satisfaction * @return Whether the dependency is satisfied on {@code step} (and will not fire subsequent notifications) + * @throws ClockInconsistencyException if step is observed to be before the highest step known to this + * Dependency; this is a best effort validation, in order to allow concurrent snapshots to fail fast or + * improper update processing to be detected * @implNote For all practical purposes, all implementations should consider whether the {@link UpdateGraph} * itself is satisfied if they have no other dependencies. */ @@ -131,7 +134,6 @@ static UpdateGraph getUpdateGraph(@Nullable Dependency first, Dependency... depe * Enqueue a collection of notifications to be flushed. * * @param notifications The notification to enqueue - * * @see #addNotification(Notification) */ void addNotifications(@NotNull final Collection notifications); diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 3959bcad8d9..81b349f972e 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -236,7 +236,7 @@ public int hashCode() { private final WritableColumnSource[] deltaColumns; /** - * This is the last step on which the UGP-synced RowSet was updated. This is used only for consistency checking + * This is the last step on which the UG-synced RowSet was updated. This is used only for consistency checking * between our initial creation and subsequent updates. */ private long lastIndexClockStep = 0; @@ -2135,10 +2135,10 @@ public Boolean usePreviousValues(final long beforeClockValue) { capturedLastIndexClockStep = getLastIndexClockStep(); - final LogicalClock.State state = LogicalClock.getState(beforeClockValue); - final long step = LogicalClock.getStep(beforeClockValue); - if (state != LogicalClock.State.Updating) { - this.step = step; + final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); + final long beforeStep = LogicalClock.getStep(beforeClockValue); + if (beforeState == LogicalClock.State.Idle) { + this.step = beforeStep; return false; } @@ -2199,8 +2199,7 @@ BarrageMessage getSnapshot( onGetSnapshot.run(); } - final SnapshotControl snapshotControl = - new SnapshotControl(snapshotSubscriptions); + final SnapshotControl snapshotControl = new SnapshotControl(snapshotSubscriptions); final BarrageMessage msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( this, parent, columnsToSnapshot, positionsToSnapshot, reversePositionsToSnapshot, snapshotControl); diff --git a/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java b/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java index 1efa6878b07..3d8dea54510 100644 --- a/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java +++ b/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java @@ -308,8 +308,9 @@ public void testTableSizeUsesPrev() { throw new UncheckedDeephavenException(ie); } + updateGraph.markSourcesRefreshedForUnitTests(); src.notifyListeners(update); - }); + }, false); // we should get both a run and the update in the same flush expectSizes(t1.getValue().getExportId(), 42, 84);