Reduce memory usage by using a temporary context. (#5215)
lbooker42 committed Mar 4, 2024
1 parent f3e620a commit 66b4be0
Showing 2 changed files with 89 additions and 105 deletions.
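For context, the memory saving comes from scoping the timestamp ChunkSource.GetContext to the one method that reads timestamp chunks, instead of holding it for the lifetime of every bucket context. Below is a minimal sketch of the before/after pattern, using simplified stand-in types (BucketContextBefore, BucketContextAfter, ResourceFactory are hypothetical, not the real Deephaven classes):

    // Before: the resource lives as long as the bucket context itself.
    class BucketContextBefore implements AutoCloseable {
        final AutoCloseable getContext;

        BucketContextBefore(ResourceFactory factory, int chunkSize) {
            // allocated eagerly in the constructor ...
            this.getContext = factory.makeGetContext(chunkSize);
        }

        @Override
        public void close() throws Exception {
            // ... and not released until the bucket is finalized
            getContext.close();
        }
    }

    // After: the resource exists only while the method that needs it runs,
    // so idle bucket contexts no longer pin chunk-sized buffers.
    class BucketContextAfter {
        void computeAffectedRows(ResourceFactory factory, int chunkSize) throws Exception {
            try (AutoCloseable tsContext = factory.makeGetContext(chunkSize)) {
                // pass tsContext down to the helpers that read timestamp chunks
            } // released here, rather than at bucket finalization
        }
    }

    interface ResourceFactory {
        AutoCloseable makeGetContext(int chunkSize);
    }

This mirrors what the diff below does: the timestampColumnGetContext field, its close() bookkeeping, and the finalizeWindowBucket overrides disappear, and computeAffectedRowsAndOperators allocates a tsContext in a try-with-resources block and threads it through the helper methods.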
File 1 of 2:
@@ -45,11 +45,6 @@ void prepareWindowBucket(UpdateByWindowBucketContext context) {
context.workingChunkSize = Math.toIntExact(Math.min(context.workingChunkSize, context.affectedRows.size()));
}

@Override
void finalizeWindowBucket(UpdateByWindowBucketContext context) {
super.finalizeWindowBucket(context);
}

@Override
UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
final ColumnSource<?> timestampColumnSource,
Expand Down
File 2 of 2:
@@ -12,7 +12,6 @@
import io.deephaven.engine.table.impl.ssa.LongSegmentedSortedArray;
import io.deephaven.engine.table.iterators.ChunkedLongColumnIterator;
import io.deephaven.engine.table.iterators.LongColumnIterator;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -31,8 +30,6 @@ class UpdateByWindowRollingTime extends UpdateByWindowRollingBase {
private static final int RING_BUFFER_INITIAL_SIZE = 128;

public static class UpdateByWindowTimeBucketContext extends UpdateByWindowRollingBucketContext {
ChunkSource.GetContext timestampColumnGetContext;

public UpdateByWindowTimeBucketContext(final TrackingRowSet sourceRowSet,
@NotNull final ColumnSource<?> timestampColumnSource,
@Nullable final LongSegmentedSortedArray timestampSsa,
@@ -42,14 +39,6 @@ public UpdateByWindowTimeBucketContext(final TrackingRowSet sourceRowSet,
final boolean initialStep) {
super(sourceRowSet, timestampColumnSource, timestampSsa, timestampValidRowSet, timestampsModified,
chunkSize, initialStep);
// This is needed by computeAffectedRowsAndOperators() before allocateResources() is called
timestampColumnGetContext = timestampColumnSource.makeGetContext(workingChunkSize);
}

@Override
public void close() {
super.close();
Assert.eqNull(timestampColumnGetContext, "timestampColumnGetContext");
}
}

@@ -73,15 +62,6 @@ UpdateByWindow copy() {
fwdUnits);
}

@Override
void finalizeWindowBucket(UpdateByWindowBucketContext context) {
UpdateByWindowTimeBucketContext ctx = (UpdateByWindowTimeBucketContext) context;
try (SafeCloseable ignored = ctx.timestampColumnGetContext) {
ctx.timestampColumnGetContext = null;
}
super.finalizeWindowBucket(context);
}

@Override
public UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
final ColumnSource<?> timestampColumnSource,
@@ -100,13 +80,17 @@ public UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet source
* window parameters. After these rows have been identified, must determine which rows will be needed to recompute
* these values (i.e. that fall within the window and will `influence` this computation).
*/
private static WritableRowSet computeAffectedRowsTime(final UpdateByWindowTimeBucketContext ctx,
private static WritableRowSet computeAffectedRowsTime(
final UpdateByWindowTimeBucketContext ctx,
final ChunkSource.GetContext tsContext,
final RowSet subset, long revNanos, long fwdNanos, boolean usePrev) {
// swap fwd/rev to get the affected windows
return computeInfluencerRowsTime(ctx, subset, fwdNanos, revNanos, usePrev);
return computeInfluencerRowsTime(ctx, tsContext, subset, fwdNanos, revNanos, usePrev);
}

private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTimeBucketContext ctx,
private static WritableRowSet computeInfluencerRowsTime(
final UpdateByWindowTimeBucketContext ctx,
final ChunkSource.GetContext tsContext,
final RowSet subset,
long revNanos, long fwdNanos, boolean usePrev) {
try (final RowSequence.Iterator it = subset.getRowSequenceIterator()) {
@@ -118,8 +102,8 @@ private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTime
final int rsSize = rs.intSize();

LongChunk<? extends Values> timestamps = usePrev
? ctx.timestampColumnSource.getPrevChunk(ctx.timestampColumnGetContext, rs).asLongChunk()
: ctx.timestampColumnSource.getChunk(ctx.timestampColumnGetContext, rs).asLongChunk();
? ctx.timestampColumnSource.getPrevChunk(tsContext, rs).asLongChunk()
: ctx.timestampColumnSource.getChunk(tsContext, rs).asLongChunk();

for (int ii = 0; ii < rsSize; ii++) {
final long ts = timestamps.get(ii);
@@ -168,100 +152,105 @@ private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTime
public void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNull TableUpdate upstream) {
UpdateByWindowTimeBucketContext ctx = (UpdateByWindowTimeBucketContext) context;

if (upstream.empty() || ctx.sourceRowSet.isEmpty()) {
// No further work will be done on this context
finalizeWindowBucket(context);
return;
}
try (final ChunkSource.GetContext tsContext = ctx.timestampColumnSource.makeGetContext(ctx.workingChunkSize)) {
if (upstream.empty() || ctx.sourceRowSet.isEmpty()) {
// No further work will be done on this context
finalizeWindowBucket(context);
return;
}

// all rows are affected on the initial step
if (ctx.initialStep) {
ctx.affectedRows = ctx.sourceRowSet;
// all rows are affected on the initial step
if (ctx.initialStep) {
ctx.affectedRows = ctx.sourceRowSet;

// Get the exact set of rows needed to compute the initial row set
ctx.influencerRows = computeInfluencerRowsTime(ctx, ctx.affectedRows, prevUnits, fwdUnits, false);
// Get the exact set of rows needed to compute the initial row set
ctx.influencerRows =
computeInfluencerRowsTime(ctx, tsContext, ctx.affectedRows, prevUnits, fwdUnits, false);

// mark all operators as affected by this update
context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
context.dirtyOperators = new BitSet(operators.length);
context.dirtyOperators.set(0, operators.length);
// mark all operators as affected by this update
context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
context.dirtyOperators = new BitSet(operators.length);
context.dirtyOperators.set(0, operators.length);

ctx.isDirty = true;
return;
}
ctx.isDirty = true;
return;
}

// determine which operators are affected by this update
processUpdateForContext(context, upstream);
// determine which operators are affected by this update
processUpdateForContext(context, upstream);

if (!ctx.isDirty) {
// No further work will be done on this context
finalizeWindowBucket(context);
return;
}
if (!ctx.isDirty) {
// No further work will be done on this context
finalizeWindowBucket(context);
return;
}

final WritableRowSet tmpAffected = RowSetFactory.empty();

final WritableRowSet tmpAffected = RowSetFactory.empty();
// consider the modifications only when input or timestamp columns were modified
if (upstream.modified().isNonempty() && (ctx.timestampsModified || ctx.inputModified)) {
// recompute all windows that have the modified rows in their window
try (final RowSet modifiedAffected =
computeAffectedRowsTime(ctx, tsContext, upstream.modified(), prevUnits, fwdUnits, false)) {
tmpAffected.insert(modifiedAffected);
}

if (ctx.timestampsModified) {
// recompute all windows previously containing the modified rows
// after the timestamp modifications
try (final WritableRowSet modifiedAffectedPrev =
computeAffectedRowsTime(ctx, tsContext, upstream.getModifiedPreShift(), prevUnits, fwdUnits,
true)) {
// we used the SSA (post-shift) to get these keys, no need to shift
// retain only the rows that still exist in the sourceRowSet
modifiedAffectedPrev.retain(ctx.timestampValidRowSet);
tmpAffected.insert(modifiedAffectedPrev);
}

// re-compute all modified rows, they have new windows after the timestamp modifications
tmpAffected.insert(upstream.modified());
}
}

// consider the modifications only when input or timestamp columns were modified
if (upstream.modified().isNonempty() && (ctx.timestampsModified || ctx.inputModified)) {
// recompute all windows that have the modified rows in their window
try (final RowSet modifiedAffected =
computeAffectedRowsTime(ctx, upstream.modified(), prevUnits, fwdUnits, false)) {
tmpAffected.insert(modifiedAffected);
if (upstream.added().isNonempty()) {
// add the new rows and any cascading changes from inserting rows
final long prev = Math.max(0, prevUnits);
final long fwd = Math.max(0, fwdUnits);
try (final RowSet addedAffected =
computeAffectedRowsTime(ctx, tsContext, upstream.added(), prev, fwd, false)) {
tmpAffected.insert(addedAffected);
}
// compute all new rows
tmpAffected.insert(upstream.added());
}

if (ctx.timestampsModified) {
// recompute all windows that previously contained the modified rows, they may not contain this value
// after the timestamp modifications
try (final WritableRowSet modifiedAffectedPrev =
computeAffectedRowsTime(ctx, upstream.getModifiedPreShift(), prevUnits, fwdUnits, true)) {
// other rows can be affected by removes
if (upstream.removed().isNonempty()) {
final long prev = Math.max(0, prevUnits);
final long fwd = Math.max(0, fwdUnits);
try (final WritableRowSet removedAffected =
computeAffectedRowsTime(ctx, tsContext, upstream.removed(), prev, fwd, true)) {
// we used the SSA (post-shift) to get these keys, no need to shift
// retain only the rows that still exist in the sourceRowSet
modifiedAffectedPrev.retain(ctx.timestampValidRowSet);
tmpAffected.insert(modifiedAffectedPrev);
}
removedAffected.retain(ctx.timestampValidRowSet);

// re-compute all modified rows, they have new windows after the timestamp modifications
tmpAffected.insert(upstream.modified());
tmpAffected.insert(removedAffected);
}
}
}

if (upstream.added().isNonempty()) {
// add the new rows and any cascading changes from inserting rows
final long prev = Math.max(0, prevUnits);
final long fwd = Math.max(0, fwdUnits);
try (final RowSet addedAffected =
computeAffectedRowsTime(ctx, upstream.added(), prev, fwd, false)) {
tmpAffected.insert(addedAffected);
}
// compute all new rows
tmpAffected.insert(upstream.added());
}
ctx.affectedRows = tmpAffected;

// other rows can be affected by removes
if (upstream.removed().isNonempty()) {
final long prev = Math.max(0, prevUnits);
final long fwd = Math.max(0, fwdUnits);
try (final WritableRowSet removedAffected =
computeAffectedRowsTime(ctx, upstream.removed(), prev, fwd, true)) {
// we used the SSA (post-shift) to get these keys, no need to shift
// retain only the rows that still exist in the sourceRowSet
removedAffected.retain(ctx.timestampValidRowSet);

tmpAffected.insert(removedAffected);
if (ctx.affectedRows.isEmpty()) {
// No further work will be done on this context
finalizeWindowBucket(context);
ctx.isDirty = false;
return;
}
}

ctx.affectedRows = tmpAffected;

if (ctx.affectedRows.isEmpty()) {
// No further work will be done on this context
finalizeWindowBucket(context);
ctx.isDirty = false;
return;
// now get influencer rows for the affected rows
ctx.influencerRows =
computeInfluencerRowsTime(ctx, tsContext, ctx.affectedRows, prevUnits, fwdUnits, false);
}

// now get influencer rows for the affected rows
ctx.influencerRows = computeInfluencerRowsTime(ctx, ctx.affectedRows, prevUnits, fwdUnits, false);
}

private long nextLongOrMax(LongColumnIterator it) {
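One subtlety worth spelling out from the `// swap fwd/rev to get the affected windows` comment in computeAffectedRowsTime above: a row r is influenced by a row c when ts(c) lies in [ts(r) - revNanos, ts(r) + fwdNanos], and rearranging that inequality gives ts(r) in [ts(c) - fwdNanos, ts(c) + revNanos]. Finding the rows affected by a change is therefore the same query as finding influencer rows, with the window parameters exchanged. A hypothetical illustration of the identity (not Deephaven API):

    final class WindowSwapSketch {
        // c influences r  <=>  ts(r) - rev <= ts(c) <= ts(r) + fwd
        //                 <=>  ts(c) - fwd <= ts(r) <= ts(c) + rev
        static boolean influences(long tsCandidate, long tsCenter, long revNanos, long fwdNanos) {
            return tsCandidate >= tsCenter - revNanos && tsCandidate <= tsCenter + fwdNanos;
        }

        static boolean affectedBy(long tsRow, long tsChanged, long revNanos, long fwdNanos) {
            // the same predicate with the roles reversed and rev/fwd swapped
            return influences(tsRow, tsChanged, fwdNanos, revNanos);
        }
    }

This is why computeAffectedRowsTime can simply delegate to computeInfluencerRowsTime with fwdNanos and revNanos swapped.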
