Skip to content

Commit

Permalink
[#446] feat(iceberg): Support detailed iceberg catalog property metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
yunqing-wei committed Oct 12, 2023
1 parent 4d9e7a9 commit 878f2e7
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.datastrato.graviton.rel.transforms.Transform;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -60,7 +59,9 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche

@VisibleForTesting IcebergTableOps icebergTableOps;

private IcebergTablePropertiesMetadata tablePropertiesMetadata;
private IcebergCatalogPropertiesMetadata icebergCatalogPropertiesMetadata;

private IcebergTablePropertiesMetadata icebergTablePropertiesMetadata;

private final CatalogEntity entity;

Expand All @@ -87,6 +88,8 @@ public void initialize(Map<String, String> conf) throws RuntimeException {
icebergConfig.loadFromMap(conf, k -> true);
this.icebergTableOps = new IcebergTableOps(icebergConfig);
this.icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper();
this.icebergCatalogPropertiesMetadata = new IcebergCatalogPropertiesMetadata();
this.icebergTablePropertiesMetadata = new IcebergTablePropertiesMetadata();
}

/** Closes the Iceberg catalog and releases the associated client pool. */
Expand Down Expand Up @@ -522,11 +525,11 @@ private static String currentUser() {

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return tablePropertiesMetadata;
return icebergTablePropertiesMetadata;
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return Maps::newHashMap;
return icebergCatalogPropertiesMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.PropertyEntry.enumImmutablePropertyEntry;
import static com.datastrato.graviton.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

public class IcebergCatalogPropertiesMetadata extends BasePropertiesMetadata {
public static final String CATALOG_TYPE = "catalogType";
public static final String WAREHOUSE = "WAREHOUSE";
public static final String URI = "URI";

private static final Map<String, PropertyEntry<?>> propertiesMetadata;

enum IcebergCatalogType {
HIVE,
JDBC,
MEMORY
}

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
enumImmutablePropertyEntry(
CATALOG_TYPE,
"Iceberg catalog type choose properties",
true,
IcebergCatalogType.class,
null,
false,
false),
stringReservedPropertyEntry(URI, "Iceberg catalog uri config", false),
stringReservedPropertyEntry(WAREHOUSE, "Iceberg catalog warehouse config", false));
propertiesMetadata = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return propertiesMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,49 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
public static final String COMMENT = "comment";
public static final String CREATOR = "creator";
public static final String LOCATION = "location";
public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
public static final String CHERRY_PICK_SNAPSHOT_ID = "cherry-pick-snapshot-id";
public static final String SORT_ORDER = "sort-order";
public static final String IDENTIFIER_FIELDS = "identifier-fields";

private static final Map<String, PropertyEntry<?>> propertiesMetadata;

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringReservedPropertyEntry(COMMENT, "table comment", true),
stringReservedPropertyEntry(CREATOR, "table creator info", false),
stringReservedPropertyEntry(LOCATION, "Iceberg location for table storage", false),
stringReservedPropertyEntry(
CURRENT_SNAPSHOT_ID,
"The snapshot representing the current state of the table",
false),
stringReservedPropertyEntry(
CHERRY_PICK_SNAPSHOT_ID,
"Selecting a specific snapshots in a merge operation",
false),
stringReservedPropertyEntry(
SORT_ORDER, "Selecting a specific snapshots in a merge operation", false),
stringReservedPropertyEntry(
IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false));
propertiesMetadata = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
// TODO: support Iceberg table property specs
return Maps.newHashMap();
return propertiesMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,53 @@ public void testListDatabases() {
icebergTableOps.listNamespace(org.apache.iceberg.catalog.Namespace.empty());
Assertions.assertTrue(listNamespacesResponse.namespaces().isEmpty());
}

@Test
void testCatalogProperty() {
AuditInfo auditInfo =
new AuditInfo.Builder().withCreator("creator").withCreateTime(Instant.now()).build();

CatalogEntity entity =
new CatalogEntity.Builder()
.withId(1L)
.withName("catalog")
.withNamespace(Namespace.of("metalake"))
.withType(IcebergCatalog.Type.RELATIONAL)
.withProvider("iceberg")
.withAuditInfo(auditInfo)
.build();

Map<String, String> conf = Maps.newHashMap();

try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) {
ops.initialize(conf);
Assertions.assertThrows(
IllegalArgumentException.class,
() -> {
Map<String, String> map = Maps.newHashMap();
map.put(IcebergCatalogPropertiesMetadata.CATALOG_TYPE, "test");
ops.catalogPropertiesMetadata().validatePropertyForCreate(map);
});

Assertions.assertDoesNotThrow(
() -> {
Map<String, String> map = Maps.newHashMap();
map.put(IcebergCatalogPropertiesMetadata.CATALOG_TYPE, "hive");
ops.catalogPropertiesMetadata().validatePropertyForCreate(map);
});

Throwable throwable =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> ops.catalogPropertiesMetadata().validatePropertyForCreate(Maps.newHashMap()));

Assertions.assertTrue(
throwable
.getMessage()
.contains(
String.format(
"Properties are required and must be set: [%s]",
IcebergCatalogPropertiesMetadata.CATALOG_TYPE)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -81,6 +82,13 @@ private static void initIcebergSchema() {
}

private static void initIcebergCatalog() {
CatalogEntity entity = createDefaultCatalogEntity();

Map<String, String> conf = Maps.newHashMap();
icebergCatalog = new IcebergCatalog().withCatalogConf(conf).withCatalogEntity(entity);
}

private static CatalogEntity createDefaultCatalogEntity() {
AuditInfo auditInfo =
new AuditInfo.Builder()
.withCreator("testIcebergUser")
Expand All @@ -96,9 +104,7 @@ private static void initIcebergCatalog() {
.withProvider("iceberg")
.withAuditInfo(auditInfo)
.build();

Map<String, String> conf = Maps.newHashMap();
icebergCatalog = new IcebergCatalog().withCatalogConf(conf).withCatalogEntity(entity);
return entity;
}

private Distribution createDistribution() {
Expand Down Expand Up @@ -480,6 +486,51 @@ public void testAlterIcebergTable() {
Assertions.assertArrayEquals(createdTable.partitioning(), alteredTable.partitioning());
}

@Test
public void testTableProperty() {
CatalogEntity entity = createDefaultCatalogEntity();
try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) {
ops.initialize(Maps.newHashMap());
Map<String, String> map = Maps.newHashMap();
map.put(IcebergTablePropertiesMetadata.COMMENT, "test");
map.put(IcebergTablePropertiesMetadata.CREATOR, "test");
map.put(IcebergTablePropertiesMetadata.LOCATION, "test");
map.put(IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID, "test");
map.put(IcebergTablePropertiesMetadata.CHERRY_PICK_SNAPSHOT_ID, "test");
map.put(IcebergTablePropertiesMetadata.SORT_ORDER, "test");
map.put(IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS, "test");
for (Map.Entry<String, String> entry : map.entrySet()) {
Assertions.assertThrows(
IllegalArgumentException.class,
() -> {
ops.tablePropertiesMetadata()
.validatePropertyForCreate(
new HashMap<String, String>() {
{
put(entry.getKey(), entry.getValue());
}
});
});
}

map = Maps.newHashMap();
map.put("key1", "val1");
map.put("key2", "val2");
for (Map.Entry<String, String> entry : map.entrySet()) {
Assertions.assertDoesNotThrow(
() -> {
ops.tablePropertiesMetadata()
.validatePropertyForCreate(
new HashMap<String, String>() {
{
put(entry.getKey(), entry.getValue());
}
});
});
}
}
}

protected static String genRandomName() {
return UUID.randomUUID().toString().replace("-", "");
}
Expand Down

0 comments on commit 878f2e7

Please sign in to comment.