Skip to content

Commit

Permalink
WIP, for review
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jul 3, 2024
1 parent 264fdb1 commit 58d0a73
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -53,7 +50,7 @@ public interface ColumnSourceManager extends LivenessReferent {
*
* @return The set of added row keys, to be owned by the caller
*/
WritableRowSet refresh();
TableUpdate refresh();

/**
* Advise this ColumnSourceManager that an error has occurred, and that it will no longer be {@link #refresh()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableDataException;
Expand Down Expand Up @@ -106,7 +107,6 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
}

setRefreshing(isRefreshing);
setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
}

/**
Expand Down Expand Up @@ -219,10 +219,6 @@ protected void instrumentedRefresh() {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
final ImmutableTableLocationKey[] removedKeys =
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
if (removedKeys.length > 0) {
throw new TableLocationRemovedException("Source table does not support removed locations",
removedKeys);
}
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());

// This class previously had functionality to notify "location listeners", but it was never used.
Expand All @@ -232,13 +228,13 @@ protected void instrumentedRefresh() {
return;
}

final RowSet added = columnSourceManager.refresh();
if (added.isEmpty()) {
final TableUpdate update = columnSourceManager.refresh();
if (update.empty()) {
return;
}

rowSet.insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
rowSet.insert(update.added());
rowSet.remove(update.removed());
notifyListeners(update);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
private final KeyedObjectHashMap<ImmutableTableLocationKey, IncludedTableLocationEntry> includedTableLocations =
new KeyedObjectHashMap<>(INCLUDED_TABLE_LOCATION_ENTRY_KEY);

private final List<IncludedTableLocationEntry> removedTableLocations = new ArrayList<>();

/**
* Table locations that provide the regions backing our column sources, in insertion order.
*/
Expand Down Expand Up @@ -206,7 +208,8 @@ public boolean removeLocationKey(@NotNull final ImmutableTableLocationKey locati
log.debug().append("EMPTY_LOCATION_REMOVED:").append(locationKey.toString()).endl();
}
} else if (includedLocation != null) {
includedLocation.invalidate();
removedTableLocations.add(includedLocation);
// includedLocation.invalidate();
return true;
}

Expand All @@ -219,7 +222,8 @@ public synchronized TrackingWritableRowSet initialize() {

// Do our first pass over the locations to include as many as possible and build the initial row set
// noinspection resource
final TrackingWritableRowSet initialRowSet = update(true).toTracking();
final TableUpdate update = update(true);
final TrackingWritableRowSet initialRowSet = update.added().writableCast().toTracking();

// Add single-column data indexes for all partitioning columns, whether refreshing or not
columnDefinitions.stream().filter(ColumnDefinition::isPartitioning).forEach(cd -> {
Expand Down Expand Up @@ -261,7 +265,7 @@ public synchronized TrackingWritableRowSet initialize() {
}

@Override
public synchronized WritableRowSet refresh() {
public synchronized TableUpdate refresh() {
if (!isRefreshing) {
throw new UnsupportedOperationException("Cannot refresh a static table");
}
Expand All @@ -280,8 +284,19 @@ public void deliverError(@NotNull final Throwable error, @Nullable final TableLi
}
}

private WritableRowSet update(final boolean initializing) {
private TableUpdate update(final boolean initializing) {
final RowSetBuilderSequential addedRowSetBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential removedRowSetBuilder = RowSetFactory.builderSequential();

// Sort the removed locations by region index, so that we can process them in order.
removedTableLocations.sort(Comparator.comparingInt(e -> e.regionIndex));
for (final IncludedTableLocationEntry removedLocation : removedTableLocations) {
final long regionFirstKey = RegionedColumnSource.getFirstRowKey(removedLocation.regionIndex);
removedLocation.location.getRowSet()
.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> removedRowSetBuilder
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));
}
removedTableLocations.clear();

final RowSetBuilderSequential modifiedRegionBuilder = initializing ? null : RowSetFactory.builderSequential();

Expand Down Expand Up @@ -373,7 +388,8 @@ private WritableRowSet update(final boolean initializing) {
includedLocationsTable.notifyListeners(update);
}
}
return addedRowSetBuilder.build();
return new TableUpdateImpl(addedRowSetBuilder.build(), removedRowSetBuilder.build(), RowSetFactory.empty(),
RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ public abstract class IcebergInstructions {
public static final IcebergInstructions DEFAULT = builder().build();

public enum IcebergRefreshing {
STATIC,
AUTO_REFRESHING,
MANUAL_REFRESHING
STATIC, AUTO_REFRESHING, MANUAL_REFRESHING
}

public static Builder builder() {
Expand Down

0 comments on commit 58d0a73

Please sign in to comment.