feat: Added support to write iceberg tables #5989

Open
wants to merge 48 commits into main
Conversation

@malhotrashivam (Contributor) commented Aug 26, 2024

Closes: #6125
Should be merged after #6156

Also moves existing Iceberg tests from JUnit 4 to JUnit 5.

@malhotrashivam added the parquet (Related to the Parquet integration), DocumentationNeeded, ReleaseNotesNeeded (Release notes are needed), s3, and iceberg labels Aug 26, 2024
@malhotrashivam added this to the 0.37.0 milestone Aug 26, 2024
@malhotrashivam self-assigned this Aug 26, 2024
@malhotrashivam marked this pull request as draft September 6, 2024
@malhotrashivam changed the title from "feat: [DO NOT MERGE] Added support to write iceberg tables" to "feat: Added support to write iceberg tables" Sep 6, 2024
@malhotrashivam marked this pull request as ready for review October 1, 2024
@@ -33,95 +36,18 @@
*/
public abstract class ParquetInstructions implements ColumnToCodecMappings {

private static volatile String defaultCompressionCodecName = CompressionCodecName.SNAPPY.toString();
Contributor Author:
Removing unnecessary configuration parameters.

Member:
In general this seems like an improvement; the old code was adding little value for the complexity. What about enterprise usages?

@@ -433,6 +382,14 @@ public boolean useDictionary() {
public void useDictionary(final boolean useDictionary) {
this.useDictionary = useDictionary;
}

public OptionalInt getFieldId() {
Contributor Author:
The field-ID-related logic may change when #6156 gets merged.

maximum_dictionary_size: Optional[int] = None,
target_page_size: Optional[int] = None,
verify_schema: Optional[bool] = None,
dh_to_iceberg_column_renames: Optional[Dict[str, str]] = None,
Member:
name is very long, especially if a user is specifying it. Any reason it can't just be column_renames?

Member:
you should also look through the rest of the API to see if column_renames or col_renames would be most consistent. I would guess col_renames.

Comment on lines 197 to 220
if compression_codec_name is not None:
builder.compressionCodecName(compression_codec_name)

if maximum_dictionary_keys is not None:
builder.maximumDictionaryKeys(maximum_dictionary_keys)

if maximum_dictionary_size is not None:
builder.maximumDictionarySize(maximum_dictionary_size)

if target_page_size is not None:
builder.targetPageSize(target_page_size)

if verify_schema is not None:
builder.verifySchema(verify_schema)

if dh_to_iceberg_column_renames is not None:
for dh_name, iceberg_name in dh_to_iceberg_column_renames.items():
builder.putDhToIcebergColumnRenames(dh_name, iceberg_name)

if table_definition is not None:
builder.tableDefinition(TableDefinition(table_definition).j_table_definition)

if data_instructions is not None:
builder.dataInstructions(data_instructions.j_object)
Member:
I suspect all of these cases can have is not None removed. Confirm with @jmao-denver on what he wants to see.

tables: List[Table],
partition_paths: Optional[List[str]] = None,
instructions: Optional[IcebergParquetWriteInstructions] = None):
# TODO Review javadoc in this file once again
Member:
todo

table_identifier: str,
tables: List[Table],
partition_paths: Optional[List[str]] = None,
instructions: Optional[IcebergParquetWriteInstructions] = None):
Member:
missing a return type hint

instructions: Optional[IcebergParquetWriteInstructions] = None):
# TODO Review javadoc in this file once again
"""
Append the provided Deephaven table as a new partition to the existing Iceberg table in a single snapshot. This
Member:
this says "table" and "partition", but the input is a list of tables. Does that mean multiple tables go to one partition or multiple partitions? etc.

Comment on lines 405 to 406
tables: List[Table],
partition_paths: Optional[List[str]] = None,
Member:
see other comments

table_identifier: str,
tables: List[Table],
partition_paths: Optional[List[str]] = None,
instructions: Optional[IcebergParquetWriteInstructions] = None):
Member:
missing a return type hint

of data files that were written. Users can use this list to create a transaction/snapshot if needed.

Args:
table_identifier (str): the identifier string for iceberg table to write to.
Member:
grammar

Comment on lines 414 to 416
tables (List[Table]): the tables to write.
partition_paths (Optional[List[str]]): the partitioning path at which data would be written, for example,
"year=2021/month=01". If omitted, we will try to write data to the table without partitioning.
Member:
see other comments

partition_paths (Optional[List[str]]): the partitioning path at which data would be written, for example,
"year=2021/month=01". If omitted, we will try to write data to the table without partitioning.
instructions (Optional[IcebergParquetWriteInstructions]): the instructions for customizations while writing.
"""
Member:
All above cases that are missing the return type hint are also missing docs on the return value

@rcaudy (Member) left a comment:

I was a little less than thorough in the parquet writing and table adapter code, but I think we got the salient bits reviewed.
We should gather a consensus around our schema evolution support, as it influences this PR quite a lot.

* The Deephaven tables to be written. All tables should have the same definition, else a {@link #tableDefinition()
* table definition} should be provided.
*/
public abstract List<Table> dhTables();
Member:
Not a fan generally of prefixing based on namespaces like "dh"; it's okay sometimes as a variable, but I would just let the return type (io.deephaven.engine.table.Table) speak for itself and call this tables().

Member:
Do all of these tables need to match #tableDefinition exactly if it is present? If so, we should add a check.

Contributor Author:
The tables do not need to have exactly the same definition if #tableDefinition is provided; otherwise they all need to have the same definition. I have this check in IcebergTableAdapter::ensureDefinition, and I can move some of that logic to the Instructions class.

I have also updated the docs for #tableDefinition to make this clearer.
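For reference, a minimal sketch of what such a check could look like if moved into the Instructions class; the helper and its names are assumptions for illustration, not the PR's actual code:

import java.util.List;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;

// Hypothetical helper, not the PR's implementation: resolve the definition to write with,
// requiring identical per-table definitions only when no explicit definition is supplied.
final class WriteDefinitionCheck {
    static TableDefinition resolve(final List<Table> tables, final TableDefinition explicit) {
        if (explicit != null) {
            return explicit; // an explicit definition wins; individual tables may differ from it
        }
        final TableDefinition first = tables.get(0).getDefinition();
        for (final Table table : tables) {
            // assumes TableDefinition implements value equality
            if (!first.equals(table.getDefinition())) {
                throw new IllegalArgumentException(
                        "All tables must have the same definition when no explicit table definition is provided");
            }
        }
        return first;
    }
}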

* <p>
* If not provided, we use the latest schema from the table.
*/
public abstract Optional<Schema> schema();
Member:
I was debating arguing for the point

We may want to enforce that schema is present if the map is present, because it's Iceberg's responsibility to set the field ids when the schema is created or updated, and thus a user should only be getting the field IDs from an existing schema

but upon further reflection, it's possible that the users are either hard-coding just field IDs after the fact (ie, after the table has been created), or getting it from some other system that interacts with Iceberg.

Of course, in that scenario, you might argue that they should be hard-coding the Schema as opposed to just the field IDs... this also brings up a bit of a bootstrapping chicken-and-egg problem: how can a piece of writing logic be written that is both responsible for creating the initial table if it doesn't exist, and for appending to it successfully if the table does exist? Are we allowed to assume that Iceberg will create the Schema with the field IDs incrementing starting from 1? We should discuss this bootstrapping assumption problem, and how we can best solve it...

I think this method should move above the map; it feels more structurally important.
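For the "field IDs from an existing schema" case, a minimal sketch against the stock Iceberg API; the table identifier and column name are placeholders:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Illustrative only: look up a field ID from an already-created Iceberg table
// instead of hard-coding it; "myNamespace.myTable" and the column name are placeholders.
final class FieldIdLookup {
    static int fieldIdFor(final Catalog catalog, final String columnName) {
        final Table icebergTable = catalog.loadTable(TableIdentifier.parse("myNamespace.myTable"));
        final Schema schema = icebergTable.schema();
        return schema.findField(columnName).fieldId();
    }
}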

Member:
It looks like we might be able to make assumptions about the ordering. Digging into the code I see org.apache.iceberg.types.TypeUtil#assignFreshIds(int, org.apache.iceberg.Schema, org.apache.iceberg.types.TypeUtil.NextID); of course, this is only the de facto implementation, and I'm not sure the field IDs are guaranteed to be in this order based on the spec...

Member:
Another argument, in the case where DH is the only writer, is that the user should be able to completely ignore schema and the map...

Comment on lines 722 to 727
// Sleep for 0.5 second
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Member:
Ideally, we should have some sort of testing hooks so we don't need to do any sleeps; or maybe we already have a way to wait for new data against the table?

Regardless, if a thread is interrupted in this way in a test, it's probably better to just declare the exception on the test method.
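A sketch of that suggestion under JUnit 5 (the test name and body are made up):

import org.junit.jupiter.api.Test;

class RefreshingIcebergTableTest {
    // Hypothetical test shape: declare InterruptedException on the method rather than
    // catching it, until a proper hook exists to wait for the new snapshot without sleeping.
    @Test
    void refreshPicksUpNewSnapshot() throws InterruptedException {
        // ... trigger the write that produces a new snapshot ...
        Thread.sleep(500); // ideally replaced by a testing hook
        // ... assert that the refreshed table sees the new data ...
    }
}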

Contributor Author:
I checked with Larry; he said he couldn't find a better way to test this part, so I left it like this for now.
Will check with Ryan.

Comment on lines +197 to +201
/**
* @return A callback to be executed when on completing each parquet data file write (excluding the index and
* metadata files).
*/
public abstract Optional<OnWriteCompleted> onWriteCompleted();
Member:
Based on the current implementation, I see this always gets invoked on-thread in a linear fashion; we may want to document either that the consumer is responsible for thread safety, or that the writing code will invoke this in a thread-safe way. Both approaches have their merit; not sure which I prefer. @rcaudy?
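One possible wording if the current serial, on-thread invocation is kept (a suggestion only, not the PR's text):

/**
 * @return A callback to be executed on completing each parquet data file write (excluding the index and
 *         metadata files). The callback is invoked serially on the writing thread, so implementations do
 *         not need to be thread-safe.
 */
public abstract Optional<OnWriteCompleted> onWriteCompleted();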

if (numTables == 0) {
return writeInstructions.withTableDefinition(TableDefinition.of());
}
final List<Table> dhTables = writeInstructions.tables();
Member:
So, I get the spirit of this method, but I'm not a fan of how we now have 2 classes of IcebergParquetWriteInstructions. It's the same sort of situation I'm sad about wrt #6149.

I wonder if instead we should add a method

public abstract class IcebergWriteInstructions implements IcebergBaseInstructions {
    ...
    public final TableDefinition tableDefinitionOrFirst() {
        return tableDefinition().orElse(tables().get(0).getDefinition());
    }
    ...
}

and then, where applicable, have callers use tableDefinitionOrFirst instead of tableDefinition. This saves us from having to create a new object and allows us to preserve the original instructions further down through the call stack.

writeInstructions.onWriteCompleted()
.ifPresent(callback -> callback.onWriteCompleted(CompletedParquetWrite.builder()
.destination(tableDestination)
.numRows(source.size())
Member:
I wonder if it's more appropriate to plumb the callback down through ParquetTableWriter.write? That way, write doesn't need to return the number of bytes anymore, and it would be responsible internally for calculating the number of rows (instead of making the caller do source.size(); I do see the safety check is at the inner layer calling checkInitiateSerialTableOperation).

public IcebergTableAdapter createTable(
@NotNull final TableIdentifier tableIdentifier,
@NotNull final TableDefinition definition) {
// TODO Add these APIs to python code once finalized
Contributor Author:
TODO


INSTRUCTIONS_BUILDER addAllDhTables(Iterable<? extends Table> elements);

// TODO Discuss about the API for partition paths, and add tests
Contributor Author:
TODO Check with Devin if this is okay.

Comment on lines 220 to 226
// Overwrite with an empty table
final Table emptyTable = TableTools.emptyTable(0)
.update("intCol = (int) 4 * i + 30",
"doubleCol = (double) 4.5 * i + 30");
tableAdapter.overwrite(instructionsBuilder()
.addDhTables(emptyTable)
.build());
Contributor Author:
Yeah, it now seems like an unnecessary complication.
If a user wants to delete the content, they can do it with just the Iceberg API and don't need Deephaven's help.
So I can delete it for now, and we can add it later if needed.
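For illustration of that point, a sketch of clearing an Iceberg table's contents directly through the Iceberg API (not part of this PR; the already-loaded org.apache.iceberg.Table is assumed):

import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

// Illustrative only: delete all data files matching an always-true row filter via the stock Iceberg API.
final class IcebergTruncate {
    static void truncate(final Table icebergTable) {
        icebergTable.newDelete()
                .deleteFromRowFilter(Expressions.alwaysTrue())
                .commit();
    }
}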

* The {@link TableDefinition} to use when writing Iceberg data files. All tables written by this writer should have
* the same definition.
*/
public abstract TableDefinition tableDefinition();
Contributor Author:
So this definition can be a subset or a superset as well.
This is similar to how we treat the table definition on the Parquet reading/writing side, where adding an additional column here will lead to null values in the table.
I have updated the javadocs to make this clearer.
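For example, a superset definition along these lines (column names mirror the test table above; illustrative only) would write the extra column as nulls, mirroring the Parquet behavior:

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;

// Illustrative only: a definition with one extra column relative to the source tables.
final class SupersetDefinitionExample {
    static final TableDefinition SUPERSET = TableDefinition.of(
            ColumnDefinition.ofInt("intCol"),
            ColumnDefinition.ofDouble("doubleCol"),
            ColumnDefinition.ofString("extraCol"));
}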
