diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowCumulative.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowCumulative.java index 623fc5b6c1f..d161c4163c2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowCumulative.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowCumulative.java @@ -45,11 +45,6 @@ void prepareWindowBucket(UpdateByWindowBucketContext context) { context.workingChunkSize = Math.toIntExact(Math.min(context.workingChunkSize, context.affectedRows.size())); } - @Override - void finalizeWindowBucket(UpdateByWindowBucketContext context) { - super.finalizeWindowBucket(context); - } - @Override UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet, final ColumnSource timestampColumnSource, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowRollingTime.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowRollingTime.java index eb9203cb5e2..39067ab9053 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowRollingTime.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateByWindowRollingTime.java @@ -12,7 +12,6 @@ import io.deephaven.engine.table.impl.ssa.LongSegmentedSortedArray; import io.deephaven.engine.table.iterators.ChunkedLongColumnIterator; import io.deephaven.engine.table.iterators.LongColumnIterator; -import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -31,8 +30,6 @@ class UpdateByWindowRollingTime extends UpdateByWindowRollingBase { private static final int RING_BUFFER_INITIAL_SIZE = 128; public static class UpdateByWindowTimeBucketContext extends UpdateByWindowRollingBucketContext { - ChunkSource.GetContext timestampColumnGetContext; - public UpdateByWindowTimeBucketContext(final TrackingRowSet sourceRowSet, @NotNull final ColumnSource timestampColumnSource, @Nullable final LongSegmentedSortedArray timestampSsa, @@ -42,14 +39,6 @@ public UpdateByWindowTimeBucketContext(final TrackingRowSet sourceRowSet, final boolean initialStep) { super(sourceRowSet, timestampColumnSource, timestampSsa, timestampValidRowSet, timestampsModified, chunkSize, initialStep); - // This is needed by computeAffectedRowsAndOperators() before allocateResources() is called - timestampColumnGetContext = timestampColumnSource.makeGetContext(workingChunkSize); - } - - @Override - public void close() { - super.close(); - Assert.eqNull(timestampColumnGetContext, "timestampColumnGetContext"); } } @@ -73,15 +62,6 @@ UpdateByWindow copy() { fwdUnits); } - @Override - void finalizeWindowBucket(UpdateByWindowBucketContext context) { - UpdateByWindowTimeBucketContext ctx = (UpdateByWindowTimeBucketContext) context; - try (SafeCloseable ignored = ctx.timestampColumnGetContext) { - ctx.timestampColumnGetContext = null; - } - super.finalizeWindowBucket(context); - } - @Override public UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet, final ColumnSource timestampColumnSource, @@ -100,13 +80,17 @@ public UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet source * window parameters. After these rows have been identified, must determine which rows will be needed to recompute * these values (i.e. that fall within the window and will `influence` this computation). */ - private static WritableRowSet computeAffectedRowsTime(final UpdateByWindowTimeBucketContext ctx, + private static WritableRowSet computeAffectedRowsTime( + final UpdateByWindowTimeBucketContext ctx, + final ChunkSource.GetContext tsContext, final RowSet subset, long revNanos, long fwdNanos, boolean usePrev) { // swap fwd/rev to get the affected windows - return computeInfluencerRowsTime(ctx, subset, fwdNanos, revNanos, usePrev); + return computeInfluencerRowsTime(ctx, tsContext, subset, fwdNanos, revNanos, usePrev); } - private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTimeBucketContext ctx, + private static WritableRowSet computeInfluencerRowsTime( + final UpdateByWindowTimeBucketContext ctx, + final ChunkSource.GetContext tsContext, final RowSet subset, long revNanos, long fwdNanos, boolean usePrev) { try (final RowSequence.Iterator it = subset.getRowSequenceIterator()) { @@ -118,8 +102,8 @@ private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTime final int rsSize = rs.intSize(); LongChunk timestamps = usePrev - ? ctx.timestampColumnSource.getPrevChunk(ctx.timestampColumnGetContext, rs).asLongChunk() - : ctx.timestampColumnSource.getChunk(ctx.timestampColumnGetContext, rs).asLongChunk(); + ? ctx.timestampColumnSource.getPrevChunk(tsContext, rs).asLongChunk() + : ctx.timestampColumnSource.getChunk(tsContext, rs).asLongChunk(); for (int ii = 0; ii < rsSize; ii++) { final long ts = timestamps.get(ii); @@ -168,100 +152,105 @@ private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTime public void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNull TableUpdate upstream) { UpdateByWindowTimeBucketContext ctx = (UpdateByWindowTimeBucketContext) context; - if (upstream.empty() || ctx.sourceRowSet.isEmpty()) { - // No further work will be done on this context - finalizeWindowBucket(context); - return; - } + try (final ChunkSource.GetContext tsContext = ctx.timestampColumnSource.makeGetContext(ctx.workingChunkSize)) { + if (upstream.empty() || ctx.sourceRowSet.isEmpty()) { + // No further work will be done on this context + finalizeWindowBucket(context); + return; + } - // all rows are affected on the initial step - if (ctx.initialStep) { - ctx.affectedRows = ctx.sourceRowSet; + // all rows are affected on the initial step + if (ctx.initialStep) { + ctx.affectedRows = ctx.sourceRowSet; - // Get the exact set of rows needed to compute the initial row set - ctx.influencerRows = computeInfluencerRowsTime(ctx, ctx.affectedRows, prevUnits, fwdUnits, false); + // Get the exact set of rows needed to compute the initial row set + ctx.influencerRows = + computeInfluencerRowsTime(ctx, tsContext, ctx.affectedRows, prevUnits, fwdUnits, false); - // mark all operators as affected by this update - context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray(); - context.dirtyOperators = new BitSet(operators.length); - context.dirtyOperators.set(0, operators.length); + // mark all operators as affected by this update + context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray(); + context.dirtyOperators = new BitSet(operators.length); + context.dirtyOperators.set(0, operators.length); - ctx.isDirty = true; - return; - } + ctx.isDirty = true; + return; + } - // determine which operators are affected by this update - processUpdateForContext(context, upstream); + // determine which operators are affected by this update + processUpdateForContext(context, upstream); - if (!ctx.isDirty) { - // No further work will be done on this context - finalizeWindowBucket(context); - return; - } + if (!ctx.isDirty) { + // No further work will be done on this context + finalizeWindowBucket(context); + return; + } + + final WritableRowSet tmpAffected = RowSetFactory.empty(); - final WritableRowSet tmpAffected = RowSetFactory.empty(); + // consider the modifications only when input or timestamp columns were modified + if (upstream.modified().isNonempty() && (ctx.timestampsModified || ctx.inputModified)) { + // recompute all windows that have the modified rows in their window + try (final RowSet modifiedAffected = + computeAffectedRowsTime(ctx, tsContext, upstream.modified(), prevUnits, fwdUnits, false)) { + tmpAffected.insert(modifiedAffected); + } + + if (ctx.timestampsModified) { + // recompute all windows previously containing the modified rows + // after the timestamp modifications + try (final WritableRowSet modifiedAffectedPrev = + computeAffectedRowsTime(ctx, tsContext, upstream.getModifiedPreShift(), prevUnits, fwdUnits, + true)) { + // we used the SSA (post-shift) to get these keys, no need to shift + // retain only the rows that still exist in the sourceRowSet + modifiedAffectedPrev.retain(ctx.timestampValidRowSet); + tmpAffected.insert(modifiedAffectedPrev); + } + + // re-compute all modified rows, they have new windows after the timestamp modifications + tmpAffected.insert(upstream.modified()); + } + } - // consider the modifications only when input or timestamp columns were modified - if (upstream.modified().isNonempty() && (ctx.timestampsModified || ctx.inputModified)) { - // recompute all windows that have the modified rows in their window - try (final RowSet modifiedAffected = - computeAffectedRowsTime(ctx, upstream.modified(), prevUnits, fwdUnits, false)) { - tmpAffected.insert(modifiedAffected); + if (upstream.added().isNonempty()) { + // add the new rows and any cascading changes from inserting rows + final long prev = Math.max(0, prevUnits); + final long fwd = Math.max(0, fwdUnits); + try (final RowSet addedAffected = + computeAffectedRowsTime(ctx, tsContext, upstream.added(), prev, fwd, false)) { + tmpAffected.insert(addedAffected); + } + // compute all new rows + tmpAffected.insert(upstream.added()); } - if (ctx.timestampsModified) { - // recompute all windows that previously contained the modified rows, they may not contain this value - // after the timestamp modifications - try (final WritableRowSet modifiedAffectedPrev = - computeAffectedRowsTime(ctx, upstream.getModifiedPreShift(), prevUnits, fwdUnits, true)) { + // other rows can be affected by removes + if (upstream.removed().isNonempty()) { + final long prev = Math.max(0, prevUnits); + final long fwd = Math.max(0, fwdUnits); + try (final WritableRowSet removedAffected = + computeAffectedRowsTime(ctx, tsContext, upstream.removed(), prev, fwd, true)) { // we used the SSA (post-shift) to get these keys, no need to shift // retain only the rows that still exist in the sourceRowSet - modifiedAffectedPrev.retain(ctx.timestampValidRowSet); - tmpAffected.insert(modifiedAffectedPrev); - } + removedAffected.retain(ctx.timestampValidRowSet); - // re-compute all modified rows, they have new windows after the timestamp modifications - tmpAffected.insert(upstream.modified()); + tmpAffected.insert(removedAffected); + } } - } - if (upstream.added().isNonempty()) { - // add the new rows and any cascading changes from inserting rows - final long prev = Math.max(0, prevUnits); - final long fwd = Math.max(0, fwdUnits); - try (final RowSet addedAffected = - computeAffectedRowsTime(ctx, upstream.added(), prev, fwd, false)) { - tmpAffected.insert(addedAffected); - } - // compute all new rows - tmpAffected.insert(upstream.added()); - } + ctx.affectedRows = tmpAffected; - // other rows can be affected by removes - if (upstream.removed().isNonempty()) { - final long prev = Math.max(0, prevUnits); - final long fwd = Math.max(0, fwdUnits); - try (final WritableRowSet removedAffected = - computeAffectedRowsTime(ctx, upstream.removed(), prev, fwd, true)) { - // we used the SSA (post-shift) to get these keys, no need to shift - // retain only the rows that still exist in the sourceRowSet - removedAffected.retain(ctx.timestampValidRowSet); - - tmpAffected.insert(removedAffected); + if (ctx.affectedRows.isEmpty()) { + // No further work will be done on this context + finalizeWindowBucket(context); + ctx.isDirty = false; + return; } - } - - ctx.affectedRows = tmpAffected; - if (ctx.affectedRows.isEmpty()) { - // No further work will be done on this context - finalizeWindowBucket(context); - ctx.isDirty = false; - return; + // now get influencer rows for the affected rows + ctx.influencerRows = + computeInfluencerRowsTime(ctx, tsContext, ctx.affectedRows, prevUnits, fwdUnits, false); } - - // now get influencer rows for the affected rows - ctx.influencerRows = computeInfluencerRowsTime(ctx, ctx.affectedRows, prevUnits, fwdUnits, false); } private long nextLongOrMax(LongColumnIterator it) {