diff --git a/api/src/main/java/com/datastrato/graviton/rel/TableChange.java b/api/src/main/java/com/datastrato/graviton/rel/TableChange.java index d009465e94..88eb97e985 100644 --- a/api/src/main/java/com/datastrato/graviton/rel/TableChange.java +++ b/api/src/main/java/com/datastrato/graviton/rel/TableChange.java @@ -227,7 +227,7 @@ final class SetProperty implements TableChange { private final String property; private final String value; - private SetProperty(String property, String value) { + public SetProperty(String property, String value) { this.property = property; this.value = value; } diff --git a/catalog-lakehouse/build.gradle.kts b/catalog-lakehouse/build.gradle.kts index ebe5c87e28..34601b179d 100644 --- a/catalog-lakehouse/build.gradle.kts +++ b/catalog-lakehouse/build.gradle.kts @@ -14,6 +14,7 @@ plugins { dependencies { implementation(project(":common")) implementation(project(":core")) + implementation(project(":api")) implementation(libs.jackson.databind) implementation(libs.jackson.annotations) implementation(libs.jackson.datatype.jdk8) @@ -24,6 +25,14 @@ dependencies { implementation(libs.bundles.jetty) implementation(libs.bundles.jersey) implementation(libs.bundles.iceberg) + implementation(libs.substrait.java.core) { + exclude("com.fasterxml.jackson.core") + exclude("com.fasterxml.jackson.datatype") + exclude("com.fasterxml.jackson.dataformat") + exclude("com.google.protobuf") + exclude("com.google.code.findbugs") + exclude("org.slf4j") + } compileOnly(libs.lombok) annotationProcessor(libs.lombok) diff --git a/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOps.java b/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOps.java index f632caf857..ea68409791 100644 --- a/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOps.java +++ b/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOps.java @@ -4,10 +4,12 @@ */ package com.datastrato.graviton.catalog.lakehouse.iceberg.ops; +import com.datastrato.graviton.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper.IcebergTableChange; import com.datastrato.graviton.catalog.lakehouse.iceberg.utils.IcebergCatalogUtil; import com.google.common.base.Preconditions; import java.util.Optional; import javax.ws.rs.NotSupportedException; +import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -31,7 +33,7 @@ public class IcebergTableOps { private static final Logger LOG = LoggerFactory.getLogger(IcebergTableOps.class); - private Catalog catalog; + protected Catalog catalog; private SupportsNamespaces asNamespaceCatalog; private final String DEFAULT_ICEBERG_CATALOG_TYPE = "memory"; @@ -42,6 +44,10 @@ public IcebergTableOps() { } } + public IcebergTableOpsHelper createIcebergTableOpsHelper() { + return new IcebergTableOpsHelper(catalog); + } + private void validateNamespace(Optional namespace) { namespace.ifPresent( n -> @@ -115,4 +121,10 @@ public LoadTableResponse updateTable( TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) { return CatalogHandlers.updateTable(catalog, tableIdentifier, updateTableRequest); } + + public LoadTableResponse updateTable(IcebergTableChange icebergTableChange) { + Transaction transaction = icebergTableChange.getTransaction(); + transaction.commitTransaction(); + return loadTable(icebergTableChange.getTableIdentifier()); + } } diff --git a/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java b/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java new file mode 100644 index 0000000000..5aea284c4a --- /dev/null +++ b/catalog-lakehouse/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java @@ -0,0 +1,327 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.graviton.catalog.lakehouse.iceberg.ops; + +import com.datastrato.graviton.NameIdentifier; +import com.datastrato.graviton.rel.TableChange; +import com.datastrato.graviton.rel.TableChange.AddColumn; +import com.datastrato.graviton.rel.TableChange.After; +import com.datastrato.graviton.rel.TableChange.ColumnChange; +import com.datastrato.graviton.rel.TableChange.ColumnPosition; +import com.datastrato.graviton.rel.TableChange.DeleteColumn; +import com.datastrato.graviton.rel.TableChange.RemoveProperty; +import com.datastrato.graviton.rel.TableChange.RenameColumn; +import com.datastrato.graviton.rel.TableChange.RenameTable; +import com.datastrato.graviton.rel.TableChange.SetProperty; +import com.datastrato.graviton.rel.TableChange.UpdateColumnComment; +import com.datastrato.graviton.rel.TableChange.UpdateColumnPosition; +import com.datastrato.graviton.rel.TableChange.UpdateColumnType; +import com.datastrato.graviton.rel.TableChange.UpdateComment; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import io.substrait.type.Type; +import io.substrait.type.Type.Binary; +import io.substrait.type.Type.I32; +import io.substrait.type.Type.I64; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import javax.ws.rs.NotSupportedException; +import lombok.Getter; +import lombok.Setter; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; + +public class IcebergTableOpsHelper { + + @VisibleForTesting public static final Joiner DOT = Joiner.on("."); + private static final Set IcebergReservedProperties = + ImmutableSet.of( + "location", + "comment", + "current-snapshot-id", + "cherry-pick-snapshot-id", + "sort-order", + "identifier-fields"); + + private Catalog icebergCatalog; + + public IcebergTableOpsHelper(Catalog icebergCatalog) { + this.icebergCatalog = icebergCatalog; + } + + @Getter + @Setter + public static final class IcebergTableChange { + private TableIdentifier tableIdentifier; + private Transaction transaction; + + IcebergTableChange(TableIdentifier tableIdentifier, Transaction transaction) { + this.tableIdentifier = tableIdentifier; + this.transaction = transaction; + } + } + + // todo, just for pass the updateTable test, @yunqing will provide a new implement + private static org.apache.iceberg.types.Type convertType(Type gravitonType) { + if (gravitonType instanceof I32) { + return IntegerType.get(); + } else if (gravitonType instanceof I64) { + return LongType.get(); + } else if (gravitonType instanceof Binary) { + return StringType.get(); + } + return StringType.get(); + } + + private void doDeleteColumn( + UpdateSchema icebergUpdateSchema, DeleteColumn deleteColumn, Schema icebergTableSchema) { + NestedField deleteField = icebergTableSchema.findField(DOT.join(deleteColumn.fieldNames())); + if (deleteField == null) { + if (deleteColumn.getIfExists()) { + return; + } else { + throw new IllegalArgumentException( + "delete column not exists: " + DOT.join(deleteColumn.fieldNames())); + } + } + icebergUpdateSchema.deleteColumn(DOT.join(deleteColumn.fieldNames())); + } + + private void doUpdateColumnComment( + UpdateSchema icebergUpdateSchema, UpdateColumnComment updateColumnComment) { + icebergUpdateSchema.updateColumnDoc( + DOT.join(updateColumnComment.fieldNames()), updateColumnComment.getNewComment()); + } + + private void doSetProperty(UpdateProperties icebergUpdateProperties, SetProperty setProperty) { + icebergUpdateProperties.set(setProperty.getProperty(), setProperty.getValue()); + } + + private void doRemoveProperty( + UpdateProperties icebergUpdateProperties, RemoveProperty removeProperty) { + icebergUpdateProperties.remove(removeProperty.getProperty()); + } + + private void doRenameColumn(UpdateSchema icebergUpdateSchema, RenameColumn renameColumn) { + icebergUpdateSchema.renameColumn( + DOT.join(renameColumn.fieldNames()), renameColumn.getNewName()); + } + + private void doMoveColumn( + UpdateSchema icebergUpdateSchema, String[] fieldNames, ColumnPosition columnPosition) { + if (columnPosition instanceof TableChange.After) { + After after = (After) columnPosition; + String peerName = getSiblingName(fieldNames, after.getColumn()); + icebergUpdateSchema.moveAfter(DOT.join(fieldNames), peerName); + } else if (columnPosition instanceof TableChange.First) { + icebergUpdateSchema.moveFirst(DOT.join(fieldNames)); + } else { + throw new NotSupportedException( + "Iceberg doesn't support column position: " + columnPosition.getClass().getSimpleName()); + } + } + + private void doUpdateColumnPosition( + UpdateSchema icebergUpdateSchema, UpdateColumnPosition updateColumnPosition) { + doMoveColumn( + icebergUpdateSchema, updateColumnPosition.fieldNames(), updateColumnPosition.getPosition()); + } + + private void doUpdateColumnType( + UpdateSchema icebergUpdateSchema, UpdateColumnType updateColumnType) { + org.apache.iceberg.types.Type type = convertType(updateColumnType.getNewDataType()); + Preconditions.checkArgument( + type.isPrimitiveType(), + "Cannot update %s, not a primitive type: %s", + DOT.join(updateColumnType.fieldNames()), + type); + icebergUpdateSchema.updateColumn(DOT.join(updateColumnType.fieldNames()), (PrimitiveType) type); + } + + private ColumnPosition getAddColumnPosition(StructType parent, ColumnPosition columnPosition) { + if (columnPosition != null) { + return columnPosition; + } + + List fields = parent.fields(); + // no column, add to first + if (fields.isEmpty()) { + return ColumnPosition.first(); + } + + NestedField last = fields.get(fields.size() - 1); + return ColumnPosition.after(last.name()); + } + + private void doAddColumn( + UpdateSchema icebergUpdateSchema, AddColumn addColumn, Schema icebergTableSchema) { + // todo(xiaojing) check new column is nullable + String parentName = getParentName(addColumn.fieldNames()); + StructType parentStruct; + if (parentName != null) { + org.apache.iceberg.types.Type parent = icebergTableSchema.findType(parentName); + Preconditions.checkArgument( + parent != null, "Couldn't find parent field: " + parentName + " in iceberg table"); + Preconditions.checkArgument( + parent instanceof StructType, + "Couldn't add column to non-struct field, name:" + + parentName + + ", type:" + + parent.getClass().getSimpleName()); + parentStruct = (StructType) parent; + } else { + parentStruct = icebergTableSchema.asStruct(); + } + + icebergUpdateSchema.addColumn( + getParentName(addColumn.fieldNames()), + getLeafName(addColumn.fieldNames()), + convertType(addColumn.getDataType()), + addColumn.getComment()); + + ColumnPosition position = getAddColumnPosition(parentStruct, addColumn.getPosition()); + doMoveColumn(icebergUpdateSchema, addColumn.fieldNames(), position); + } + + private void alterTableProperty( + UpdateProperties icebergUpdateProperties, List propertyChanges) { + for (TableChange change : propertyChanges) { + if (change instanceof RemoveProperty) { + doRemoveProperty(icebergUpdateProperties, (RemoveProperty) change); + } else if (change instanceof SetProperty) { + doSetProperty(icebergUpdateProperties, (SetProperty) change); + } else { + throw new NotSupportedException( + "Iceberg doesn't support table change: " + + change.getClass().getSimpleName() + + " for now"); + } + } + icebergUpdateProperties.commit(); + } + + private void alterTableColumn( + UpdateSchema icebergUpdateSchema, + List columnChanges, + Schema icebergTableSchema) { + for (ColumnChange change : columnChanges) { + if (change instanceof AddColumn) { + doAddColumn(icebergUpdateSchema, (AddColumn) change, icebergTableSchema); + } else if (change instanceof DeleteColumn) { + doDeleteColumn(icebergUpdateSchema, (DeleteColumn) change, icebergTableSchema); + } else if (change instanceof UpdateColumnPosition) { + doUpdateColumnPosition(icebergUpdateSchema, (UpdateColumnPosition) change); + } else if (change instanceof RenameColumn) { + doRenameColumn(icebergUpdateSchema, (RenameColumn) change); + } else if (change instanceof UpdateColumnType) { + doUpdateColumnType(icebergUpdateSchema, (UpdateColumnType) change); + } else if (change instanceof UpdateColumnComment) { + doUpdateColumnComment(icebergUpdateSchema, (UpdateColumnComment) change); + } else { + throw new NotSupportedException( + "Iceberg doesn't support " + change.getClass().getSimpleName() + " for now"); + } + } + icebergUpdateSchema.commit(); + } + + public IcebergTableChange buildIcebergTableChanges( + NameIdentifier gravitonNameIdentifier, TableChange... tableChanges) { + + TableIdentifier icebergTableIdentifier = + TableIdentifier.of( + Namespace.of(gravitonNameIdentifier.namespace().levels()), + gravitonNameIdentifier.name()); + + List gravitonColumnChanges = Lists.newArrayList(); + List gravitonPropertyChanges = Lists.newArrayList(); + for (TableChange change : tableChanges) { + if (change instanceof ColumnChange) { + gravitonColumnChanges.add((ColumnChange) change); + } else if (change instanceof UpdateComment) { + UpdateComment updateComment = (UpdateComment) change; + gravitonPropertyChanges.add(new SetProperty("comment", updateComment.getNewComment())); + } else if (change instanceof RemoveProperty) { + RemoveProperty removeProperty = (RemoveProperty) change; + Preconditions.checkArgument( + !IcebergReservedProperties.contains(removeProperty.getProperty()), + removeProperty.getProperty() + " is not allowed to remove properties"); + gravitonPropertyChanges.add(removeProperty); + } else if (change instanceof SetProperty) { + SetProperty setProperty = (SetProperty) change; + Preconditions.checkArgument( + !IcebergReservedProperties.contains(setProperty.getProperty()), + setProperty.getProperty() + " is not allowed to Set properties"); + gravitonPropertyChanges.add(setProperty); + } else if (change instanceof RenameTable) { + throw new RuntimeException("RenameTable shouldn't use tableUpdate interface"); + } else { + throw new NotSupportedException("Iceberg doesn't support " + change.getClass() + "for now"); + } + } + + Table icebergBaseTable = icebergCatalog.loadTable(icebergTableIdentifier); + Transaction transaction = icebergBaseTable.newTransaction(); + IcebergTableChange icebergTableChange = + new IcebergTableChange(icebergTableIdentifier, transaction); + if (!gravitonColumnChanges.isEmpty()) { + alterTableColumn( + transaction.updateSchema(), gravitonColumnChanges, icebergBaseTable.schema()); + } + + if (!gravitonPropertyChanges.isEmpty()) { + alterTableProperty(transaction.updateProperties(), gravitonPropertyChanges); + } + + return icebergTableChange; + } + + @VisibleForTesting + static String getParentName(String[] fields) { + if (fields.length > 1) { + return DOT.join(Arrays.copyOfRange(fields, 0, fields.length - 1)); + } + return null; + } + + @VisibleForTesting + static String getLeafName(String[] fields) { + Preconditions.checkArgument( + fields.length > 0, "Invalid field name: at least one name is required"); + return fields[fields.length - 1]; + } + + @VisibleForTesting + static String getSiblingName(String[] fieldNames, String fieldName) { + if (fieldNames.length > 1) { + String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length); + peerNames[fieldNames.length - 1] = fieldName; + return DOT.join(peerNames); + } + return fieldName; + } + + @VisibleForTesting + static Set getIcebergReservedProperties() { + return IcebergReservedProperties; + } +} diff --git a/catalog-lakehouse/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java b/catalog-lakehouse/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java new file mode 100644 index 0000000000..c2f0f952e8 --- /dev/null +++ b/catalog-lakehouse/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java @@ -0,0 +1,454 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.catalog.lakehouse.iceberg.ops; + +import com.datastrato.graviton.NameIdentifier; +import com.datastrato.graviton.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper.IcebergTableChange; +import com.datastrato.graviton.rel.TableChange; +import com.datastrato.graviton.rel.TableChange.ColumnPosition; +import io.substrait.type.Type.I32; +import io.substrait.type.Type.I64; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestIcebergTableUpdate { + private IcebergTableOps icebergTableOps = null; + private IcebergTableOpsHelper icebergTableOpsHelper = null; + private static final String TEST_NAMESPACE_NAME = "graviton_test_namespace"; + private static final String TEST_TABLE_NAME = "graviton_test_table"; + + private static final TableIdentifier icebergIdentifier = + TableIdentifier.of(TEST_NAMESPACE_NAME, TEST_TABLE_NAME); + private static final NameIdentifier identifier = + NameIdentifier.of(TEST_NAMESPACE_NAME, TEST_TABLE_NAME); + + private static final String[] firstField = {"foo_string"}; + private static final String[] secondField = {"foo2_string"}; + private static final String[] thirdField = {"foo_int"}; + private static final String[] fourthField = {"foo_struct"}; + private static final String[] notExistField = {"foo_not_exist"}; + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, firstField[0], StringType.get()), + NestedField.of(2, false, secondField[0], StringType.get()), + NestedField.of(3, true, thirdField[0], IntegerType.get()), + NestedField.of( + 4, + false, + fourthField[0], + ListType.ofOptional( + 5, + StructType.of( + NestedField.required(6, "struct_int", IntegerType.get()), + NestedField.optional( + 7, + "struct_map", + MapType.ofOptional(8, 9, IntegerType.get(), StringType.get())))))); + + @BeforeEach + public void init() { + icebergTableOps = new IcebergTableOps(); + icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper(); + createNamespace(TEST_NAMESPACE_NAME); + createTable(TEST_NAMESPACE_NAME, TEST_TABLE_NAME); + } + + public LoadTableResponse updateTable( + NameIdentifier gravitonNameIdentifier, TableChange... gravitonTableChanges) { + IcebergTableChange icebergTableChange = + icebergTableOpsHelper.buildIcebergTableChanges( + gravitonNameIdentifier, gravitonTableChanges); + return icebergTableOps.updateTable(icebergTableChange); + } + + private void createNamespace(String namespace) { + icebergTableOps.createNamespace( + CreateNamespaceRequest.builder().withNamespace(Namespace.of(namespace)).build()); + } + + private void createTable(String namespace, String tableName) { + CreateTableRequest createTableRequest = + CreateTableRequest.builder().withName(tableName).withSchema(tableSchema).build(); + icebergTableOps.createTable(Namespace.of(namespace), createTableRequest); + Assertions.assertTrue(icebergTableOps.tableExists(TableIdentifier.of(namespace, tableName))); + } + + @Test + public void testUpdateComment() { + String comments = "new comment"; + TableChange updateComment = TableChange.updateComment(comments); + LoadTableResponse loadTableResponse = updateTable(identifier, updateComment); + String comment = loadTableResponse.tableMetadata().property("comment", ""); + Assertions.assertTrue(comment.equals(comments)); + } + + @Test + public void testSetAndRemoveProperty() { + String testPropertyKey = "test_property_key"; + String testPropertyValue = "test_property_value"; + String testPropertyNewValue = "test_property_new_value"; + LoadTableResponse loadTableResponse = icebergTableOps.loadTable(icebergIdentifier); + Assertions.assertFalse( + loadTableResponse.tableMetadata().properties().containsKey(testPropertyKey)); + + // set a not-existing property + TableChange setProperty = TableChange.setProperty(testPropertyKey, testPropertyValue); + loadTableResponse = updateTable(identifier, setProperty); + Assertions.assertEquals( + loadTableResponse.tableMetadata().property(testPropertyKey, ""), testPropertyValue); + + // overwrite existing property + setProperty = TableChange.setProperty(testPropertyKey, testPropertyNewValue); + loadTableResponse = updateTable(identifier, setProperty); + Assertions.assertEquals( + loadTableResponse.tableMetadata().property(testPropertyKey, ""), testPropertyNewValue); + + // remove existing property + TableChange removeProperty = TableChange.removeProperty(testPropertyKey); + loadTableResponse = updateTable(identifier, removeProperty); + Assertions.assertFalse( + loadTableResponse.tableMetadata().properties().containsKey(testPropertyKey)); + + icebergTableOpsHelper.getIcebergReservedProperties().stream() + .forEach( + property -> { + TableChange setProperty1 = TableChange.setProperty(property, "test_v"); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, setProperty1)); + }); + + icebergTableOpsHelper.getIcebergReservedProperties().stream() + .forEach( + property -> { + TableChange removeProperty1 = TableChange.removeProperty(property); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, removeProperty1)); + }); + } + + @Test + public void testRenameTable() { + TableChange renameTable = TableChange.rename("new_table_name"); + Assertions.assertThrowsExactly( + RuntimeException.class, () -> updateTable(identifier, renameTable)); + } + + @Test + public void testAddColumn() { + // add to after first column + String addColumnNameAfter = "add_column_after"; + TableChange addColumn = + TableChange.addColumn( + new String[] {addColumnNameAfter}, + I32.builder().nullable(true).build(), + "", + ColumnPosition.after(firstField[0])); + LoadTableResponse loadTableResponse = updateTable(identifier, addColumn); + List columns = getColumnNames(loadTableResponse); + Assertions.assertEquals(columns.get(1), addColumnNameAfter); + + // add to first + String addColumnNameFirst = "add_column_first"; + addColumn = + TableChange.addColumn( + new String[] {addColumnNameFirst}, + I32.builder().nullable(true).build(), + "", + ColumnPosition.first()); + loadTableResponse = updateTable(identifier, addColumn); + columns = getColumnNames(loadTableResponse); + Assertions.assertEquals(columns.get(0), addColumnNameFirst); + + // add to last + String addColumnNameLast = "add_column_last"; + addColumn = + TableChange.addColumn( + new String[] {addColumnNameLast}, I32.builder().nullable(true).build()); + loadTableResponse = updateTable(identifier, addColumn); + columns = getColumnNames(loadTableResponse); + Assertions.assertEquals(columns.get(columns.size() - 1), addColumnNameLast); + + // add to struct after + addColumn = + TableChange.addColumn( + new String[] {fourthField[0], "element", "struct_after"}, + I32.builder().nullable(true).build(), + "", + ColumnPosition.after("struct_int")); + loadTableResponse = updateTable(identifier, addColumn); + StructType t = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals("struct_after", t.fields().get(1).name()); + + // add to struct first + addColumn = + TableChange.addColumn( + new String[] {fourthField[0], "element", "struct_first"}, + I32.builder().nullable(true).build(), + "", + ColumnPosition.first()); + loadTableResponse = updateTable(identifier, addColumn); + t = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals("struct_first", t.fields().get(0).name()); + + // add to struct last + addColumn = + TableChange.addColumn( + new String[] {fourthField[0], "element", "struct_last"}, + I32.builder().nullable(true).build()); + loadTableResponse = updateTable(identifier, addColumn); + t = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals("struct_last", t.fields().get(t.fields().size() - 1).name()); + + // add column exists + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> { + TableChange addColumn1 = + TableChange.addColumn(firstField, I32.builder().nullable(true).build(), ""); + updateTable(identifier, addColumn1); + }); + + // after column not exists + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> { + TableChange addColumn1 = + TableChange.addColumn( + firstField, + I32.builder().nullable(true).build(), + "", + ColumnPosition.after("not_exits")); + updateTable(identifier, addColumn1); + }); + } + + @Test + public void testDeleteColumn() { + // delete normal column + TableChange deleteColumn = TableChange.deleteColumn(firstField, true); + LoadTableResponse loadTableResponse = updateTable(identifier, deleteColumn); + List columns = getColumnNames(loadTableResponse); + Assertions.assertFalse(columns.stream().anyMatch(column -> column.equals(firstField[0]))); + + // delete column from list-struct + String[] deleteColumnArray = new String[] {fourthField[0], "element", "struct_int"}; + deleteColumn = TableChange.deleteColumn(deleteColumnArray, false); + loadTableResponse = updateTable(identifier, deleteColumn); + Schema schema = loadTableResponse.tableMetadata().schema(); + Assertions.assertTrue( + schema.findType(IcebergTableOpsHelper.DOT.join(deleteColumnArray)) == null); + + deleteColumn = TableChange.deleteColumn(notExistField, true); + // no exception + updateTable(identifier, deleteColumn); + + TableChange deleteColumn3 = TableChange.deleteColumn(notExistField, false); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, deleteColumn3)); + } + + @Test + public void testUpdateColumnComment() { + String newComment = "new comment"; + TableChange updateColumnComment = TableChange.updateColumnComment(firstField, newComment); + LoadTableResponse loadTableResponse = updateTable(identifier, updateColumnComment); + Assertions.assertEquals( + loadTableResponse.tableMetadata().schema().columns().get(0).doc(), newComment); + } + + @Test + public void testUpdateColumnType() { + TableChange updateColumnType = + TableChange.updateColumnType(thirdField, I64.builder().nullable(true).build()); + LoadTableResponse loadTableResponse = updateTable(identifier, updateColumnType); + Assertions.assertEquals( + LongType.get(), loadTableResponse.tableMetadata().schema().columns().get(2).type()); + + // update struct_int from int to long + updateColumnType = + TableChange.updateColumnType( + new String[] {fourthField[0], "element", "struct_int"}, + I64.builder().nullable(true).build()); + loadTableResponse = updateTable(identifier, updateColumnType); + StructType t = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals(LongType.get(), t.fields().get(0).type()); + + TableChange updateColumnType2 = + TableChange.updateColumnType(notExistField, I32.builder().nullable(true).build()); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, updateColumnType2)); + } + + @Test + public void testRenameColumn() { + String newColumnName = "new_name"; + TableChange renameColumn = TableChange.renameColumn(firstField, newColumnName); + LoadTableResponse loadTableResponse = updateTable(identifier, renameColumn); + List fields = getColumnNames(loadTableResponse); + Assertions.assertEquals(newColumnName, fields.get(0)); + + // rename struct_int to new_name + renameColumn = + TableChange.renameColumn( + new String[] {fourthField[0], "element", "struct_int"}, newColumnName); + loadTableResponse = updateTable(identifier, renameColumn); + StructType t = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals(newColumnName, t.fields().get(0).name()); + + TableChange renameColumn2 = TableChange.renameColumn(notExistField, newColumnName); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, renameColumn2)); + } + + private List getColumnNames(LoadTableResponse loadTableResponse) { + return loadTableResponse.tableMetadata().schema().columns().stream() + .map(NestedField::name) + .collect(Collectors.toList()); + } + + @Test + public void testUpdateColumnPosition() { + // test ColumnPosition.after + TableChange updateColumnPosition = + TableChange.updateColumnPosition(firstField, ColumnPosition.after(secondField[0])); + LoadTableResponse loadTableResponse = updateTable(identifier, updateColumnPosition); + List fieldNames = getColumnNames(loadTableResponse); + Assertions.assertEquals(fieldNames.get(0), secondField[0]); + Assertions.assertEquals(fieldNames.get(1), firstField[0]); + + // test ColumnPosition.first + updateColumnPosition = TableChange.updateColumnPosition(thirdField, ColumnPosition.first()); + loadTableResponse = updateTable(identifier, updateColumnPosition); + fieldNames = + loadTableResponse.tableMetadata().schema().columns().stream() + .map(NestedField::name) + .collect(Collectors.toList()); + Assertions.assertEquals(fieldNames.get(0), thirdField[0]); + Assertions.assertEquals(fieldNames.get(1), secondField[0]); + Assertions.assertEquals(fieldNames.get(2), firstField[0]); + + // test struct columnPosition after + updateColumnPosition = + TableChange.updateColumnPosition( + new String[] {fourthField[0], "element", "struct_int"}, + ColumnPosition.after("struct_map")); + loadTableResponse = updateTable(identifier, updateColumnPosition); + StructType structType = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals("struct_map", structType.fields().get(0).name()); + Assertions.assertEquals("struct_int", structType.fields().get(1).name()); + + // test struct columnPosition first + updateColumnPosition = + TableChange.updateColumnPosition( + new String[] {fourthField[0], "element", "struct_int"}, ColumnPosition.first()); + loadTableResponse = updateTable(identifier, updateColumnPosition); + structType = + (StructType) + loadTableResponse + .tableMetadata() + .schema() + .findType(IcebergTableOpsHelper.DOT.join(fourthField[0], "element")); + Assertions.assertEquals("struct_int", structType.fields().get(0).name()); + Assertions.assertEquals("struct_map", structType.fields().get(1).name()); + + // test update first on not existing column + TableChange updateColumnPosition2 = + TableChange.updateColumnPosition(notExistField, ColumnPosition.first()); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, updateColumnPosition2)); + + // test update after on not existing column + TableChange updateColumnPosition3 = + TableChange.updateColumnPosition(thirdField, ColumnPosition.after(notExistField[0])); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, () -> updateTable(identifier, updateColumnPosition3)); + } + + @Test + void testMultiUpdate() { + // rename column + String newColumnName = "new_name"; + TableChange renameColumn = TableChange.renameColumn(firstField, newColumnName); + + // delete column + TableChange deleteColumn = TableChange.deleteColumn(secondField, true); + + // update properties + String testPropertyKey = "test_property_key"; + String testPropertyValue = "test_property_value"; + TableChange setProperty = TableChange.setProperty(testPropertyKey, testPropertyValue); + + LoadTableResponse loadTableResponse = + updateTable(identifier, renameColumn, deleteColumn, setProperty); + List columns = getColumnNames(loadTableResponse); + + Assertions.assertEquals(newColumnName, columns.get(0)); + + Assertions.assertFalse(columns.stream().anyMatch(column -> column.equals(secondField[0]))); + + Assertions.assertEquals( + loadTableResponse.tableMetadata().property(testPropertyKey, ""), testPropertyValue); + } + + @Test + void testGetFieldName() { + Assertions.assertEquals(null, IcebergTableOpsHelper.getParentName(new String[] {"a"})); + Assertions.assertEquals( + "a.b", IcebergTableOpsHelper.getParentName(new String[] {"a", "b", "c"})); + + Assertions.assertEquals("a", IcebergTableOpsHelper.getLeafName(new String[] {"a"})); + Assertions.assertEquals("c", IcebergTableOpsHelper.getLeafName(new String[] {"a", "b", "c"})); + + Assertions.assertEquals("p", IcebergTableOpsHelper.getSiblingName(new String[] {"a"}, "p")); + Assertions.assertEquals( + "a.b.p", IcebergTableOpsHelper.getSiblingName(new String[] {"a", "b", "c"}, "p")); + } +}