Skip to content

Commit

Permalink
[#446] feat(iceberg): Support detailed iceberg catalog property metad…
Browse files Browse the repository at this point in the history
…ata (#486)

### What changes were proposed in this pull request?
Implement catalogPropertiesMetadata and tablePropertiesMetadata for
IcebergCatalogOperations

### Why are the changes needed?
Currently catalogPropertiesMetadata and tablePropertiesMetadata return an
empty map, and we need to implement them to return the real values for the
Iceberg property metadata.

Fix: #446 

### Does this PR introduce any user-facing change?
N/A

### How was this patch tested?
Add test testCatalogProperty in TestIcebergCatalog
Add test testTableProperty in TestIcebergTable
  • Loading branch information
Clearvive authored Oct 16, 2023
1 parent 2ae599d commit 805be9d
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

/**
 * The catalog backend implementations supported for an Iceberg catalog.
 *
 * <p>Used as the value domain of the {@code catalog-backend} catalog property; the configured
 * string is resolved against these constants when loading the backend.
 */
public enum IcebergCatalogBackend {
// Hive-metastore-backed Iceberg catalog.
HIVE,
// JDBC-database-backed Iceberg catalog.
JDBC,
// In-memory catalog, primarily for testing.
MEMORY
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.catalog.CatalogOperations;
Expand All @@ -27,6 +29,7 @@
import com.datastrato.graviton.rel.TableCatalog;
import com.datastrato.graviton.rel.TableChange;
import com.datastrato.graviton.rel.transforms.Transform;
import com.datastrato.graviton.utils.MapUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -60,7 +63,9 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche

@VisibleForTesting IcebergTableOps icebergTableOps;

private IcebergTablePropertiesMetadata tablePropertiesMetadata;
private IcebergCatalogPropertiesMetadata icebergCatalogPropertiesMetadata;

private IcebergTablePropertiesMetadata icebergTablePropertiesMetadata;

private final CatalogEntity entity;

Expand All @@ -83,10 +88,23 @@ public IcebergCatalogOperations(CatalogEntity entity) {
*/
@Override
public void initialize(Map<String, String> conf) throws RuntimeException {
// Key format like graviton.bypass.a.b
Map<String, String> prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX);

this.icebergCatalogPropertiesMetadata = new IcebergCatalogPropertiesMetadata();
// Hold keys that lie in GRAVITON_CONFIG_TO_ICEBERG
Map<String, String> gravitonConfig =
this.icebergCatalogPropertiesMetadata.transformProperties(conf);

Map<String, String> resultConf = Maps.newHashMap(prefixMap);
resultConf.putAll(gravitonConfig);

IcebergConfig icebergConfig = new IcebergConfig();
icebergConfig.loadFromMap(conf, k -> true);
icebergConfig.loadFromMap(resultConf, k -> true);

this.icebergTableOps = new IcebergTableOps(icebergConfig);
this.icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper();
this.icebergTablePropertiesMetadata = new IcebergTablePropertiesMetadata();
}

/** Closes the Iceberg catalog and releases the associated client pool. */
Expand Down Expand Up @@ -522,11 +540,11 @@ private static String currentUser() {

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return tablePropertiesMetadata;
return icebergTablePropertiesMetadata;
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return Maps::newHashMap;
return icebergCatalogPropertiesMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.PropertyEntry.enumImmutablePropertyEntry;
import static com.datastrato.graviton.catalog.PropertyEntry.stringRequiredPropertyEntry;

import com.datastrato.graviton.catalog.BaseCatalogPropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Catalog-level property metadata for Iceberg catalogs.
 *
 * <p>Declares the Iceberg-specific catalog properties on top of the basic catalog entries, and
 * maintains the translation table from Graviton-facing configuration keys to the keys Iceberg
 * itself understands (e.g. {@code jdbc-user} is forwarded to Iceberg as {@code jdbc.user}).
 */
public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
  public static final String CATALOG_BACKEND_NAME = "catalog-backend";

  public static final String GRAVITON_JDBC_USER = "jdbc-user";
  public static final String ICEBERG_JDBC_USER = "jdbc.user";

  public static final String GRAVITON_JDBC_PASSWORD = "jdbc-password";
  public static final String ICEBERG_JDBC_PASSWORD = "jdbc.password";
  public static final String ICEBERG_JDBC_INITIALIZE = "jdbc-initialize";
  public static final String WAREHOUSE = "warehouse";
  public static final String URI = "uri";

  // Immutable name -> entry index, assembled once at class-load time.
  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

  // Translation table from Graviton configuration keys to their Iceberg equivalents. Keys such
  // as 'uri' and 'warehouse' pass through with the same name.
  public static final Map<String, String> GRAVITON_CONFIG_TO_ICEBERG =
      ImmutableMap.<String, String>builder()
          .put(GRAVITON_JDBC_USER, ICEBERG_JDBC_USER)
          .put(GRAVITON_JDBC_PASSWORD, ICEBERG_JDBC_PASSWORD)
          .put(URI, URI)
          .put(WAREHOUSE, WAREHOUSE)
          .build();

  static {
    // Start from the shared basic catalog entries, then layer the Iceberg-specific ones on top.
    HashMap<String, PropertyEntry<?>> merged = Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES);
    List<PropertyEntry<?>> icebergEntries =
        ImmutableList.of(
            enumImmutablePropertyEntry(
                CATALOG_BACKEND_NAME,
                "Iceberg catalog type choose properties",
                true,
                IcebergCatalogBackend.class,
                null,
                false,
                false),
            stringRequiredPropertyEntry(URI, "Iceberg catalog uri config", false, false),
            stringRequiredPropertyEntry(
                WAREHOUSE, "Iceberg catalog warehouse config", false, false));
    // uniqueIndex fails fast if two entries share a name, guarding against accidental collisions.
    merged.putAll(Maps.uniqueIndex(icebergEntries, PropertyEntry::getName));
    PROPERTIES_METADATA = ImmutableMap.copyOf(merged);
  }

  /** @return the catalog property entries recognized by the Iceberg catalog, keyed by name */
  @Override
  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
    return PROPERTIES_METADATA;
  }

  /**
   * Selects the entries of {@code properties} whose keys appear in {@link
   * #GRAVITON_CONFIG_TO_ICEBERG} and re-keys each of them with its Iceberg name.
   *
   * @param properties raw catalog configuration supplied on the Graviton side
   * @return a new mutable map containing only the translated entries
   */
  public Map<String, String> transformProperties(Map<String, String> properties) {
    Map<String, String> transformed = Maps.newHashMap();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      // GRAVITON_CONFIG_TO_ICEBERG is an ImmutableMap (no null values), so a null lookup result
      // reliably means "key not mapped".
      String icebergKey = GRAVITON_CONFIG_TO_ICEBERG.get(entry.getKey());
      if (icebergKey != null) {
        transformed.put(icebergKey, entry.getValue());
      }
    }
    return transformed;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,36 @@

package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME;
import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.URI;
import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.WAREHOUSE;

import com.datastrato.graviton.Config;
import com.datastrato.graviton.config.ConfigBuilder;
import com.datastrato.graviton.config.ConfigEntry;

public class IcebergConfig extends Config {

public static final ConfigEntry<String> CATALOG_TYPE =
new ConfigBuilder("catalogType")
public static final ConfigEntry<String> CATALOG_BACKEND =
new ConfigBuilder(CATALOG_BACKEND_NAME)
.doc("Choose the implementation of the Iceberg catalog")
.version(DEFAULT_VERSION)
.stringConf()
.createWithDefault("memory");

public static final ConfigEntry<Boolean> INITIALIZE_JDBC_CATALOG_TABLES =
new ConfigBuilder("initializeJdbcCatalogTables")
.doc("Whether to load the configuration of the jdbc catalog table during initialization")
public static final ConfigEntry<String> CATALOG_WAREHOUSE =
new ConfigBuilder(WAREHOUSE)
.doc("The warehouse config of the Iceberg catalog")
.version(DEFAULT_VERSION)
.booleanConf()
.createWithDefault(true);
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> CATALOG_URI =
new ConfigBuilder(URI)
.doc("The uri config of the Iceberg catalog")
.version(DEFAULT_VERSION)
.stringConf()
.createWithDefault(null);

public IcebergConfig() {
super(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,49 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

/**
 * Table-level property metadata for Iceberg tables managed through Graviton.
 *
 * <p>Every entry registered here is reserved: it is surfaced by Graviton but cannot be set
 * directly by users when creating or altering a table.
 */
public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
  public static final String COMMENT = "comment";
  public static final String CREATOR = "creator";
  public static final String LOCATION = "location";
  public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
  public static final String CHERRY_PICK_SNAPSHOT_ID = "cherry-pick-snapshot-id";
  public static final String SORT_ORDER = "sort-order";
  public static final String IDENTIFIER_FIELDS = "identifier-fields";

  // Immutable name -> entry index, assembled once at class-load time.
  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

  static {
    List<PropertyEntry<?>> propertyEntries =
        ImmutableList.of(
            stringReservedPropertyEntry(COMMENT, "Table comment", true),
            stringReservedPropertyEntry(CREATOR, "Table creator info", false),
            stringReservedPropertyEntry(LOCATION, "Iceberg location for table storage", false),
            stringReservedPropertyEntry(
                CURRENT_SNAPSHOT_ID,
                "The snapshot representing the current state of the table",
                false),
            stringReservedPropertyEntry(
                CHERRY_PICK_SNAPSHOT_ID,
                "Selecting a specific snapshots in a merge operation",
                false),
            // Fixed copy-pasted description: this key configures the table sort order, not
            // snapshot cherry-picking.
            stringReservedPropertyEntry(
                SORT_ORDER, "The sort order of the table when writing data", false),
            stringReservedPropertyEntry(
                IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false));
    // uniqueIndex fails fast if two entries share a name, guarding against accidental collisions.
    PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
  }

  /** @return the table property entries recognized by the Iceberg catalog, keyed by name */
  @Override
  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
    // The stale placeholder `return Maps.newHashMap();` (plus its TODO) has been removed: it
    // made this return unreachable and discarded the metadata built above.
    return PROPERTIES_METADATA;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public class IcebergTableOps {
private SupportsNamespaces asNamespaceCatalog;

public IcebergTableOps(IcebergConfig icebergConfig) {
String catalogType = icebergConfig.get(IcebergConfig.CATALOG_BACKEND);
catalog =
IcebergCatalogUtil.loadCatalogBackend(
icebergConfig.get(IcebergConfig.CATALOG_TYPE), icebergConfig.getConfigsWithPrefix(""));
IcebergCatalogUtil.loadCatalogBackend(catalogType, icebergConfig.getConfigsWithPrefix(""));
if (catalog instanceof SupportsNamespaces) {
asNamespaceCatalog = (SupportsNamespaces) catalog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.utils;

import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergConfig.INITIALIZE_JDBC_CATALOG_TABLES;
import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE;

import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogBackend;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.CatalogProperties;
Expand All @@ -33,7 +33,9 @@ private static InMemoryCatalog loadMemoryCatalog(Map<String, String> properties)

private static HiveCatalog loadHiveCatalog(Map<String, String> properties) {
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(new HdfsConfiguration());
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
hiveCatalog.setConf(hdfsConfiguration);
hiveCatalog.initialize("hive", properties);
return hiveCatalog;
}
Expand All @@ -43,11 +45,10 @@ private static JdbcCatalog loadJdbcCatalog(Map<String, String> properties) {
new JdbcCatalog(
null,
null,
Boolean.parseBoolean(
properties.getOrDefault(
INITIALIZE_JDBC_CATALOG_TABLES.getKey(),
String.valueOf(INITIALIZE_JDBC_CATALOG_TABLES.getDefaultValue()))));
jdbcCatalog.setConf(new HdfsConfiguration());
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "false")));
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
jdbcCatalog.setConf(hdfsConfiguration);
jdbcCatalog.initialize("jdbc", properties);
return jdbcCatalog;
}
Expand All @@ -57,16 +58,13 @@ public static Catalog loadCatalogBackend(String catalogType) {
}

public static Catalog loadCatalogBackend(String catalogType, Map<String, String> properties) {
// TODO Organize the configuration properties and adapt them to the lower layer, and map some
// graviton configuration keys.
LOG.info("Load catalog backend of {}", catalogType);

switch (catalogType.toLowerCase(Locale.ENGLISH)) {
case "memory":
switch (IcebergCatalogBackend.valueOf(catalogType.toUpperCase())) {
case MEMORY:
return loadMemoryCatalog(properties);
case "hive":
case HIVE:
return loadHiveCatalog(properties);
case "jdbc":
case JDBC:
return loadJdbcCatalog(properties);
default:
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,50 @@ public void testListDatabases() {
icebergTableOps.listNamespace(org.apache.iceberg.catalog.Namespace.empty());
Assertions.assertTrue(listNamespacesResponse.namespaces().isEmpty());
}

@Test
void testCatalogProperty() {
  // Minimal audit/entity fixtures required to construct the catalog operations.
  AuditInfo auditInfo =
      new AuditInfo.Builder().withCreator("creator").withCreateTime(Instant.now()).build();

  CatalogEntity entity =
      new CatalogEntity.Builder()
          .withId(1L)
          .withName("catalog")
          .withNamespace(Namespace.of("metalake"))
          .withType(IcebergCatalog.Type.RELATIONAL)
          .withProvider("iceberg")
          .withAuditInfo(auditInfo)
          .build();

  try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) {
    ops.initialize(Maps.newHashMap());

    // An unknown backend value must be rejected.
    Map<String, String> unknownBackend = Maps.newHashMap();
    unknownBackend.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, "test");
    Assertions.assertThrows(
        IllegalArgumentException.class,
        () -> ops.catalogPropertiesMetadata().validatePropertyForCreate(unknownBackend));

    // A hive backend with uri and warehouse supplied passes validation.
    Map<String, String> hiveBackend = Maps.newHashMap();
    hiveBackend.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, "hive");
    hiveBackend.put(IcebergCatalogPropertiesMetadata.URI, "127.0.0.1");
    hiveBackend.put(IcebergCatalogPropertiesMetadata.WAREHOUSE, "test");
    Assertions.assertDoesNotThrow(
        () -> ops.catalogPropertiesMetadata().validatePropertyForCreate(hiveBackend));

    // Omitting the required backend key fails, and the message names the missing key.
    Throwable missingBackend =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> ops.catalogPropertiesMetadata().validatePropertyForCreate(Maps.newHashMap()));
    Assertions.assertTrue(
        missingBackend
            .getMessage()
            .contains(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME));
  }
}
}
Loading

0 comments on commit 805be9d

Please sign in to comment.