diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
index a2db0131879..5e1f9f98639 100644
--- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
+++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
@@ -130,14 +130,15 @@ public void testListTables() {
         final Namespace ns = Namespace.of("sales");
 
         Collection<TableIdentifier> tables = adapter.listTables(ns);
-        Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
+        Assert.eq(tables.size(), "tables.size()", 4, "4 tables in the namespace");
         Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
         Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
                 "tables.contains(sales_partitioned)");
         Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");
+        Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_renamed")), "tables.contains(sales_renamed)");
 
         Table table = adapter.listTablesAsTable(ns);
-        Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
+        Assert.eq(table.size(), "table.size()", 4, "4 tables in the namespace");
         Assert.eqTrue(table.getColumnSource("Namespace").getType().equals(String.class), "namespace column type");
         Assert.eqTrue(table.getColumnSource("TableName").getType().equals(String.class), "table_name column type");
         Assert.eqTrue(table.getColumnSource("TableIdentifierObject").getType().equals(TableIdentifier.class),
@@ -145,7 +146,7 @@ public void testListTables() {
 
         // Test the string versions of the methods
         table = adapter.listTablesAsTable("sales");
-        Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
+        Assert.eq(table.size(), "table.size()", 4, "4 tables in the namespace");
     }
 
     @Test
@@ -509,8 +510,8 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx
 
         final IcebergInstructions localInstructions = IcebergInstructions.builder()
                 .dataInstructions(instructions.dataInstructions().get())
-                .putColumnRenames("RegionName", "Region")
-                .putColumnRenames("ItemType", "Item_Type")
+                .putColumnRenames("Region", "RegionName")
+                .putColumnRenames("Item_Type", "ItemType")
                 .build();
 
         final IcebergCatalogAdapter adapter =
@@ -524,6 +525,88 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx
         Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
     }
 
+    @Test
+    public void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException {
+        uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()),
+                warehousePath);
+
+        final IcebergInstructions localInstructions = IcebergInstructions.builder()
+                .dataInstructions(instructions.dataInstructions().get())
+                .build();
+
+        final IcebergCatalogAdapter adapter =
+                IcebergTools.createAdapter(resourceCatalog, resourceFileIO);
+
+        final Namespace ns = Namespace.of("sales");
+        final TableIdentifier tableId = TableIdentifier.of(ns, "sales_renamed");
+        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        Assert.eqTrue(table.getDefinition().getColumn("Region_Name") != null, "'Region Name' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("ItemType") != null, "'Item&Type' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("UnitsSold") != null, "'Units/Sold' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("Unit_Price") != null, "'Unit Price' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("Order_Date") != null, "'Order Date' renamed");
+    }
+
+    @Test
+    public void testOpenTableColumnLegalizationRename()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()),
+                warehousePath);
+
+        final IcebergInstructions localInstructions = IcebergInstructions.builder()
+                .dataInstructions(instructions.dataInstructions().get())
+                .putColumnRenames("Item&Type", "Item_Type")
+                .putColumnRenames("Units/Sold", "Units_Sold")
+                .build();
+
+        final IcebergCatalogAdapter adapter =
+                IcebergTools.createAdapter(resourceCatalog, resourceFileIO);
+
+        final Namespace ns = Namespace.of("sales");
+        final TableIdentifier tableId = TableIdentifier.of(ns, "sales_renamed");
+        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        Assert.eqTrue(table.getDefinition().getColumn("Region_Name") != null, "'Region Name' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("Item_Type") != null, "'Item&Type' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("Units_Sold") != null, "'Units/Sold' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("Unit_Price") != null, "'Unit Price' renamed");
+        Assert.eqTrue(table.getDefinition().getColumn("Order_Date") != null, "'Order Date' renamed");
+    }
+
+    @Test
+    public void testOpenTableColumnLegalizationPartitionException()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        final TableDefinition tableDef = TableDefinition.of(
+                ColumnDefinition.ofInt("Year").withPartitioning(),
+                ColumnDefinition.ofInt("Month").withPartitioning());
+
+        final IcebergInstructions localInstructions = IcebergInstructions.builder()
+                .tableDefinition(tableDef)
+                .putColumnRenames("Year", "Current Year")
+                .putColumnRenames("Month", "Current Month")
+                .dataInstructions(instructions.dataInstructions().get())
+                .build();
+
+        final IcebergCatalogAdapter adapter =
+                IcebergTools.createAdapter(resourceCatalog, resourceFileIO);
+
+        final Namespace ns = Namespace.of("sales");
+        final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
+        try {
+            final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions);
+            Assert.statementNeverExecuted("Expected an exception for missing columns");
+        } catch (final TableDataException e) {
+            Assert.eqTrue(e.getMessage().contains("invalid column name provided"), "Exception message");
+        }
+    }
+
     @Test
     public void testOpenTableColumnRenamePartitioningColumns()
             throws ExecutionException, InterruptedException, TimeoutException {
diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/data/00000-4-62bcda08-a24a-4b17-a20a-e18edfea5b5a-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/data/00000-4-62bcda08-a24a-4b17-a20a-e18edfea5b5a-0-00001.parquet
new file mode 100644
index 00000000000..36034385967
--- /dev/null
+++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/data/00000-4-62bcda08-a24a-4b17-a20a-e18edfea5b5a-0-00001.parquet
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a9f083874b7d88483cbcc81e5104c9fa1cdc74c43dc1a1e14302459e68d99ab6
+size 729387
diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/00000-556b3c6d-3a0e-4912-916a-151f5d772a7b.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/00000-556b3c6d-3a0e-4912-916a-151f5d772a7b.metadata.json
new file mode 100644
index 00000000000..dd0687b087a
--- /dev/null
+++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/00000-556b3c6d-3a0e-4912-916a-151f5d772a7b.metadata.json
@@ -0,0 +1,91 @@
+{
+  "format-version" : 2,
+  "table-uuid" : "49f61dcb-0c7a-414d-8e67-9bf190a24032",
+  "location" : "s3://warehouse/sales/sales_renamed",
+  "last-sequence-number" : 1,
+  "last-updated-ms" : 1720562597002,
+  "last-column-id" : 5,
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "Region Name",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 2,
+      "name" : "Item&Type",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "Units/Sold",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 4,
+      "name" : "Unit Price",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 5,
+      "name" : "Order Date",
+      "required" : false,
+      "type" : "timestamptz"
+    } ]
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "owner" : "root",
+    "created-at" : "2024-07-09T22:03:15.386555127Z",
+    "write.format.default" : "parquet",
+    "write.parquet.compression-codec" : "zstd"
+  },
+  "current-snapshot-id" : 1014870679100567484,
+  "refs" : {
+    "main" : {
+      "snapshot-id" : 1014870679100567484,
+      "type" : "branch"
+    }
+  },
+  "snapshots" : [ {
+    "sequence-number" : 1,
+    "snapshot-id" : 1014870679100567484,
+    "timestamp-ms" : 1720562597002,
+    "summary" : {
+      "operation" : "append",
+      "spark.app.id" : "local-1720562294285",
+      "added-data-files" : "1",
+      "added-records" : "100000",
+      "added-files-size" : "729387",
+      "changed-partition-count" : "1",
+      "total-records" : "100000",
+      "total-files-size" : "729387",
+      "total-data-files" : "1",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : "s3://warehouse/sales/sales_renamed/metadata/snap-1014870679100567484-1-6154af6b-49cf-41eb-a340-ff60964e750d.avro",
+    "schema-id" : 0
+  } ],
+  "statistics" : [ ],
+  "partition-statistics" : [ ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1720562597002,
+    "snapshot-id" : 1014870679100567484
+  } ],
+  "metadata-log" : [ ]
+}
\ No newline at end of file
diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/6154af6b-49cf-41eb-a340-ff60964e750d-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/6154af6b-49cf-41eb-a340-ff60964e750d-m0.avro
new file mode 100644
index 00000000000..296ea64c6b5
Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/6154af6b-49cf-41eb-a340-ff60964e750d-m0.avro differ
diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/snap-1014870679100567484-1-6154af6b-49cf-41eb-a340-ff60964e750d.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/snap-1014870679100567484-1-6154af6b-49cf-41eb-a340-ff60964e750d.avro
new file mode 100644
index 00000000000..7c8b46c5495
Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_renamed/metadata/snap-1014870679100567484-1-6154af6b-49cf-41eb-a340-ff60964e750d.avro differ
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
index c54535599c3..8513d0344fd 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
@@ -3,6 +3,7 @@
 //
 package io.deephaven.iceberg.util;
 
+import io.deephaven.api.util.NameValidator;
 import io.deephaven.engine.rowset.RowSetFactory;
 import io.deephaven.engine.table.*;
 import io.deephaven.engine.table.impl.PartitionAwareSourceTable;
@@ -453,10 +454,40 @@ private Table readTableInternal(
         // Get the user supplied table definition.
         final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null);
 
+        final Set<String> takenNames = new HashSet<>();
+
+        // Map all the column names in the schema to their legalized names.
+        final Map<String, String> legalizedColumnRenames = new HashMap<>();
+
+        // Validate that user-supplied names meet legalization requirements.
+        for (final Map.Entry<String, String> entry : userInstructions.columnRenames().entrySet()) {
+            final String destinationName = entry.getValue();
+            if (!NameValidator.isValidColumnName(destinationName)) {
+                throw new TableDataException(
+                        String.format("%s:%d - invalid column name provided (%s)", table, snapshot.snapshotId(),
+                                destinationName));
+            }
+            // Add these renames to the legalized list.
+            legalizedColumnRenames.put(entry.getKey(), destinationName);
+            takenNames.add(destinationName);
+        }
+
+        for (final Types.NestedField field : schema.columns()) {
+            final String name = field.name();
+            // Do we already have a valid rename for this column from the user or a partitioned column?
+            if (!legalizedColumnRenames.containsKey(name)) {
+                final String legalizedName =
+                        NameValidator.legalizeColumnName(name, s -> s.replace(" ", "_"), takenNames);
+                if (!legalizedName.equals(name)) {
+                    legalizedColumnRenames.put(name, legalizedName);
+                    takenNames.add(legalizedName);
+                }
+            }
+        }
+
         // Get the table definition from the schema (potentially limited by the user supplied table definition and
         // applying column renames).
-        final TableDefinition icebergTableDef =
-                fromSchema(schema, partitionSpec, userTableDef, userInstructions.columnRenames());
+        final TableDefinition icebergTableDef = fromSchema(schema, partitionSpec, userTableDef, legalizedColumnRenames);
 
         // If the user supplied a table definition, make sure it's fully compatible.
         final TableDefinition tableDef;