Reduce memory footprint in UpdateBy rolling time operations #5215

Merged · 1 commit · Mar 4, 2024
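This PR fixes a resource-lifecycle problem in the time-windowed UpdateBy code path: each bucket's UpdateByWindowTimeBucketContext allocated a ChunkSource.GetContext for the timestamp column at construction and held it until the bucket was finalized, so every live bucket pinned a chunk-sized read buffer even while idle. The change scopes that context to the single pass that actually reads timestamp chunks. A minimal sketch of the two lifecycles, using a hypothetical holder class rather than the PR's own classes:

    import io.deephaven.engine.rowset.RowSequence;
    import io.deephaven.engine.table.ChunkSource;
    import io.deephaven.engine.table.ColumnSource;

    final class GetContextLifecycleSketch {
        // Before: context allocated up front and held as a member until close().
        static final class EagerHolder implements AutoCloseable {
            private final ChunkSource.GetContext getContext; // pins chunk storage for the holder's lifetime

            EagerHolder(final ColumnSource<?> source, final int chunkSize) {
                getContext = source.makeGetContext(chunkSize);
            }

            @Override
            public void close() {
                getContext.close();
            }
        }

        // After: context exists only while the chunk read is in flight.
        static void readScoped(final ColumnSource<?> source, final RowSequence rows) {
            try (final ChunkSource.GetContext getContext = source.makeGetContext(rows.intSize())) {
                source.getChunk(getContext, rows); // read through the short-lived context
            } // storage released here instead of at bucket teardown
        }
    }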
@@ -45,11 +45,6 @@ void prepareWindowBucket(UpdateByWindowBucketContext context) {
         context.workingChunkSize = Math.toIntExact(Math.min(context.workingChunkSize, context.affectedRows.size()));
     }
 
-    @Override
-    void finalizeWindowBucket(UpdateByWindowBucketContext context) {
-        super.finalizeWindowBucket(context);
-    }
-
     @Override
     UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
             final ColumnSource<?> timestampColumnSource,
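With the eagerly held context gone (see the second file below), this class no longer has per-bucket resources of its own to release, so the finalizeWindowBucket override above became a pure delegation and was deleted. Removing a delegate-only override changes nothing observable; a tiny sketch with hypothetical classes:

    class Base {
        void finalizeWindowBucket() {
            // ... base cleanup ...
        }
    }

    class DerivedWithOverride extends Base {
        @Override
        void finalizeWindowBucket() {
            super.finalizeWindowBucket(); // delegates and adds nothing
        }
    }

    class DerivedWithout extends Base {
        // inherits finalizeWindowBucket(); behavior is identical to DerivedWithOverride
    }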
UpdateByWindowRollingTime.java
@@ -12,7 +12,6 @@
 import io.deephaven.engine.table.impl.ssa.LongSegmentedSortedArray;
 import io.deephaven.engine.table.iterators.ChunkedLongColumnIterator;
 import io.deephaven.engine.table.iterators.LongColumnIterator;
-import io.deephaven.util.SafeCloseable;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;

@@ -31,8 +30,6 @@ class UpdateByWindowRollingTime extends UpdateByWindowRollingBase {
     private static final int RING_BUFFER_INITIAL_SIZE = 128;
 
     public static class UpdateByWindowTimeBucketContext extends UpdateByWindowRollingBucketContext {
-        ChunkSource.GetContext timestampColumnGetContext;
-
         public UpdateByWindowTimeBucketContext(final TrackingRowSet sourceRowSet,
                 @NotNull final ColumnSource<?> timestampColumnSource,
                 @Nullable final LongSegmentedSortedArray timestampSsa,
@@ -42,14 +39,6 @@ public UpdateByWindowTimeBucketContext(final TrackingRowSet sourceRowSet,
                 final boolean initialStep) {
             super(sourceRowSet, timestampColumnSource, timestampSsa, timestampValidRowSet, timestampsModified,
                     chunkSize, initialStep);
-            // This is needed by computeAffectedRowsAndOperators() before allocateResources() is called
-            timestampColumnGetContext = timestampColumnSource.makeGetContext(workingChunkSize);
-        }
-
-        @Override
-        public void close() {
-            super.close();
-            Assert.eqNull(timestampColumnGetContext, "timestampColumnGetContext");
         }
     }
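The deleted constructor and close() lines above were the footprint: every bucket context paid for a workingChunkSize-row buffer up front and carried assertion bookkeeping to prove it was released. The cost is small per bucket but multiplies with bucket count. A back-of-envelope illustration; the chunk size and bucket count are assumed numbers, not figures from the PR:

    public final class FootprintSketch {
        public static void main(String[] args) {
            final int workingChunkSize = 4096;                                  // assumed chunk capacity
            final long bytesPerContext = (long) workingChunkSize * Long.BYTES;  // 32 KiB of long timestamps
            final long bucketCount = 100_000;                                   // hypothetical highly partitioned table

            // Held simultaneously when every bucket keeps its own GetContext:
            final double heldGiB = bucketCount * bytesPerContext / (1024.0 * 1024 * 1024);
            System.out.printf("eager contexts: ~%.2f GiB pinned%n", heldGiB);   // ~3.05 GiB

            // With try-with-resources, one such buffer per bucket is live only
            // while computeAffectedRowsAndOperators() runs.
            System.out.printf("scoped context: ~%d KiB transient per bucket%n",
                    bytesPerContext / 1024);                                    // 32 KiB
        }
    }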

@@ -73,15 +62,6 @@ UpdateByWindow copy() {
                 fwdUnits);
     }
 
-    @Override
-    void finalizeWindowBucket(UpdateByWindowBucketContext context) {
-        UpdateByWindowTimeBucketContext ctx = (UpdateByWindowTimeBucketContext) context;
-        try (SafeCloseable ignored = ctx.timestampColumnGetContext) {
-            ctx.timestampColumnGetContext = null;
-        }
-        super.finalizeWindowBucket(context);
-    }
-
     @Override
     public UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
             final ColumnSource<?> timestampColumnSource,
@@ -100,13 +80,17 @@ public UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet source
      * window parameters. After these rows have been identified, must determine which rows will be needed to recompute
      * these values (i.e. that fall within the window and will `influence` this computation).
      */
-    private static WritableRowSet computeAffectedRowsTime(final UpdateByWindowTimeBucketContext ctx,
+    private static WritableRowSet computeAffectedRowsTime(
+            final UpdateByWindowTimeBucketContext ctx,
+            final ChunkSource.GetContext tsContext,
             final RowSet subset, long revNanos, long fwdNanos, boolean usePrev) {
         // swap fwd/rev to get the affected windows
-        return computeInfluencerRowsTime(ctx, subset, fwdNanos, revNanos, usePrev);
+        return computeInfluencerRowsTime(ctx, tsContext, subset, fwdNanos, revNanos, usePrev);
     }
 
-    private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTimeBucketContext ctx,
+    private static WritableRowSet computeInfluencerRowsTime(
+            final UpdateByWindowTimeBucketContext ctx,
+            final ChunkSource.GetContext tsContext,
             final RowSet subset,
             long revNanos, long fwdNanos, boolean usePrev) {
         try (final RowSequence.Iterator it = subset.getRowSequenceIterator()) {
@@ -118,8 +102,8 @@ private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTime
             final int rsSize = rs.intSize();
 
             LongChunk<? extends Values> timestamps = usePrev
-                    ? ctx.timestampColumnSource.getPrevChunk(ctx.timestampColumnGetContext, rs).asLongChunk()
-                    : ctx.timestampColumnSource.getChunk(ctx.timestampColumnGetContext, rs).asLongChunk();
+                    ? ctx.timestampColumnSource.getPrevChunk(tsContext, rs).asLongChunk()
+                    : ctx.timestampColumnSource.getChunk(tsContext, rs).asLongChunk();
 
             for (int ii = 0; ii < rsSize; ii++) {
                 final long ts = timestamps.get(ii);
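The only change in this loop is which GetContext the reads go through. As background: getChunk reads current values, while getPrevChunk reads values as they were before the current update cycle, which is why callers pass usePrev = true when sizing windows around removed or pre-shift modified rows. A self-contained sketch of that selection; the wrapper method is hypothetical and assumes a long-typed source:

    import io.deephaven.chunk.LongChunk;
    import io.deephaven.chunk.attributes.Values;
    import io.deephaven.engine.rowset.RowSequence;
    import io.deephaven.engine.table.ChunkSource;
    import io.deephaven.engine.table.ColumnSource;

    final class PrevOrCurrentSketch {
        static LongChunk<? extends Values> timestamps(final ColumnSource<?> src, final ChunkSource.GetContext ctx,
                final RowSequence rows, final boolean usePrev) {
            return usePrev
                    ? src.getPrevChunk(ctx, rows).asLongChunk() // values before this update cycle
                    : src.getChunk(ctx, rows).asLongChunk();    // current values
        }
    }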
@@ -168,100 +152,105 @@ private static WritableRowSet computeInfluencerRowsTime(final UpdateByWindowTime
     public void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNull TableUpdate upstream) {
         UpdateByWindowTimeBucketContext ctx = (UpdateByWindowTimeBucketContext) context;
 
-        if (upstream.empty() || ctx.sourceRowSet.isEmpty()) {
-            // No further work will be done on this context
-            finalizeWindowBucket(context);
-            return;
-        }
+        try (final ChunkSource.GetContext tsContext = ctx.timestampColumnSource.makeGetContext(ctx.workingChunkSize)) {
Review comment (@lbooker42, Contributor Author):
This is the important line. Try-with-resources here and pass the context to the calling functions instead of keeping as a member.

Review comment (@lbooker42, Contributor Author, Mar 4, 2024):
GitHub is having a problem with matching the lines, even though they are unchanged. Sorry for the confusion. Assuming it is the indentation.
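The first comment above is the whole strategy in one line: the caller owns the context for exactly one pass, and the static helpers borrow it as a parameter instead of reaching for a member. A minimal sketch of that ownership shape, with hypothetical names and simplified signatures:

    import io.deephaven.engine.rowset.RowSet;
    import io.deephaven.engine.table.ChunkSource;
    import io.deephaven.engine.table.ColumnSource;

    final class BorrowedContextSketch {
        // Caller owns the context: one allocation per pass, released on every exit path.
        static void computePass(final ColumnSource<?> timestampSource, final RowSet rows, final int chunkSize) {
            try (final ChunkSource.GetContext tsContext = timestampSource.makeGetContext(chunkSize)) {
                helperA(timestampSource, tsContext, rows);
                helperB(timestampSource, tsContext, rows);
            } // try-with-resources closes tsContext even on early return or exception
        }

        // Helpers borrow the context as a parameter; they never allocate or close it.
        private static void helperA(final ColumnSource<?> src, final ChunkSource.GetContext ctx, final RowSet rows) {
            src.getChunk(ctx, rows);
        }

        private static void helperB(final ColumnSource<?> src, final ChunkSource.GetContext ctx, final RowSet rows) {
            src.getPrevChunk(ctx, rows);
        }
    }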

+            if (upstream.empty() || ctx.sourceRowSet.isEmpty()) {
+                // No further work will be done on this context
+                finalizeWindowBucket(context);
+                return;
+            }
 
-        // all rows are affected on the initial step
-        if (ctx.initialStep) {
-            ctx.affectedRows = ctx.sourceRowSet;
-
-            // Get the exact set of rows needed to compute the initial row set
-            ctx.influencerRows = computeInfluencerRowsTime(ctx, ctx.affectedRows, prevUnits, fwdUnits, false);
-
-            // mark all operators as affected by this update
-            context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
-            context.dirtyOperators = new BitSet(operators.length);
-            context.dirtyOperators.set(0, operators.length);
-
-            ctx.isDirty = true;
-            return;
-        }
+            // all rows are affected on the initial step
+            if (ctx.initialStep) {
+                ctx.affectedRows = ctx.sourceRowSet;
+
+                // Get the exact set of rows needed to compute the initial row set
+                ctx.influencerRows =
+                        computeInfluencerRowsTime(ctx, tsContext, ctx.affectedRows, prevUnits, fwdUnits, false);
+
+                // mark all operators as affected by this update
+                context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
+                context.dirtyOperators = new BitSet(operators.length);
+                context.dirtyOperators.set(0, operators.length);
+
+                ctx.isDirty = true;
+                return;
+            }
 
-        // determine which operators are affected by this update
-        processUpdateForContext(context, upstream);
+            // determine which operators are affected by this update
+            processUpdateForContext(context, upstream);
 
-        if (!ctx.isDirty) {
-            // No further work will be done on this context
-            finalizeWindowBucket(context);
-            return;
-        }
+            if (!ctx.isDirty) {
+                // No further work will be done on this context
+                finalizeWindowBucket(context);
+                return;
+            }
 
-        final WritableRowSet tmpAffected = RowSetFactory.empty();
+            final WritableRowSet tmpAffected = RowSetFactory.empty();
 
-        // consider the modifications only when input or timestamp columns were modified
-        if (upstream.modified().isNonempty() && (ctx.timestampsModified || ctx.inputModified)) {
-            // recompute all windows that have the modified rows in their window
-            try (final RowSet modifiedAffected =
-                    computeAffectedRowsTime(ctx, upstream.modified(), prevUnits, fwdUnits, false)) {
-                tmpAffected.insert(modifiedAffected);
-            }
-
-            if (ctx.timestampsModified) {
-                // recompute all windows that previously contained the modified rows, they may not contain this value
-                // after the timestamp modifications
-                try (final WritableRowSet modifiedAffectedPrev =
-                        computeAffectedRowsTime(ctx, upstream.getModifiedPreShift(), prevUnits, fwdUnits, true)) {
-                    // we used the SSA (post-shift) to get these keys, no need to shift
-                    // retain only the rows that still exist in the sourceRowSet
-                    modifiedAffectedPrev.retain(ctx.timestampValidRowSet);
-                    tmpAffected.insert(modifiedAffectedPrev);
-                }
-
-                // re-compute all modified rows, they have new windows after the timestamp modifications
-                tmpAffected.insert(upstream.modified());
-            }
-        }
+            // consider the modifications only when input or timestamp columns were modified
+            if (upstream.modified().isNonempty() && (ctx.timestampsModified || ctx.inputModified)) {
+                // recompute all windows that have the modified rows in their window
+                try (final RowSet modifiedAffected =
+                        computeAffectedRowsTime(ctx, tsContext, upstream.modified(), prevUnits, fwdUnits, false)) {
+                    tmpAffected.insert(modifiedAffected);
+                }
+
+                if (ctx.timestampsModified) {
+                    // recompute all windows previously containing the modified rows
+                    // after the timestamp modifications
+                    try (final WritableRowSet modifiedAffectedPrev =
+                            computeAffectedRowsTime(ctx, tsContext, upstream.getModifiedPreShift(), prevUnits, fwdUnits,
+                                    true)) {
+                        // we used the SSA (post-shift) to get these keys, no need to shift
+                        // retain only the rows that still exist in the sourceRowSet
+                        modifiedAffectedPrev.retain(ctx.timestampValidRowSet);
+                        tmpAffected.insert(modifiedAffectedPrev);
+                    }
+
+                    // re-compute all modified rows, they have new windows after the timestamp modifications
+                    tmpAffected.insert(upstream.modified());
+                }
+            }
 
-        if (upstream.added().isNonempty()) {
-            // add the new rows and any cascading changes from inserting rows
-            final long prev = Math.max(0, prevUnits);
-            final long fwd = Math.max(0, fwdUnits);
-            try (final RowSet addedAffected =
-                    computeAffectedRowsTime(ctx, upstream.added(), prev, fwd, false)) {
-                tmpAffected.insert(addedAffected);
-            }
-            // compute all new rows
-            tmpAffected.insert(upstream.added());
-        }
+            if (upstream.added().isNonempty()) {
+                // add the new rows and any cascading changes from inserting rows
+                final long prev = Math.max(0, prevUnits);
+                final long fwd = Math.max(0, fwdUnits);
+                try (final RowSet addedAffected =
+                        computeAffectedRowsTime(ctx, tsContext, upstream.added(), prev, fwd, false)) {
+                    tmpAffected.insert(addedAffected);
+                }
+                // compute all new rows
+                tmpAffected.insert(upstream.added());
+            }
 
-        // other rows can be affected by removes
-        if (upstream.removed().isNonempty()) {
-            final long prev = Math.max(0, prevUnits);
-            final long fwd = Math.max(0, fwdUnits);
-            try (final WritableRowSet removedAffected =
-                    computeAffectedRowsTime(ctx, upstream.removed(), prev, fwd, true)) {
-                // we used the SSA (post-shift) to get these keys, no need to shift
-                // retain only the rows that still exist in the sourceRowSet
-                removedAffected.retain(ctx.timestampValidRowSet);
-
-                tmpAffected.insert(removedAffected);
-            }
-        }
+            // other rows can be affected by removes
+            if (upstream.removed().isNonempty()) {
+                final long prev = Math.max(0, prevUnits);
+                final long fwd = Math.max(0, fwdUnits);
+                try (final WritableRowSet removedAffected =
+                        computeAffectedRowsTime(ctx, tsContext, upstream.removed(), prev, fwd, true)) {
+                    // we used the SSA (post-shift) to get these keys, no need to shift
+                    // retain only the rows that still exist in the sourceRowSet
+                    removedAffected.retain(ctx.timestampValidRowSet);
+
+                    tmpAffected.insert(removedAffected);
+                }
+            }
 
-        ctx.affectedRows = tmpAffected;
+            ctx.affectedRows = tmpAffected;
 
-        if (ctx.affectedRows.isEmpty()) {
-            // No further work will be done on this context
-            finalizeWindowBucket(context);
-            ctx.isDirty = false;
-            return;
-        }
+            if (ctx.affectedRows.isEmpty()) {
+                // No further work will be done on this context
+                finalizeWindowBucket(context);
+                ctx.isDirty = false;
+                return;
+            }
 
-        // now get influencer rows for the affected rows
-        ctx.influencerRows = computeInfluencerRowsTime(ctx, ctx.affectedRows, prevUnits, fwdUnits, false);
+            // now get influencer rows for the affected rows
+            ctx.influencerRows =
+                    computeInfluencerRowsTime(ctx, tsContext, ctx.affectedRows, prevUnits, fwdUnits, false);
+        }
     }
 
     private long nextLongOrMax(LongColumnIterator it) {
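A closing design note: computeAffectedRowsAndOperators returns early from several branches (empty upstream, a clean context, an empty affected-row set), and try-with-resources guarantees the timestamp context is released on every one of those paths. That is exactly the invariant the deleted finalizeWindowBucket override and the Assert.eqNull in close() were enforcing by hand. A compact, runnable illustration with a hypothetical resource:

    final class EarlyExitSketch {
        static String analyze(final int update) {
            try (final AutoCloseable resource = () -> System.out.println("released")) {
                if (update == 0) {
                    return "empty";      // resource still released
                }
                if (update < 0) {
                    return "not dirty";  // resource still released
                }
                return "processed";      // and here
            } catch (Exception e) {
                throw new RuntimeException(e); // AutoCloseable.close() declares Exception
            }
        }
    }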