Skip to content

Commit

Permalink
[#483] test(lakehouse-iceberg): add graviton IT test for iceberg (#529)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
Integrated end-to-end testing using the Gravitino server, with hiveCatalog used for
validation.

Why are the changes needed?

Issues: #483

Fix: #483 

Does this PR introduce any user-facing change?
No

How was this patch tested?
CatalogIcebergIT

---------

Co-authored-by: Clearvive <[email protected]>
  • Loading branch information
Clearvive and Clearvive authored Oct 18, 2023
1 parent c6c07b9 commit d066d6b
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

env:
HIVE_IMAGE_NAME: datastrato/gravitino-ci-hive
HIVE_IMAGE_TAG_NAME: 0.1.2
HIVE_IMAGE_TAG_NAME: 0.1.4

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public void close() {}
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
try {
return icebergTableOps
.listNamespace(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels())).namespaces()

return icebergTableOps.listNamespace(IcebergTableOpsHelper.getIcebergNamespace()).namespaces()
.stream()
.map(icebergNamespace -> NameIdentifier.of(icebergNamespace.levels()))
.toArray(NameIdentifier[]::new);
Expand Down Expand Up @@ -161,7 +161,7 @@ public IcebergSchema createSchema(
.build())
.build();
icebergTableOps.createNamespace(
createdSchema.toCreateRequest(IcebergTableOpsHelper.getIcebergNamespace(ident)));
createdSchema.toCreateRequest(IcebergTableOpsHelper.getIcebergNamespace(ident.name())));
LOG.info(
"Created Iceberg schema (database) {} in Iceberg\ncurrentUser:{} \ncomment: {} \nmetadata: {}",
ident.name(),
Expand Down Expand Up @@ -194,7 +194,7 @@ public IcebergSchema createSchema(
public IcebergSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
try {
GetNamespaceResponse response =
icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident));
icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident.name()));
IcebergSchema icebergSchema =
new IcebergSchema.Builder()
.withName(ident.name())
Expand Down Expand Up @@ -229,7 +229,7 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
throws NoSuchSchemaException {
try {
GetNamespaceResponse response =
icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident));
icebergTableOps.loadNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident.name()));
Map<String, String> metadata = response.properties();
List<String> removals = new ArrayList<>();
Map<String, String> updates = new HashMap<>();
Expand Down Expand Up @@ -273,7 +273,8 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
UpdateNamespacePropertiesRequest.builder().updateAll(updates).removeAll(removals).build();
UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse =
icebergTableOps.updateNamespaceProperties(
IcebergTableOpsHelper.getIcebergNamespace(ident), updateNamespacePropertiesRequest);
IcebergTableOpsHelper.getIcebergNamespace(ident.name()),
updateNamespacePropertiesRequest);
LOG.info(
"Altered Iceberg schema (database) {}. UpdateResponse:\n{}",
ident.name(),
Expand All @@ -299,7 +300,7 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
Preconditions.checkArgument(!cascade, "Iceberg does not support cascading delete operations.");
try {
icebergTableOps.dropNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident));
icebergTableOps.dropNamespace(IcebergTableOpsHelper.getIcebergNamespace(ident.name()));
LOG.info("Dropped Iceberg schema (database) {}", ident.name());
return true;
} catch (NamespaceNotEmptyException e) {
Expand All @@ -326,12 +327,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
try {
ListTablesResponse listTablesResponse =
icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels()));
icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace));
return listTablesResponse.identifiers().stream()
.map(
tableIdentifier ->
NameIdentifier.of(
ArrayUtils.add(tableIdentifier.namespace().levels(), tableIdentifier.name())))
NameIdentifier.of(ArrayUtils.add(namespace.levels(), tableIdentifier.name())))
.toArray(NameIdentifier[]::new);
} catch (NoSuchNamespaceException e) {
throw new NoSuchSchemaException(
Expand Down Expand Up @@ -399,8 +399,10 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
private Table internalUpdateTable(NameIdentifier tableIdent, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
try {
String[] levels = tableIdent.namespace().levels();
IcebergTableOpsHelper.IcebergTableChange icebergTableChange =
icebergTableOpsHelper.buildIcebergTableChanges(tableIdent, changes);
icebergTableOpsHelper.buildIcebergTableChanges(
NameIdentifier.of(levels[levels.length - 1], tableIdent.name()), changes);
LoadTableResponse loadTableResponse = icebergTableOps.updateTable(icebergTableChange);
loadTableResponse.validate();
return IcebergTable.fromIcebergTable(loadTableResponse.tableMetadata(), tableIdent.name());
Expand Down Expand Up @@ -446,8 +448,7 @@ private Table renameTable(NameIdentifier tableIdent, TableChange.RenameTable ren
@Override
public boolean dropTable(NameIdentifier tableIdent) {
try {
icebergTableOps.dropTable(
TableIdentifier.of(ArrayUtils.add(tableIdent.namespace().levels(), tableIdent.name())));
icebergTableOps.dropTable(IcebergTableOpsHelper.buildIcebergTableIdentifier(tableIdent));
LOG.info("Dropped Iceberg table {}", tableIdent.name());
return true;
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
Expand Down Expand Up @@ -479,20 +480,36 @@ public Table createTable(
SortOrder[] sortOrders)
throws NoSuchSchemaException, TableAlreadyExistsException {
try {
if (!Distribution.NONE.equals(distribution)) {
throw new UnsupportedOperationException("Iceberg does not support distribution");
}

NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels());
if (!schemaExists(schemaIdent)) {
LOG.warn("Iceberg schema (database) does not exist: {}", schemaIdent);
throw new NoSuchSchemaException("Iceberg Schema (database) does not exist " + schemaIdent);
}
IcebergColumn[] icebergColumns =
Arrays.stream(columns)
.map(
column ->
new IcebergColumn.Builder()
.withName(column.name())
.withType(column.dataType())
.withComment(column.comment())
.withOptional(true)
.build())
.toArray(IcebergColumn[]::new);

IcebergTable createdTable =
new IcebergTable.Builder()
.withName(tableIdent.name())
.withColumns(columns)
.withColumns(icebergColumns)
.withComment(comment)
.withPartitions(partitions)
.withSortOrders(sortOrders)
.withProperties(properties)
.withDistribution(Distribution.NONE)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
Expand All @@ -502,7 +519,7 @@ public Table createTable(

LoadTableResponse loadTableResponse =
icebergTableOps.createTable(
IcebergTableOpsHelper.getIcebergNamespace(tableIdent.namespace().levels()),
IcebergTableOpsHelper.getIcebergNamespace(schemaIdent.name()),
createdTable.toCreateTableRequest());
loadTableResponse.validate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetad
// it to `catalogType` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG =
ImmutableMap.of(
CATALOG_BACKEND_NAME,
CATALOG_BACKEND_NAME,
GRAVITINO_JDBC_USER,
ICEBERG_JDBC_USER,
GRAVITINO_JDBC_PASSWORD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public CreateTableRequest toCreateTableRequest() {

Map<String, String> resultProperties =
Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
CreateTableRequest.Builder builder =
CreateTableRequest.builder()
.withName(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import javax.ws.rs.NotSupportedException;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
Expand Down Expand Up @@ -287,22 +286,45 @@ public static Map<String, String> removeReservedProperties(Map<String, String> c
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static Namespace getIcebergNamespace(NameIdentifier ident) {
return getIcebergNamespace(ArrayUtils.add(ident.namespace().levels(), ident.name()));
/**
 * Converts a Gravitino namespace into an Iceberg namespace. Gravitino manages schemas
 * with a single-level namespace, which differs from Iceberg, so only the last level of
 * the Gravitino namespace is carried over.
 *
 * @param namespace the Gravitino namespace to convert
 * @return an Iceberg {@link Namespace} built from the last level of {@code namespace}
 */
public static Namespace getIcebergNamespace(com.datastrato.gravitino.Namespace namespace) {
  int lastLevelIndex = namespace.length() - 1;
  return getIcebergNamespace(namespace.level(lastLevelIndex));
}

/**
 * Builds an Iceberg {@link Namespace} directly from the given level strings.
 *
 * @param level the namespace levels, outermost first
 * @return an Iceberg namespace composed of the given levels
 */
public static Namespace getIcebergNamespace(String... level) {
  Namespace icebergNamespace = Namespace.of(level);
  return icebergNamespace;
}

/**
 * Builds an Iceberg {@link TableIdentifier} from a Gravitino namespace and a table name.
 * Gravitino only supports tables managed with a single-level hierarchy, such as
 * `{namespace}.{table}`, so the namespace is truncated to its last level here.
 *
 * @param namespace the Gravitino namespace containing the table
 * @param name the table name
 * @return an Iceberg table identifier of the form {@code lastNamespaceLevel.name}
 */
public static TableIdentifier buildIcebergTableIdentifier(
    com.datastrato.gravitino.Namespace namespace, String name) {
  String[] levels = namespace.levels();
  // Only the innermost (last) namespace level is meaningful to Iceberg.
  return TableIdentifier.of(levels[levels.length - 1], name);
}

/**
 * Builds an Iceberg {@link TableIdentifier} from a Gravitino {@link NameIdentifier}.
 * Gravitino only supports tables managed with a single-level hierarchy, such as
 * `{namespace}.{table}`, so the identifier's namespace is truncated to its last level.
 *
 * @param nameIdentifier the Gravitino name identifier of the table
 * @return an Iceberg table identifier of the form {@code lastNamespaceLevel.tableName}
 */
public static TableIdentifier buildIcebergTableIdentifier(NameIdentifier nameIdentifier) {
  String[] levels = nameIdentifier.namespace().levels();
  // Only the innermost (last) namespace level is meaningful to Iceberg.
  return TableIdentifier.of(levels[levels.length - 1], nameIdentifier.name());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static JdbcCatalog loadJdbcCatalog(Map<String, String> properties) {
new JdbcCatalog(
null,
null,
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "false")));
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "true")));
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
jdbcCatalog.setConf(hdfsConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -43,8 +45,11 @@ public void testCreateIcebergSchema() {

Assertions.assertTrue(icebergCatalog.asSchemas().schemaExists(ident));

NameIdentifier[] idents = icebergCatalog.asSchemas().listSchemas(ident.namespace());
Assertions.assertTrue(Arrays.asList(idents).contains(ident));
Set<String> names =
Arrays.stream(icebergCatalog.asSchemas().listSchemas(ident.namespace()))
.map(NameIdentifier::name)
.collect(Collectors.toSet());
Assertions.assertTrue(names.contains(ident.name()));

// Test schema already exists
Throwable exception =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Distribution;
import com.datastrato.gravitino.rel.Distribution.Strategy;
import com.datastrato.gravitino.rel.SortOrder;
import com.datastrato.gravitino.rel.SortOrder.Direction;
import com.datastrato.gravitino.rel.SortOrder.NullOrdering;
Expand Down Expand Up @@ -107,14 +106,6 @@ private static CatalogEntity createDefaultCatalogEntity() {
return entity;
}

private Distribution createDistribution() {
return Distribution.builder()
.withNumber(10)
.withTransforms(new Transform[] {Transforms.field(new String[] {"col_1"})})
.withStrategy(Strategy.EVEN)
.build();
}

private SortOrder[] createSortOrder() {
return new SortOrder[] {
SortOrder.builder()
Expand Down Expand Up @@ -159,7 +150,7 @@ public void testCreateIcebergTable() throws IOException {
ICEBERG_COMMENT,
properties,
new Transform[0],
null,
Distribution.NONE,
sortOrders);
Assertions.assertEquals(tableIdentifier.name(), table.name());
Assertions.assertEquals(ICEBERG_COMMENT, table.comment());
Expand Down Expand Up @@ -198,7 +189,7 @@ public void testCreateIcebergTable() throws IOException {
ICEBERG_COMMENT,
properties,
new Transform[0],
null,
Distribution.NONE,
sortOrders));
Assertions.assertTrue(exception.getMessage().contains("Table already exists"));
}
Expand Down Expand Up @@ -380,7 +371,7 @@ public void testAlterIcebergTable() {
.build();
Column[] columns = new Column[] {col1, col2};

Distribution distribution = createDistribution();
Distribution distribution = Distribution.NONE;
SortOrder[] sortOrders = createSortOrder();

Table createdTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.datastrato.gravitino.catalog.lakehouse.iceberg.utils;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -47,6 +48,7 @@ void testLoadCatalog() {
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, "jdbc://0.0.0.0:3306");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "test");
properties.put(IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE, "false");
catalog = IcebergCatalogUtil.loadCatalogBackend("jdbc", properties);
Assertions.assertTrue(catalog instanceof JdbcCatalog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static TableDTO toDTO(Table table) {
}

public static DistributionDTO toDTO(Distribution distribution) {
if (Distribution.NONE.equals(distribution)) {
if (Distribution.NONE.equals(distribution) || null == distribution) {
return DistributionDTO.NONE;
}

Expand Down
6 changes: 4 additions & 2 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ dependencies {
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
}

testImplementation(libs.hadoop2.hdfs) {
exclude("*")
}
testImplementation(libs.hadoop2.mapreduce.client.core) {
exclude("*")
}
Expand Down Expand Up @@ -105,8 +107,8 @@ dependencies {
exclude("org.apache.zookeeper")
exclude("io.dropwizard.metrics")
}
testImplementation(libs.scala.collection.compat)
testImplementation(libs.slf4j.jdk14)
testImplementation(libs.scala.collection.compat)
testImplementation(libs.sqlite.jdbc)
}

Expand Down
Loading

0 comments on commit d066d6b

Please sign in to comment.