feat!: support refreshing Iceberg tables #5707

Merged (63 commits into deephaven:main) on Oct 22, 2024

Conversation

lbooker42 (Contributor)

@lbooker42 lbooker42 commented Jul 2, 2024

Add two methods of refreshing tables:

  • Manual refreshing - the user specifies which snapshot to load; the engine parses the snapshot, adds/removes Iceberg data files as needed, and notifies downstream tables of the changes
  • Auto refreshing - at a regular, user-configurable interval, the engine queries Iceberg for the latest snapshot, then parses and loads it

Example code:

Java/Groovy example: automatically and manually refreshing tables

import io.deephaven.iceberg.util.*;
import org.apache.iceberg.catalog.*;

// Create a map to hold the Iceberg Catalog properties
def properties = [
    "type": "rest",
    "uri": "http://rest:8181",
    "client.region": "us-east-1",
    "s3.access-key-id": "admin",
    "s3.secret-access-key": "password",
    "s3.endpoint": "http://minio:9000",
]

adapter = IcebergTools.createAdapter("generic-adapter", properties);

//////////////////////////////////////////////////////////////////////

tableAdapter = adapter.loadTable("sales.sales_multi")

snapshots = tableAdapter.snapshots()
definition = tableAdapter.definitionTable()

//////////////////////////////////////////////////////////////////////

// Load the latest snapshot as a static table
sales_multi_static = tableAdapter.table()

// Load a specific snapshot as a static table
sales_multi_static = tableAdapter.table(6119130922262455673L)

//////////////////////////////////////////////////////////////////////

// Manual refreshing
iceberg_instructions = IcebergInstructions.builder()
    .updateMode(IcebergUpdateMode.manualRefreshingMode())
    .build()

// Load a table with a specific snapshot
sales_multi = adapter.readTable(
        "sales.sales_multi",
        5120804857276751995L,
        iceberg_instructions)

// Update the table to a specific snapshot
sales_multi.update(848129305390678414L)

// Update to the latest snapshot
sales_multi.update()

//////////////////////////////////////////////////////////////////////

import io.deephaven.iceberg.util.IcebergUpdateMode;

// Automatic refreshing every second (1000 ms)
iceberg_instructions = IcebergInstructions.builder()
    .updateMode(IcebergUpdateMode.autoRefreshingMode(1_000L))
    .build()
sales_multi = tableAdapter.table(iceberg_instructions)

// Load the table using the default refresh interval of 60 seconds
iceberg_instructions = IcebergInstructions.builder()
    .updateMode(IcebergUpdateMode.autoRefreshingMode())
    .build()
sales_multi = tableAdapter.table(iceberg_instructions)

Python example: automatically and manually refreshing tables

from deephaven.experimental import iceberg

local_adapter = iceberg.adapter(name="generic-adapter", properties={
    "type": "rest",
    "uri": "http://rest:8181",
    "client.region": "us-east-1",
    "s3.access-key-id": "admin",
    "s3.secret-access-key": "password",
    "s3.endpoint": "http://minio:9000",
})

t_namespaces = local_adapter.namespaces()
t_tables = local_adapter.tables("sales")

# Create a table adapter for the sales.sales_multi table
table_adapter = local_adapter.load_table("sales.sales_multi")

t_snapshots = table_adapter.snapshots()
t_definition = table_adapter.definition()

#################################################

# Get the latest snapshot as a static table
sales_multi_static_latest = table_adapter.table()

# Get a specific snapshot as a static table
sales_multi_static_snap = table_adapter.table(snapshot_id=6119130922262455673)

#################################################

iceberg_instructions = iceberg.IcebergInstructions(
        update_mode=iceberg.IcebergUpdateMode.manual_refresh())

# Get the latest snapshot as a manual refreshing table
sales_multi_refreshing = table_adapter.table(instructions=iceberg_instructions)

# Get a specific snapshot as a manual refreshing table
sales_multi_refreshing = table_adapter.table(
    snapshot_id=6119130922262455673,
    instructions=iceberg_instructions)

# Update to a specific snapshot
sales_multi_refreshing.update(861950607215619880)

# Update to a newer snapshot
sales_multi_refreshing.update(4720492918960789101)

# Update to the latest snapshot
sales_multi_refreshing.update()

#################################################

# Get an auto refreshing table that updates each second (1000 ms)
iceberg_instructions = iceberg.IcebergInstructions(
        update_mode=iceberg.IcebergUpdateMode.auto_refresh(1000))
sales_multi = table_adapter.table(instructions=iceberg_instructions)

# Get an auto refreshing table that updates at the default interval of 60 seconds
iceberg_instructions = iceberg.IcebergInstructions(
        update_mode=iceberg.IcebergUpdateMode.auto_refresh())
sales_multi = table_adapter.table(instructions=iceberg_instructions)

@lbooker42 lbooker42 added this to the 0.36.0 milestone Jul 2, 2024
@lbooker42 lbooker42 self-assigned this Jul 2, 2024
@lbooker42 lbooker42 requested a review from rcaudy July 2, 2024 15:47
/**
 * Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
 * subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
 * guaranteed.
 *
 * @param tableLocationKey The new table location key
 */
- void handleTableLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
+ void handleTableLocationKeyAdded(@NotNull ImmutableTableLocationKey tableLocationKey);
Member:
Good change, may be breaking for DHE, please consult Andy pre-merge.

+ void beginTransaction();
+
+ void endTransaction();

/**
 * Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
 * subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
 * guaranteed.
 */
@rcaudy rcaudy (Member) commented Jul 3, 2024:
Consider whether we can have add + remove + add. What about remove + add in the same pull?
Should document that this may change the "at most once per location" guarantee, and define semantics.
I think it should be something like:
We allow re-add of a removed TLK. Downstream consumers should process these in an order that respects delivery and transactionality.

Within one transaction, expect at most one of "remove" or "add" for a given TLK.
Within one transaction, we can allow remove followed by add, but not add followed by remove. This dictates that we deliver pending removes before pending adds in processPending.
That is, one transaction allows:

  1. Replace a TLK (remove followed by add)
  2. Remove a TLK (remove)
  3. Add a TLK (add)

Double add, double remove, or add followed by remove is right out.

Processing an addition to a transaction:

  1. Remove: If there's an existing accumulated remove, error. Else, if there's an existing accumulated add, error. Else, accumulate the remove.
  2. Add: If there's an existing accumulated add, error. Else, accumulate the add.

Across multiple transactions delivered as a batch, ensure that the right end-state is achieved.

  1. Add + remove collapses pairwise to no-op
  2. Remove + add (assuming prior add) should be processed in order. We might very well choose to not allow re-add at this time, I don't expect Iceberg to do this. If we do allow it, we need to be conscious that the removed location's region(s) need(s) to be used for previous data, while the added one needs to be used for current data.
  3. Multiple adds or removes without their opposite intervening are an error.

A null token should be handled exactly the same as a single-element transaction.

Processing a transaction:

  1. Process removes first. If there's an add pending, delete it and swallow the remove. Else, if there's a remove pending, error. Else, store the remove as pending.
  2. Process adds. If there's an add pending, error. Else, store the add as pending.

Note: removal support means that RegionedColumnSources may no longer be immutable! We need to be sure that we are aware of whether a particular TLP might remove data, and ensure that in those cases the RCS is not marked immutable. REVISED: ONLY REPLACE IS AN ISSUE FOR IMMUTABILITY, AS LONG AS WE DON'T REUSE SLOTS.

We discussed that TLPs should probably specify whether they are guaranteeing that they will never remove TLKs, and whether their TLs will never remove or modify rows. I think if and when we encounter data sources that require modify support, we should probably just use SourcePartitionedTable instead of PartitionAwareSourceTable.
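
A minimal sketch of the within-transaction accumulation rules above, assuming a hypothetical accumulator class (this is not the PR's implementation): remove-then-add of one TLK is accepted as a replace, while double adds, double removes, and add-then-remove are rejected.

import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator illustrating the rules above; not the PR's code.
final class TransactionAccumulator<K> {
    private final Set<K> pendingRemoves = new HashSet<>();
    private final Set<K> pendingAdds = new HashSet<>();

    void accumulateRemove(final K key) {
        // A prior remove, or a prior add, of this key in the same transaction is an error
        if (pendingRemoves.contains(key) || pendingAdds.contains(key)) {
            throw new IllegalStateException("Out-of-order or duplicate remove for " + key);
        }
        pendingRemoves.add(key);
    }

    void accumulateAdd(final K key) {
        // A double add is an error; a remove followed by an add (a replace) is permitted
        if (!pendingAdds.add(key)) {
            throw new IllegalStateException("Duplicate add for " + key);
        }
    }
}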

lbooker42 (Contributor, Author):
I'm not sure if I need to handle the RCS immutability question in this PR since Iceberg will not modify rows.

Member:
Removing a region makes the values in the corresponding row key range disappear. That's OK for immutability.
If you allow a new region to use the same slot, or allow the old region to reincarnate in the same slot potentially with different data, you are violating immutability.

Not reusing slots means that a long-lived iceberg table may eventually exhaust its row key space.
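
For scale, a back-of-the-envelope sketch; the constants are assumed for illustration and are not Deephaven's actual region parameters. If 63-bit row keys are partitioned so that each region owns a fixed block of keys, never reusing a slot bounds how many regions a table can ever observe:

// Assumed numbers, not Deephaven's actual constants.
final int regionBits = 40;                        // row keys reserved per region: 2^40
final long maxRegions = 1L << (63 - regionBits);  // 2^23, about 8.4 million region slots
System.out.println("Region slots before row-key exhaustion: " + maxRegions);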

Member:
Replace (remove + add of a TLK) requires some kind of versioning of the TL, in a way that the TLK is aware of in order to ensure that we provide the table with the right TL for the version. AbstractTableLocationProvider's location caching layer is not currently sufficient for atomically replacing TLs.
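
One possible shape for such a layer, with hypothetical types and names (AbstractTableLocationProvider does not currently do this): key the cache on the TLK plus a version counter, so a replaced TLK resolves to the TableLocation belonging to the correct version.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical version-aware location cache; a sketch, not the provider's actual code.
final class VersionedLocationCache<KEY, LOCATION> {
    // A location key paired with the version at which it was (re-)added
    private record VersionedKey<KEY>(KEY key, long version) {}

    private final Map<VersionedKey<KEY>, LOCATION> locations = new ConcurrentHashMap<>();

    LOCATION get(final KEY key, final long version) {
        return locations.get(new VersionedKey<>(key, version));
    }

    void put(final KEY key, final long version, final LOCATION location) {
        locations.put(new VersionedKey<>(key, version), location);
    }
}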

* Update a manually refreshing table location provider with a specific snapshot from the catalog. If the
* {@code snapshotId} is not found in the list of snapshots for the table, an {@link IllegalArgumentException} is
* thrown. The snapshot must also be newer (higher in sequence number) than the current snapshot or an
* {@link IllegalArgumentException} is thrown.
Member:
Does Iceberg have a SnapshotNotFoundException? If not, should we add one of our own? IAE is OK, though.

lbooker42 (Contributor, Author):
Iceberg does not produce an error when a snapshot is not matched. I intend to keep these as IAE rather than create an exception that would be used exactly once.
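
For reference, the check is straightforward with Iceberg's public Table/Snapshot API; a minimal sketch (resolveSnapshot is an illustrative name, not the PR's exact code):

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Resolve a snapshot id against the table's snapshot list, or fail with an IAE.
static Snapshot resolveSnapshot(final Table table, final long snapshotId) {
    for (final Snapshot snapshot : table.snapshots()) {
        if (snapshot.snapshotId() == snapshotId) {
            return snapshot;
        }
    }
    throw new IllegalArgumentException(
            "Snapshot id " + snapshotId + " was not found for table " + table.name());
}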

@rcaudy rcaudy (Member) left a comment.

Comment on lines 135 to 137:

def update(self, snapshot_id: Optional[int] = None):
    """
    Updates the table with a specific snapshot. If no snapshot is provided, the most recent snapshot is used.
Member:
Updates the table to match the contents of the specified snapshot. This may result in row removes and additions that will be propagated asynchronously via this IcebergTable's UpdateGraph.

@rcaudy rcaudy (Member) left a comment.

@rcaudy rcaudy (Member) commented:
Provided a few bits of feedback in Slack.

rcaudy previously approved these changes Oct 21, 2024
Member:
Am I correct to assume that we still don't unit test Python Iceberg?

lbooker42 (Contributor, Author):
Yes, still correct. We created #5656 and this is a priority for us. With JDBC + SQLite catalogs, we are pretty confident we can get full Python testing on a local catalog.

@lbooker42 lbooker42 changed the title feat: support refreshing Iceberg tables feat! support refreshing Iceberg tables Oct 22, 2024
@lbooker42 lbooker42 changed the title feat! support refreshing Iceberg tables feat!: support refreshing Iceberg tables Oct 22, 2024
@lbooker42 lbooker42 enabled auto-merge (squash) October 22, 2024 22:18
@lbooker42 lbooker42 merged commit 1ae0f77 into deephaven:main Oct 22, 2024
17 of 18 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Oct 22, 2024
@deephaven-internal (Contributor) commented:

Labels indicate documentation is required. Issues for documentation have been opened:

Community: deephaven/deephaven-docs-community#328
