Skip to content

Commit

Permalink
Added in an experimental Windowed table.
Browse files Browse the repository at this point in the history
This implements a windowing partitioned table, that can take in timestamped data and window
across any period of time for any amount of backlook desired.

Generally the implementation leverages an input Blink table that has a timestamp column, it
then creates a new __WINDOWED__ column that it partitions the input by. The implementation
leverages a PriorityQueue to then clean up "expired" windows, maintaining the specified number
of windows and correctly partitioning the incoming data across those windows.
  • Loading branch information
supernomad committed Nov 7, 2023
1 parent 24f8182 commit 158e70b
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 0 deletions.
21 changes: 21 additions & 0 deletions extensions/window/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

description 'Window: Integrating a time bucketing partitioned table'

dependencies {
implementation project(':engine-table')

Classpaths.inheritImmutables(project)

testImplementation TestTools.projectDependency(project, 'engine-table')
Classpaths.inheritJUnitClassic(project, 'testImplementation')
Classpaths.inheritAssertJ(project)

testRuntimeOnly project(':log-to-slf4j'),
project(path: ':configs'),
project(path: ':test-configs')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}
2 changes: 2 additions & 0 deletions extensions/window/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC

141 changes: 141 additions & 0 deletions extensions/window/src/main/java/io/deephaven/window/WindowedTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.deephaven.window;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.ConstituentDependency;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;

public class WindowedTable extends PartitionedTableImpl {
private static final String CONSTITUENT_COL = "__CONSTITUENT__";
private static final String WINDOW_COL = "__WINDOW__";
private static final String WINDOW_LOWER_BIN =
WINDOW_COL + " = lowerBin(%s, %d)";
private static final String WINDOW_UPPER_BIN =
WINDOW_COL + " = upperBin(%s, %d)";

private WindowingListener listener;

/**
* Create a WindowedTable instance.
*
* @param input The input table to be windowed.
* @param timestampColumnName The name of the timestamp column.
* @param upperBin Whether to use upper bins for windowing.
* @param windowPeriod The duration of each window.
* @param windowCount The number of windows to maintain.
* @return A WindowedTable instance.
*/
public static WindowedTable of(@NotNull final Table input,
@NotNull final String timestampColumnName,
@NotNull final boolean upperBin,
@NotNull final Duration windowPeriod,
@NotNull final long windowCount) {
// This is useless on a non-refreshing table, so lets enforce that here.
Assert.eqTrue(input.isRefreshing(), "need refreshing table");

// This is also useless on a non-blink table, since the upstreams will just
// hold onto the data forever. It is also useful for us to assume Blink
// semantics so just enforce this here.
Assert.eqTrue(
input.getAttribute(Table.BLINK_TABLE_ATTRIBUTE).equals(Boolean.TRUE),
"need blink table");

// Start off by creating our window column that will allow us to split up
// incoming data into the various output tables we need to manage.
String windowExp = String.format(WINDOW_LOWER_BIN, timestampColumnName,
windowPeriod.toNanos());
if (upperBin) {
windowExp = String.format(WINDOW_UPPER_BIN, timestampColumnName,
windowPeriod.toNanos());
}

// Execute the either lower/upper bin call on the table, and then partition
// it by the new column. There by creating our "input" table used by the
// listener to copy the newly created tables into a managable form.
final Table windowed = input.updateView(windowExp);
final Table inputParts =
windowed.removeBlink()
.partitionedAggBy(List.of(), true, null, WINDOW_COL)
.table();

// Verify we are working with what we think we are working with table wise.
final TableDefinition partsDef = TableDefinition.of(
ColumnDefinition.ofTime(WINDOW_COL),
ColumnDefinition.fromGenericType(CONSTITUENT_COL, QueryTable.class));
Assert.eqTrue(inputParts.getDefinition().equalsIgnoreOrder(partsDef),
"definitions mismatch");

// Grab our input and create our output column sources.
final ColumnSource<Instant> inputWindows =
inputParts.getColumnSource(WINDOW_COL);
final ColumnSource<QueryTable> inputTables =
inputParts.getColumnSource(CONSTITUENT_COL);
final WritableColumnSource<Instant> outputWindows =
ArrayBackedColumnSource.getMemoryColumnSource(Instant.class, null);
final WritableColumnSource<QueryTable> outputTables =
ArrayBackedColumnSource.getMemoryColumnSource(QueryTable.class, null);

// Ensure our new column sources are fully setup and ready to go.
outputWindows.startTrackingPrevValues();
outputTables.startTrackingPrevValues();

// Create a new empty rowset, and create our output table.
final TrackingWritableRowSet rowSet = RowSetFactory.empty().toTracking();
final QueryTable outputParts = new QueryTable(
partsDef, rowSet,
Map.of(WINDOW_COL, outputWindows, CONSTITUENT_COL, outputTables));

// Create the listener to handle copying tables into our output.
final WindowingListener listener = new WindowingListener(
inputParts, outputParts, rowSet, inputWindows, inputTables,
outputWindows, outputTables, upperBin, windowPeriod, windowCount);

// Install the constituent dependency between the listener
// maintaining the table and the table itself.
ConstituentDependency.install(outputParts, listener);

// Finally create the actual WindowTable which is really just a
// PartitionedTable in disguise.
final TableDefinition constituentDef = windowed.getDefinition();
return new WindowedTable(listener, outputParts, List.of(WINDOW_COL), true,
CONSTITUENT_COL, constituentDef, true, true);
}

protected WindowedTable(@NotNull final WindowingListener listener,
@NotNull final Table table,
@NotNull final Collection<String> keyColumnNames,
final boolean uniqueKeys,
@NotNull final String constituentColumnName,
@NotNull final TableDefinition constituentDefinition,
final boolean constituentChangesPermitted,
final boolean validateConstituents) {
super(table, keyColumnNames, uniqueKeys, constituentColumnName,
constituentDefinition, constituentChangesPermitted,
validateConstituents);
this.listener = listener;
}

/**
* Close the WindowedTable and its associated listener.
*/
public synchronized void close() {
if (this.listener != null) {
this.listener.close();
this.listener = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package io.deephaven.window;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.time.DateTimeUtils;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.PriorityQueue;
import org.jetbrains.annotations.NotNull;

public class WindowingListener extends InstrumentedTableUpdateListenerAdapter {
private final QueryTable child;
private final TrackingWritableRowSet childRowSet;
private final WritableColumnSource<Instant> childWindowSource;
private final WritableColumnSource<QueryTable> childTableSource;

private final Table parent;
private final ColumnSource<Instant> parentWindowSource;
private final ColumnSource<QueryTable> parentTableSource;

private final PriorityQueue<Tracker> tracking;
private final boolean upperBin;
private final Duration windowPeriod;
private final long windowCount;

protected WindowingListener(
final Table parent, final QueryTable child,
final TrackingWritableRowSet childRowSet,
final ColumnSource<Instant> parentWindowSource,
final ColumnSource<QueryTable> parentTableSource,
final WritableColumnSource<Instant> childWindowSource,
final WritableColumnSource<QueryTable> childTableSource,
final boolean upperBin, final Duration windowPeriod,
final long windowCount) {
super("io-deephaven-window-windowinglistener", parent, false);
this.parent = parent;
this.parentWindowSource = parentWindowSource;
this.parentTableSource = parentTableSource;
this.child = child;
this.childRowSet = childRowSet;
this.childTableSource = childTableSource;
this.childWindowSource = childWindowSource;
this.upperBin = upperBin;
this.windowPeriod = windowPeriod;
this.windowCount = windowCount;
this.tracking = new PriorityQueue<>();

// Add ourselves as a listener of our parent.
this.parent.addUpdateListener(this);

// Add ourselves as a parent to our child.
//
// Note: Not 100% sure this is necessary as we are installing a constituent
// dependency one level above but this doesn't seem to hurt anything so
// leaving it in.
this.child.addParentReference(this);
}

public synchronized void close() {
this.parent.removeUpdateListener(this);
}

@Override
public void onUpdate(TableUpdate upstream) {
// We explicitly ignore any removes, and we panic on any shifted/modifed
// data from upstream.
Assert.eqTrue(upstream.shifted().empty(),
"blink upstream has shifted data");
Assert.eqTrue(upstream.modified().isEmpty(),
"blink upstream has modified data");

// Grab our current and then earliest possible bucket we care about.
final long period = this.windowPeriod.toNanos();
final Instant currentBucket =
(this.upperBin) ? DateTimeUtils.upperBin(DateTimeUtils.now(), period)
: DateTimeUtils.lowerBin(DateTimeUtils.now(), period);
final Instant earliestBucket =
(this.windowCount <= 1)
? currentBucket
: DateTimeUtils.minus(currentBucket,
period * (this.windowCount - 1));

// Start by cleaning up our local state using a priority queue as a min-heap
// implementation, allowing for efficient cleanup of existing windows that
// have expired since the last execution.
final RowSet removed = this.cleanup(earliestBucket);
final RowSet added = this.insert(upstream.added(), earliestBucket);

// Update our current rowset with adds/removes, and then notify our child
// tables listeners.
this.childRowSet.update(added, removed);
this.child.notifyListeners(added, removed, RowSetFactory.empty());
}

private RowSet insert(@NotNull final RowSet added,
@NotNull final Instant earliestBucket) {
// Create a new rowset builder so we can inform our downstream of anything
// that eventually gets inserted.
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();

// This is unfortunately a O(n) operation, and it sort of has to be as we
// need to check each and every one of the new window rows to see if we care
// about them. At any rate it should be that `n` is sufficiently small that
// this is not an issue; One would have to want to window at microsecond
// intervals or smaller for this to really be an issue.
added.forAllRowKeys((key) -> {
// For each key start by grabbing the window we are working with and then
// verify we care about it.
final Instant window = this.parentWindowSource.get(key);
if (DateTimeUtils.isBefore(window, earliestBucket)) {
return;
}

// Since we care about this table, grab it and place it in _our_ output
// sources, this will allow us to manage it properly. Also we need to make
// sure we add back the Blink table attribute that we had to remove to
// support the partitioned_agg_by call.
final QueryTable table =
(QueryTable) this.parentTableSource.get(key).withAttributes(
Map.of(Table.BLINK_TABLE_ATTRIBUTE, Boolean.TRUE));

// Quickly ensure that we have enough capacity for this new table to
// track.
this.childTableSource.ensureCapacity(key + 1);
this.childWindowSource.ensureCapacity(key + 1);

// Finally inserting the values we need in the various column sources plus
// our priority queue for pending cleanup.
this.childWindowSource.set(key, window);
this.childTableSource.set(key, table);
this.tracking.add(new Tracker(key, window));

// Lastly ensure we notify our downstreams that there is a new key to work
// with.
builder.addKey(key);
});

return builder.build();
}

private RowSet cleanup(@NotNull final Instant earliestBucket) {
// Create a new rowset builder so we can inform our downstream of anything
// that eventually gets removed.
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();

// Check our current list of tables and nuke anything that should be
// removed.
while (!this.tracking.isEmpty()) {
// Start by taking a peak at the oldest tracker we have.
if (DateTimeUtils.isAfterOrEqual(this.tracking.peek().window,
earliestBucket)) {
// If we get here nothing else could possibly need pruning thanks to the
// priority queue, just break and be done with it.
break;
}

// Actually pop off the oldest and null out any data at the corresponding
// key, while making sure we notify our downstreams that the remove
// happened.
final Tracker oldest = this.tracking.poll();
this.childWindowSource.setNull(oldest.key);
this.childTableSource.setNull(oldest.key);
builder.addKey(oldest.key);
}

return builder.build();
}

protected class Tracker implements Comparable<Tracker> {
public long key;
public Instant window;

protected Tracker(long key, Instant window) {
this.key = key;
this.window = window;
}

@Override
public int compareTo(Tracker other) {
// Compare based on the priority field
return Long.compare(DateTimeUtils.epochNanos(this.window),
DateTimeUtils.epochNanos(other.window));
}
}
}
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ project(':extensions-csv').projectDir = file('extensions/csv')
include(':extensions-kafka')
project(':extensions-kafka').projectDir = file('extensions/kafka')

include(':extensions-window')
project(':extensions-window').projectDir = file('extensions/window')

include(':extensions-parquet-base')
project(':extensions-parquet-base').projectDir = file('extensions/parquet/base')

Expand Down

0 comments on commit 158e70b

Please sign in to comment.