diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml
index 8f4103d5fe..f67b70e541 100644
--- a/.github/workflows/integration-test.yml
+++ b/.github/workflows/integration-test.yml
@@ -10,7 +10,7 @@ on:
 
 env:
   HIVE_IMAGE_NAME: datastrato/gravitino-ci-hive
-  HIVE_IMAGE_TAG_NAME: 0.1.2
+  HIVE_IMAGE_TAG_NAME: 0.1.4
 
 concurrency:
   group: ${{ github.worklfow }}-${{ github.event.pull_request.number || github.ref }}
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
index 28990ad571..432ab69b0e 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
@@ -121,8 +121,8 @@ public void close() {}
   @Override
   public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
     try {
-      return icebergTableOps
-          .listNamespace(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels())).namespaces()
+
+      return icebergTableOps.listNamespace(IcebergTableOpsHelper.getIcebergNamespace()).namespaces()
           .stream()
           .map(icebergNamespace -> NameIdentifier.of(icebergNamespace.levels()))
           .toArray(NameIdentifier[]::new);
@@ -161,7 +161,7 @@ public IcebergSchema createSchema(
                 .build())
             .build();
     icebergTableOps.createNamespace(
-        createdSchema.toCreateRequest(IcebergTableOpsHelper.getIcebergNamespace(ident)));
+        createdSchema.toCreateRequest(IcebergTableOpsHelper.getIcebergNamespace(ident.name())));
     LOG.info(
         "Created Iceberg schema (database) {} in Iceberg\ncurrentUser:{} \ncomment: {} \nmetadata: {}",
         ident.name(),
@@ -194,7 +194,7 @@ public IcebergSchema createSchema(
   public IcebergSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
     try {
       GetNamespaceResponse response =
-          icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident));
+          icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident.name()));
       IcebergSchema icebergSchema =
           new IcebergSchema.Builder()
               .withName(ident.name())
@@ -229,7 +229,7 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
       throws NoSuchSchemaException {
     try {
       GetNamespaceResponse response =
-          icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident));
+          icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident.name()));
       Map<String, String> metadata = response.properties();
       List<String> removals = new ArrayList<>();
       Map<String, String> updates = new HashMap<>();
@@ -273,7 +273,8 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
           UpdateNamespacePropertiesRequest.builder().updateAll(updates).removeAll(removals).build();
       UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse =
           icebergTableOps.updateNamespaceProperties(
-              IcebergTableOpsHelper.getIcebergNamespace(ident), updateNamespacePropertiesRequest);
+              IcebergTableOpsHelper.getIcebergNamespace(ident.name()),
+              updateNamespacePropertiesRequest);
       LOG.info(
           "Altered Iceberg schema (database) {}. UpdateResponse:\n{}",
           ident.name(),
@@ -299,7 +300,7 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
   public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
     Preconditions.checkArgument(!cascade, "Iceberg does not support cascading delete operations.");
     try {
-      icebergTableOps.dropNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident));
+      icebergTableOps.dropNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident.name()));
       LOG.info("Dropped Iceberg schema (database) {}", ident.name());
       return true;
     } catch (NamespaceNotEmptyException e) {
@@ -326,12 +327,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
   public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
     try {
       ListTablesResponse listTablesResponse =
-          icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels()));
+          icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace));
       return listTablesResponse.identifiers().stream()
           .map(
               tableIdentifier ->
-                  NameIdentifier.of(
-                      ArrayUtils.add(tableIdentifier.namespace().levels(), tableIdentifier.name())))
+                  NameIdentifier.of(ArrayUtils.add(namespace.levels(), tableIdentifier.name())))
           .toArray(NameIdentifier[]::new);
     } catch (NoSuchNamespaceException e) {
       throw new NoSuchSchemaException(
@@ -399,8 +399,10 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
   private Table internalUpdateTable(NameIdentifier tableIdent, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
     try {
+      String[] levels = tableIdent.namespace().levels();
       IcebergTableOpsHelper.IcebergTableChange icebergTableChange =
-          icebergTableOpsHelper.buildIcebergTableChanges(tableIdent, changes);
+          icebergTableOpsHelper.buildIcebergTableChanges(
+              NameIdentifier.of(levels[levels.length - 1], tableIdent.name()), changes);
       LoadTableResponse loadTableResponse = icebergTableOps.updateTable(icebergTableChange);
       loadTableResponse.validate();
       return IcebergTable.fromIcebergTable(loadTableResponse.tableMetadata(), tableIdent.name());
@@ -446,8 +448,7 @@ private Table renameTable(NameIdentifier tableIdent, TableChange.RenameTable ren
   @Override
   public boolean dropTable(NameIdentifier tableIdent) {
     try {
-      icebergTableOps.dropTable(
-          TableIdentifier.of(ArrayUtils.add(tableIdent.namespace().levels(), tableIdent.name())));
+      icebergTableOps.dropTable(IcebergTableOpsHelper.buildIcebergTableIdentifier(tableIdent));
       LOG.info("Dropped Iceberg table {}", tableIdent.name());
       return true;
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
@@ -479,20 +480,36 @@ public Table createTable(
       SortOrder[] sortOrders)
       throws NoSuchSchemaException, TableAlreadyExistsException {
     try {
+      if (!Distribution.NONE.equals(distribution)) {
+        throw new UnsupportedOperationException("Iceberg does not support distribution");
+      }
+
       NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels());
       if (!schemaExists(schemaIdent)) {
         LOG.warn("Iceberg schema (database) does not exist: {}", schemaIdent);
         throw new NoSuchSchemaException("Iceberg Schema (database) does not exist " + schemaIdent);
       }
+      IcebergColumn[] icebergColumns =
+          Arrays.stream(columns)
+              .map(
+                  column ->
+                      new IcebergColumn.Builder()
+                          .withName(column.name())
+                          .withType(column.dataType())
+                          .withComment(column.comment())
+                          .withOptional(true)
+                          .build())
+              .toArray(IcebergColumn[]::new);
       IcebergTable createdTable =
           new IcebergTable.Builder()
               .withName(tableIdent.name())
-              .withColumns(columns)
+              .withColumns(icebergColumns)
               .withComment(comment)
               .withPartitions(partitions)
               .withSortOrders(sortOrders)
               .withProperties(properties)
+              .withDistribution(Distribution.NONE)
              .withAuditInfo(
                  new AuditInfo.Builder()
                      .withCreator(currentUser())
@@ -502,7 +519,7 @@ public Table createTable(
 
       LoadTableResponse loadTableResponse =
           icebergTableOps.createTable(
-              IcebergTableOpsHelper.getIcebergNamespace(tableIdent.namespace().levels()),
+              IcebergTableOpsHelper.getIcebergNamespace(schemaIdent.name()),
              createdTable.toCreateTableRequest());
       loadTableResponse.validate();
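
Taken together, the createTable changes above mean a caller must now pass Distribution.NONE for Iceberg tables, and every column is persisted as optional. A minimal caller-side sketch of the new contract, using the seven-argument createTable overload exercised by CatalogIcebergIT at the end of this diff; `tableCatalog`, `ident`, `columns`, and `props` are assumed to exist:

```java
// Sketch only: handles are assumed to exist, as in CatalogIcebergIT below.
Table table =
    tableCatalog.createTable(
        ident,
        columns,
        "a comment",
        props,
        new Transform[0],   // partitioning is still supported
        Distribution.NONE,  // anything else (including null) now throws
        new SortOrder[0]);  // UnsupportedOperationException
```
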
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
index 9813f2f881..7ecc8ce24e 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
@@ -36,6 +36,8 @@ public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetad
   // it to `catalogType` automatically and pass it to Iceberg.
   public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG =
       ImmutableMap.of(
+          CATALOG_BACKEND_NAME,
+          CATALOG_BACKEND_NAME,
           GRAVITINO_JDBC_USER,
           ICEBERG_JDBC_USER,
           GRAVITINO_JDBC_PASSWORD,
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
index b2a20d3d07..90e13df3da 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
@@ -48,6 +48,7 @@ public CreateTableRequest toCreateTableRequest() {
     Map<String, String> resultProperties =
         Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
+    resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
     CreateTableRequest.Builder builder =
         CreateTableRequest.builder()
             .withName(name)
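
The putIfAbsent above is what lets a plain Iceberg client read a Gravitino-authored comment back: the comment travels on the create request as an ordinary table property. A minimal sketch of the round trip, assuming an initialized org.apache.iceberg.hive.HiveCatalog and illustrative names (the same pattern CatalogIcebergIT asserts below):

```java
// Sketch: read the Gravitino comment back through a plain Iceberg client.
// `hiveCatalog` is assumed to be initialized; schema/table names are illustrative.
org.apache.iceberg.Table icebergTable =
    hiveCatalog.loadTable(TableIdentifier.of("my_schema", "my_table"));
String comment = icebergTable.properties().get(IcebergTable.ICEBERG_COMMENT_FIELD_NAME);
```
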
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
index ca349934e7..339a0f2257 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
@@ -34,7 +34,6 @@
 import javax.ws.rs.NotSupportedException;
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
@@ -287,22 +286,45 @@ public static Map<String, String> removeReservedProperties(Map<String, String> c
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
-  public static Namespace getIcebergNamespace(NameIdentifier ident) {
-    return getIcebergNamespace(ArrayUtils.add(ident.namespace().levels(), ident.name()));
+  /**
+   * Gravitino only supports single-level namespace storage management, which differs from
+   * Iceberg. Therefore, we need to handle this difference here.
+   *
+   * @param namespace the Gravitino namespace
+   * @return the corresponding Iceberg namespace, keeping only the last level
+   */
+  public static Namespace getIcebergNamespace(com.datastrato.gravitino.Namespace namespace) {
+    return getIcebergNamespace(namespace.level(namespace.length() - 1));
   }
 
   public static Namespace getIcebergNamespace(String... level) {
     return Namespace.of(level);
   }
 
+  /**
+   * Gravitino only supports tables managed with a single-level hierarchy, such as
+   * `{namespace}.{table}`, so we need to perform truncation here.
+   *
+   * @param namespace the Gravitino namespace of the table
+   * @param name the table name
+   * @return the corresponding Iceberg table identifier
+   */
   public static TableIdentifier buildIcebergTableIdentifier(
       com.datastrato.gravitino.Namespace namespace, String name) {
-    return TableIdentifier.of(ArrayUtils.add(namespace.levels(), name));
+    String[] levels = namespace.levels();
+    return TableIdentifier.of(levels[levels.length - 1], name);
   }
 
+  /**
+   * Gravitino only supports tables managed with a single-level hierarchy, such as
+   * `{namespace}.{table}`, so we need to perform truncation here.
+   *
+   * @param nameIdentifier the Gravitino name identifier of the table
+   * @return the corresponding Iceberg table identifier
+   */
   public static TableIdentifier buildIcebergTableIdentifier(NameIdentifier nameIdentifier) {
-    return TableIdentifier.of(
-        ArrayUtils.add(nameIdentifier.namespace().levels(), nameIdentifier.name()));
+    String[] levels = nameIdentifier.namespace().levels();
+    return TableIdentifier.of(levels[levels.length - 1], nameIdentifier.name());
   }
 
   @VisibleForTesting
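
All of the helpers above now apply one convention: only the last level of the Gravitino namespace, the schema, survives the translation to Iceberg. A worked example of the mapping, with illustrative identifier values:

```java
// A Gravitino table lives at metalake.catalog.schema.table; Iceberg only sees
// schema.table, so everything before the schema level is dropped.
NameIdentifier gravitinoIdent =
    NameIdentifier.of("my_metalake", "my_catalog", "my_schema", "my_table");

TableIdentifier icebergIdent =
    IcebergTableOpsHelper.buildIcebergTableIdentifier(gravitinoIdent);
// icebergIdent -> my_schema.my_table

org.apache.iceberg.catalog.Namespace icebergNamespace =
    IcebergTableOpsHelper.getIcebergNamespace(gravitinoIdent.namespace());
// icebergNamespace -> my_schema
```
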
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java
index 11d7947003..1c35403f10 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java
@@ -45,7 +45,7 @@ private static JdbcCatalog loadJdbcCatalog(Map<String, String> properties) {
         new JdbcCatalog(
             null,
             null,
-            Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "false")));
+            Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "true")));
     HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
     properties.forEach(hdfsConfiguration::set);
     jdbcCatalog.setConf(hdfsConfiguration);
diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java
index c566f39c53..c4f45d0419 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java
@@ -15,6 +15,8 @@
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -43,8 +45,11 @@ public void testCreateIcebergSchema() {
 
     Assertions.assertTrue(icebergCatalog.asSchemas().schemaExists(ident));
 
-    NameIdentifier[] idents = icebergCatalog.asSchemas().listSchemas(ident.namespace());
-    Assertions.assertTrue(Arrays.asList(idents).contains(ident));
+    Set<String> names =
+        Arrays.stream(icebergCatalog.asSchemas().listSchemas(ident.namespace()))
+            .map(NameIdentifier::name)
+            .collect(Collectors.toSet());
+    Assertions.assertTrue(names.contains(ident.name()));
 
     // Test schema already exists
     Throwable exception =
diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java
index 89755ef8fb..1143744348 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java
@@ -17,7 +17,6 @@
 import com.datastrato.gravitino.meta.CatalogEntity;
 import com.datastrato.gravitino.rel.Column;
 import com.datastrato.gravitino.rel.Distribution;
-import com.datastrato.gravitino.rel.Distribution.Strategy;
 import com.datastrato.gravitino.rel.SortOrder;
 import com.datastrato.gravitino.rel.SortOrder.Direction;
 import com.datastrato.gravitino.rel.SortOrder.NullOrdering;
@@ -107,14 +106,6 @@ private static CatalogEntity createDefaultCatalogEntity() {
     return entity;
   }
 
-  private Distribution createDistribution() {
-    return Distribution.builder()
-        .withNumber(10)
-        .withTransforms(new Transform[] {Transforms.field(new String[] {"col_1"})})
-        .withStrategy(Strategy.EVEN)
-        .build();
-  }
-
   private SortOrder[] createSortOrder() {
     return new SortOrder[] {
       SortOrder.builder()
@@ -159,7 +150,7 @@ public void testCreateIcebergTable() throws IOException {
             ICEBERG_COMMENT,
             properties,
             new Transform[0],
-            null,
+            Distribution.NONE,
             sortOrders);
     Assertions.assertEquals(tableIdentifier.name(), table.name());
     Assertions.assertEquals(ICEBERG_COMMENT, table.comment());
@@ -198,7 +189,7 @@ public void testCreateIcebergTable() throws IOException {
                     ICEBERG_COMMENT,
                     properties,
                     new Transform[0],
-                    null,
+                    Distribution.NONE,
                     sortOrders));
     Assertions.assertTrue(exception.getMessage().contains("Table already exists"));
   }
@@ -380,7 +371,7 @@ public void testAlterIcebergTable() {
             .build();
     Column[] columns = new Column[] {col1, col2};
 
-    Distribution distribution = createDistribution();
+    Distribution distribution = Distribution.NONE;
     SortOrder[] sortOrders = createSortOrder();
 
     Table createdTable =
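
A behavioral note on the IcebergCatalogUtil change above: ICEBERG_JDBC_INITIALIZE now defaults to true, so a JDBC-backed catalog initializes its backing metadata tables unless the caller opts out, which TestIcebergCatalogUtil below now does explicitly. A minimal sketch, with illustrative connection properties:

```java
// Sketch: with the new default, omitting jdbc-initialize lets the JdbcCatalog
// create its metadata tables on first use. URI and warehouse are illustrative.
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, "jdbc:sqlite::memory:");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:///tmp/warehouse");
// Opt out (the old behavior) by setting the property explicitly:
// properties.put(IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE, "false");
Catalog catalog = IcebergCatalogUtil.loadCatalogBackend("jdbc", properties);
```
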
diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java
index 430c932852..3b30a53358 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java
@@ -5,6 +5,7 @@
 
 package com.datastrato.gravitino.catalog.lakehouse.iceberg.utils;
 
+import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata;
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig;
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +48,7 @@ void testLoadCatalog() {
     Map<String, String> properties = new HashMap<>();
     properties.put(CatalogProperties.URI, "jdbc://0.0.0.0:3306");
     properties.put(CatalogProperties.WAREHOUSE_LOCATION, "test");
+    properties.put(IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE, "false");
     catalog = IcebergCatalogUtil.loadCatalogBackend("jdbc", properties);
     Assertions.assertTrue(catalog instanceof JdbcCatalog);
diff --git a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java
index f005fa06e4..f58d4bba0c 100644
--- a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java
+++ b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java
@@ -97,7 +97,7 @@ public static TableDTO toDTO(Table table) {
   }
 
   public static DistributionDTO toDTO(Distribution distribution) {
-    if (Distribution.NONE.equals(distribution)) {
+    if (Distribution.NONE.equals(distribution) || null == distribution) {
       return DistributionDTO.NONE;
     }
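
The DTOConverters change makes the conversion null-safe, which matters now that catalogs may hand back tables without an explicit distribution; both inputs normalize to the same constant. A brief illustration:

```java
// Sketch: a missing distribution and an explicit NONE produce the same DTO.
Assertions.assertEquals(DistributionDTO.NONE, DTOConverters.toDTO((Distribution) null));
Assertions.assertEquals(DistributionDTO.NONE, DTOConverters.toDTO(Distribution.NONE));
```
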
+ */ +package com.datastrato.gravitino.integration.test.catalog.lakehouse.iceberg; + +import static com.datastrato.gravitino.rel.transforms.Transforms.day; +import static com.datastrato.gravitino.rel.transforms.Transforms.identity; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper; +import com.datastrato.gravitino.client.GravitinoMetaLake; +import com.datastrato.gravitino.dto.rel.ColumnDTO; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.Distribution; +import com.datastrato.gravitino.rel.Schema; +import com.datastrato.gravitino.rel.SchemaChange; +import com.datastrato.gravitino.rel.SortOrder; +import com.datastrato.gravitino.rel.SortOrder.Direction; +import com.datastrato.gravitino.rel.SortOrder.NullOrdering; +import com.datastrato.gravitino.rel.SupportsSchemas; +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.TableCatalog; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.transforms.Transform; +import com.datastrato.gravitino.rel.transforms.Transforms; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import io.substrait.type.TypeCreator; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +@Tag("gravitino-docker-it") +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class CatalogIcebergIT extends AbstractIT { + public static String metalakeName = GravitinoITUtils.genRandomName("iceberg_it_metalake"); + public static String catalogName = GravitinoITUtils.genRandomName("iceberg_it_catalog"); + public static String schemaName = GravitinoITUtils.genRandomName("iceberg_it_schema"); + public static String tableName = GravitinoITUtils.genRandomName("iceberg_it_table"); + public static String alertTableName = "alert_table_name"; + public static String table_comment = "table_comment"; + + public static String schema_comment = "schema_comment"; + public static String ICEBERG_COL_NAME1 = "iceberg_col_name1"; + public static String ICEBERG_COL_NAME2 = 
"iceberg_col_name2"; + public static String ICEBERG_COL_NAME3 = "iceberg_col_name3"; + private static final String provider = "lakehouse-iceberg"; + private static final String WAREHOUSE = "file:///tmp/iceberg"; + private static final String URI = "thrift://127.0.0.1:9083"; + + private static GravitinoMetaLake metalake; + + private static Catalog catalog; + + private static HiveCatalog hiveCatalog; + + @BeforeAll + public static void startup() { + createMetalake(); + createCatalog(); + createSchema(); + } + + @AfterAll + public static void stop() { + clearTableAndSchema(); + client.dropMetalake(NameIdentifier.of(metalakeName)); + } + + @AfterEach + private void resetSchema() { + clearTableAndSchema(); + createSchema(); + } + + private static void clearTableAndSchema() { + NameIdentifier[] nameIdentifiers = + catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, schemaName)); + for (NameIdentifier nameIdentifier : nameIdentifiers) { + catalog.asTableCatalog().dropTable(nameIdentifier); + } + catalog.asSchemas().dropSchema(NameIdentifier.of(metalakeName, catalogName, schemaName), false); + } + + private static void createMetalake() { + GravitinoMetaLake[] gravitinoMetaLakes = client.listMetalakes(); + Assertions.assertEquals(0, gravitinoMetaLakes.length); + + GravitinoMetaLake createdMetalake = + client.createMetalake(NameIdentifier.of(metalakeName), "comment", Collections.emptyMap()); + GravitinoMetaLake loadMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); + Assertions.assertEquals(createdMetalake, loadMetalake); + + metalake = loadMetalake; + } + + private static void createCatalog() { + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put("key1", "val1"); + catalogProperties.put("key2", "val2"); + + catalogProperties.put( + IcebergConfig.CATALOG_BACKEND.getKey(), IcebergCatalogBackend.HIVE.name()); + catalogProperties.put(IcebergConfig.CATALOG_URI.getKey(), URI); + catalogProperties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(), WAREHOUSE); + + Map hiveProperties = Maps.newHashMap(); + hiveProperties.put(IcebergConfig.CATALOG_URI.getKey(), URI); + hiveProperties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(), WAREHOUSE); + + hiveCatalog = new HiveCatalog(); + hiveCatalog.setConf(new HdfsConfiguration()); + hiveCatalog.initialize("hive", hiveProperties); + + Catalog createdCatalog = + metalake.createCatalog( + NameIdentifier.of(metalakeName, catalogName), + Catalog.Type.RELATIONAL, + provider, + "comment", + catalogProperties); + Catalog loadCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName)); + Assertions.assertEquals(createdCatalog, loadCatalog); + + catalog = loadCatalog; + } + + private static void createSchema() { + NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, schemaName); + Map prop = Maps.newHashMap(); + prop.put("key1", "val1"); + prop.put("key2", "val2"); + + Schema createdSchema = catalog.asSchemas().createSchema(ident, schema_comment, prop); + Schema loadSchema = catalog.asSchemas().loadSchema(ident); + Assertions.assertEquals(createdSchema.name(), loadSchema.name()); + prop.forEach((key, value) -> Assertions.assertEquals(loadSchema.properties().get(key), value)); + } + + private ColumnDTO[] createColumns() { + ColumnDTO col1 = + new ColumnDTO.Builder() + .withName(ICEBERG_COL_NAME1) + .withDataType(TypeCreator.NULLABLE.I32) + .withComment("col_1_comment") + .build(); + ColumnDTO col2 = + new ColumnDTO.Builder() + .withName(ICEBERG_COL_NAME2) + 
+            .withDataType(TypeCreator.NULLABLE.DATE)
+            .withComment("col_2_comment")
+            .build();
+    ColumnDTO col3 =
+        new ColumnDTO.Builder()
+            .withName(ICEBERG_COL_NAME3)
+            .withDataType(TypeCreator.NULLABLE.STRING)
+            .withComment("col_3_comment")
+            .build();
+    return new ColumnDTO[] {col1, col2, col3};
+  }
+
+  private Map<String, String> createProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    return properties;
+  }
+
+  @Test
+  void testOperationIcebergSchema() {
+    SupportsSchemas schemas = catalog.asSchemas();
+    Namespace namespace = Namespace.of(metalakeName, catalogName);
+    // list schema check.
+    NameIdentifier[] nameIdentifiers = schemas.listSchemas(namespace);
+    Set<String> schemaNames =
+        Arrays.stream(nameIdentifiers).map(NameIdentifier::name).collect(Collectors.toSet());
+    Assertions.assertTrue(schemaNames.contains(schemaName));
+
+    List<org.apache.iceberg.catalog.Namespace> icebergNamespaces =
+        hiveCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace());
+    schemaNames =
+        icebergNamespaces.stream().map(ns -> ns.level(ns.length() - 1)).collect(Collectors.toSet());
+    Assertions.assertTrue(schemaNames.contains(schemaName));
+
+    // create schema check.
+    String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
+    NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
+    schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap());
+    nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
+    Map<String, NameIdentifier> schemaMap =
+        Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
+    Assertions.assertTrue(schemaMap.containsKey(testSchemaName));
+
+    icebergNamespaces = hiveCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace());
+    schemaNames =
+        icebergNamespaces.stream().map(ns -> ns.level(ns.length() - 1)).collect(Collectors.toSet());
+    Assertions.assertTrue(schemaNames.contains(testSchemaName));
+
+    // alter and load schema check.
+    schemas.alterSchema(schemaIdent, SchemaChange.setProperty("t1", "v1"));
+    Schema schema = schemas.loadSchema(schemaIdent);
+    String val = schema.properties().get("t1");
+    Assertions.assertEquals("v1", val);
+
+    Map<String, String> hiveCatalogProps =
+        hiveCatalog.loadNamespaceMetadata(
+            IcebergTableOpsHelper.getIcebergNamespace(schemaIdent.name()));
+    Assertions.assertTrue(hiveCatalogProps.containsKey("t1"));
+
+    Assertions.assertThrows(
+        SchemaAlreadyExistsException.class,
+        () -> schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()));
+
+    // drop schema check.
+    schemas.dropSchema(schemaIdent, false);
+    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent));
+    Assertions.assertThrows(
+        NoSuchNamespaceException.class,
+        () ->
+            hiveCatalog.loadNamespaceMetadata(
+                IcebergTableOpsHelper.getIcebergNamespace(schemaIdent.name())));
+
+    nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
+    schemaMap =
+        Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
+    Assertions.assertFalse(schemaMap.containsKey(testSchemaName));
+    Assertions.assertFalse(
+        schemas.dropSchema(NameIdentifier.of(metalakeName, catalogName, "no-exists"), false));
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    // create failed check.
+    NameIdentifier table =
+        NameIdentifier.of(metalakeName, catalogName, testSchemaName, "test_table");
+    Assertions.assertThrows(
+        NoSuchSchemaException.class,
+        () ->
+            tableCatalog.createTable(
+                table,
+                createColumns(),
+                table_comment,
+                createProperties(),
+                null,
+                Distribution.NONE,
+                null));
+    // drop schema failed check.
+    Assertions.assertFalse(schemas.dropSchema(schemaIdent, true));
+    Assertions.assertFalse(schemas.dropSchema(schemaIdent, false));
+    Assertions.assertFalse(tableCatalog.dropTable(table));
+    icebergNamespaces = hiveCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace());
+    schemaNames =
+        icebergNamespaces.stream().map(ns -> ns.level(ns.length() - 1)).collect(Collectors.toSet());
+    Assertions.assertTrue(schemaNames.contains(schemaName));
+  }
+
+  @Test
+  void testCreateAndLoadIcebergTable() {
+    // Create table from Gravitino API
+    ColumnDTO[] columns = createColumns();
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
+    Distribution distribution = Distribution.NONE;
+
+    final SortOrder[] sortOrders =
+        new SortOrder[] {
+          SortOrder.builder()
+              .withNullOrdering(NullOrdering.FIRST)
+              .withDirection(Direction.DESC)
+              .withTransform(Transforms.field(new String[] {ICEBERG_COL_NAME2}))
+              .build()
+        };
+
+    Transform[] partitioning =
+        new Transform[] {
+          day(new String[] {columns[1].name()}),
+        };
+
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertEquals(createdTable.name(), tableName);
+    Map<String, String> resultProp = createdTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+    Assertions.assertEquals(createdTable.columns().length, columns.length);
+
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(createdTable.columns()[i], columns[i]);
+    }
+
+    // TODO add partitioning and sort order check
+    Assertions.assertEquals(partitioning.length, createdTable.partitioning().length);
+    Assertions.assertEquals(sortOrders.length, createdTable.sortOrder().length);
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(table_comment, loadTable.comment());
+    resultProp = loadTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+    Assertions.assertEquals(loadTable.columns().length, columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(columns[i], loadTable.columns()[i]);
+    }
+
+    Assertions.assertEquals(partitioning.length, loadTable.partitioning().length);
+    Assertions.assertEquals(sortOrders.length, loadTable.sortOrder().length);
+
+    // catalog load check
+    org.apache.iceberg.Table table =
+        hiveCatalog.loadTable(IcebergTableOpsHelper.buildIcebergTableIdentifier(tableIdentifier));
+    Assertions.assertEquals(tableName, table.name().substring(table.name().lastIndexOf(".") + 1));
+    Assertions.assertEquals(
+        table_comment, table.properties().get(IcebergTable.ICEBERG_COMMENT_FIELD_NAME));
+    resultProp = table.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+    org.apache.iceberg.Schema icebergSchema = table.schema();
+    Assertions.assertEquals(icebergSchema.columns().size(), columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertNotNull(icebergSchema.findField(columns[i].name()));
+    }
+    Assertions.assertEquals(partitioning.length, table.spec().fields().size());
+    Assertions.assertEquals(sortOrders.length, table.sortOrder().fields().size());
+
+    Assertions.assertThrows(
+        TableAlreadyExistsException.class,
+        () ->
+            catalog
+                .asTableCatalog()
+                .createTable(
+                    tableIdentifier,
+                    columns,
+                    table_comment,
+                    properties,
+                    new Transform[0],
+                    distribution,
+                    sortOrders));
+  }
+
+  @Test
+  void testListAndDropIcebergTable() {
+    ColumnDTO[] columns = createColumns();
+
+    NameIdentifier table1 = NameIdentifier.of(metalakeName, catalogName, schemaName, "table_1");
+
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        table1,
+        columns,
+        table_comment,
+        properties,
+        new Transform[0],
+        Distribution.NONE,
+        new SortOrder[0]);
+    NameIdentifier[] nameIdentifiers =
+        tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName));
+    Assertions.assertEquals(1, nameIdentifiers.length);
+    Assertions.assertEquals("table_1", nameIdentifiers[0].name());
+
+    List<TableIdentifier> tableIdentifiers =
+        hiveCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaName));
+    Assertions.assertEquals(1, tableIdentifiers.size());
+    Assertions.assertEquals("table_1", tableIdentifiers.get(0).name());
+
+    NameIdentifier table2 = NameIdentifier.of(metalakeName, catalogName, schemaName, "table_2");
+    tableCatalog.createTable(
+        table2,
+        columns,
+        table_comment,
+        properties,
+        new Transform[0],
+        Distribution.NONE,
+        new SortOrder[0]);
+    nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName));
+    Assertions.assertEquals(2, nameIdentifiers.length);
+    Assertions.assertEquals("table_1", nameIdentifiers[0].name());
+    Assertions.assertEquals("table_2", nameIdentifiers[1].name());
+
+    tableIdentifiers =
+        hiveCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaName));
+    Assertions.assertEquals(2, tableIdentifiers.size());
+    Assertions.assertEquals("table_1", tableIdentifiers.get(0).name());
+    Assertions.assertEquals("table_2", tableIdentifiers.get(1).name());
+
+    Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table1));
+
+    nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName));
+    Assertions.assertEquals(1, nameIdentifiers.length);
+    Assertions.assertEquals("table_2", nameIdentifiers[0].name());
+
+    Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table2));
+    Namespace schemaNamespace = Namespace.of(metalakeName, catalogName, schemaName);
+    nameIdentifiers = tableCatalog.listTables(schemaNamespace);
+    Assertions.assertEquals(0, nameIdentifiers.length);
+
+    tableIdentifiers =
+        hiveCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaName));
+    Assertions.assertEquals(0, tableIdentifiers.size());
+  }
+
+  @Test
+  public void testAlterIcebergTable() throws TException, InterruptedException {
+    ColumnDTO[] columns = createColumns();
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(metalakeName, catalogName, schemaName, tableName),
+            columns,
+            table_comment,
+            createProperties(),
+            new Transform[] {identity(columns[0])});
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          catalog
+              .asTableCatalog()
+              .alterTable(
+                  NameIdentifier.of(metalakeName, catalogName, schemaName, tableName),
+                  TableChange.rename(alterTableName),
+                  TableChange.updateComment(table_comment + "_new"));
+        });
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(metalakeName, catalogName, schemaName, tableName),
+            TableChange.rename(alterTableName));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(metalakeName, catalogName, schemaName, alterTableName),
+            TableChange.updateComment(table_comment + "_new"),
+            TableChange.removeProperty("key1"),
+            TableChange.setProperty("key2", "val2_new"),
+            TableChange.addColumn(new String[] {"col_4"}, TypeCreator.NULLABLE.STRING),
+            TableChange.renameColumn(new String[] {ICEBERG_COL_NAME2}, "col_2_new"),
+            TableChange.updateColumnComment(new String[] {ICEBERG_COL_NAME1}, "comment_new"),
+            TableChange.updateColumnType(
+                new String[] {ICEBERG_COL_NAME1}, TypeCreator.NULLABLE.I32));
+
+    Table table =
+        catalog
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(metalakeName, catalogName, schemaName, alterTableName));
+    Assertions.assertEquals(alterTableName, table.name());
+    Assertions.assertEquals("val2_new", table.properties().get("key2"));
+
+    Assertions.assertEquals(ICEBERG_COL_NAME1, table.columns()[0].name());
+    Assertions.assertEquals(TypeCreator.NULLABLE.I32, table.columns()[0].dataType());
+    Assertions.assertEquals("comment_new", table.columns()[0].comment());
+
+    Assertions.assertEquals("col_2_new", table.columns()[1].name());
+    Assertions.assertEquals(TypeCreator.NULLABLE.DATE, table.columns()[1].dataType());
+    Assertions.assertEquals("col_2_comment", table.columns()[1].comment());
+
+    Assertions.assertEquals(ICEBERG_COL_NAME3, table.columns()[2].name());
+    Assertions.assertEquals(TypeCreator.NULLABLE.STRING, table.columns()[2].dataType());
+    Assertions.assertEquals("col_3_comment", table.columns()[2].comment());
+
+    Assertions.assertEquals("col_4", table.columns()[3].name());
+    Assertions.assertEquals(TypeCreator.NULLABLE.STRING, table.columns()[3].dataType());
+    Assertions.assertNull(table.columns()[3].comment());
+
+    Assertions.assertEquals(1, table.partitioning().length);
+    Assertions.assertEquals(
+        columns[0].name(), ((Transforms.NamedReference) table.partitioning()[0]).value()[0]);
+
+    ColumnDTO col1 =
+        new ColumnDTO.Builder()
+            .withName("name")
+            .withDataType(TypeCreator.NULLABLE.STRING)
+            .withComment("comment")
+            .build();
+    ColumnDTO col2 =
+        new ColumnDTO.Builder()
+            .withName("address")
+            .withDataType(TypeCreator.NULLABLE.STRING)
+            .withComment("comment")
+            .build();
+    ColumnDTO col3 =
+        new ColumnDTO.Builder()
+            .withName("date_of_birth")
+            .withDataType(TypeCreator.NULLABLE.DATE)
+            .withComment("comment")
+            .build();
+    ColumnDTO[] newColumns = new ColumnDTO[] {col1, col2, col3};
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(
+            metalakeName,
+            catalogName,
+            schemaName,
+            GravitinoITUtils.genRandomName("CatalogIcebergIT_table"));
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdentifier,
+            newColumns,
+            table_comment,
+            ImmutableMap.of(),
+            new Transform[0],
+            Distribution.NONE,
+            new SortOrder[0]);
+
+    IllegalArgumentException illegalArgumentException =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .alterTable(
+                        tableIdentifier,
+                        TableChange.updateColumnPosition(
+                            new String[] {"no_column"}, TableChange.ColumnPosition.first())));
+    Assertions.assertTrue(illegalArgumentException.getMessage().contains("no_column"));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            tableIdentifier,
+            TableChange.updateColumnPosition(
+                new String[] {col1.name()}, TableChange.ColumnPosition.after(col2.name())));
+
+    Table updateColumnPositionTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+
+    Column[] updateCols = updateColumnPositionTable.columns();
+    Assertions.assertEquals(3, updateCols.length);
+    Assertions.assertEquals(col2.name(), updateCols[0].name());
+    Assertions.assertEquals(col1.name(), updateCols[1].name());
+    Assertions.assertEquals(col3.name(), updateCols[2].name());
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    tableIdentifier,
+                    TableChange.deleteColumn(new String[] {col3.name()}, true),
+                    TableChange.deleteColumn(new String[] {col2.name()}, true)));
+    Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+    Assertions.assertEquals(1, delColTable.columns().length);
+    Assertions.assertEquals(col1.name(), delColTable.columns()[0].name());
+  }
+}