Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#446] feat(iceberg): Support detailed iceberg catalog property metadata #486

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

/**
 * Names of the catalog implementations that can back an Iceberg catalog.
 *
 * <p>The constant names are matched against the configured backend string, so they must not be
 * renamed or reordered without updating the code that resolves them.
 */
public enum IcebergCatalogBackend {
  /** Selects Iceberg's {@code HiveCatalog}. */
  HIVE,

  /** Selects Iceberg's {@code JdbcCatalog}. */
  JDBC,

  /** Selects Iceberg's {@code InMemoryCatalog}. */
  MEMORY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.catalog.CatalogOperations;
Expand All @@ -27,6 +29,7 @@
import com.datastrato.graviton.rel.TableCatalog;
import com.datastrato.graviton.rel.TableChange;
import com.datastrato.graviton.rel.transforms.Transform;
import com.datastrato.graviton.utils.MapUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -60,7 +63,9 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche

@VisibleForTesting IcebergTableOps icebergTableOps;

private IcebergTablePropertiesMetadata tablePropertiesMetadata;
private IcebergCatalogPropertiesMetadata icebergCatalogPropertiesMetadata;

private IcebergTablePropertiesMetadata icebergTablePropertiesMetadata;

private final CatalogEntity entity;

Expand All @@ -83,10 +88,23 @@ public IcebergCatalogOperations(CatalogEntity entity) {
*/
@Override
public void initialize(Map<String, String> conf) throws RuntimeException {
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
// Key format like graviton.bypass.a.b
Map<String, String> prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX);
Clearvive marked this conversation as resolved.
Show resolved Hide resolved

this.icebergCatalogPropertiesMetadata = new IcebergCatalogPropertiesMetadata();
// Hold keys that lie in GRAVITON_CONFIG_TO_ICEBERG
Map<String, String> gravitonConfig =
this.icebergCatalogPropertiesMetadata.transformProperties(conf);
Clearvive marked this conversation as resolved.
Show resolved Hide resolved

Map<String, String> resultConf = Maps.newHashMap(prefixMap);
resultConf.putAll(gravitonConfig);

IcebergConfig icebergConfig = new IcebergConfig();
icebergConfig.loadFromMap(conf, k -> true);
icebergConfig.loadFromMap(resultConf, k -> true);

this.icebergTableOps = new IcebergTableOps(icebergConfig);
this.icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper();
this.icebergTablePropertiesMetadata = new IcebergTablePropertiesMetadata();
}

/** Closes the Iceberg catalog and releases the associated client pool. */
Expand Down Expand Up @@ -522,11 +540,11 @@ private static String currentUser() {

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return tablePropertiesMetadata;
return icebergTablePropertiesMetadata;
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return Maps::newHashMap;
return icebergCatalogPropertiesMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.PropertyEntry.enumImmutablePropertyEntry;
import static com.datastrato.graviton.catalog.PropertyEntry.stringRequiredPropertyEntry;

import com.datastrato.graviton.catalog.BaseCatalogPropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
public static final String CATALOG_BACKEND_NAME = "catalog-backend";

public static final String GRAVITON_JDBC_USER = "jdbc-user";
public static final String ICEBERG_JDBC_USER = "jdbc.user";

public static final String GRAVITON_JDBC_PASSWORD = "jdbc-password";
public static final String ICEBERG_JDBC_PASSWORD = "jdbc.password";
public static final String ICEBERG_JDBC_INITIALIZE = "jdbc-initialize";
public static final String WAREHOUSE = "warehouse";
public static final String URI = "uri";
Clearvive marked this conversation as resolved.
Show resolved Hide resolved

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

// Map that maintains the mapping of keys in Graviton to that in Iceberg, for example, users
// will only need to set the configuration 'jdbc-user' in Graviton and Graviton will change
// it to `jdbc.user` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITON_CONFIG_TO_ICEBERG =
ImmutableMap.of(
GRAVITON_JDBC_USER,
ICEBERG_JDBC_USER,
GRAVITON_JDBC_PASSWORD,
ICEBERG_JDBC_PASSWORD,
URI,
URI,
WAREHOUSE,
WAREHOUSE);

// Assembles the catalog-level property specs once, when the class is loaded.
static {
  // Backend selector; its value type is the IcebergCatalogBackend enum.
  PropertyEntry<?> backendEntry =
      enumImmutablePropertyEntry(
          CATALOG_BACKEND_NAME,
          "Iceberg catalog type choose properties",
          true,
          IcebergCatalogBackend.class,
          null,
          false,
          false);
  PropertyEntry<?> uriEntry =
      stringRequiredPropertyEntry(URI, "Iceberg catalog uri config", false, false);
  PropertyEntry<?> warehouseEntry =
      stringRequiredPropertyEntry(WAREHOUSE, "Iceberg catalog warehouse config", false, false);
  List<PropertyEntry<?>> icebergEntries =
      ImmutableList.of(backendEntry, uriEntry, warehouseEntry);

  // Start from BASIC_CATALOG_PROPERTY_ENTRIES, then overlay the Iceberg-specific entries keyed
  // by name; putAll lets an Iceberg entry replace an existing one that has the same name.
  HashMap<String, PropertyEntry<?>> merged = Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES);
  merged.putAll(Maps.uniqueIndex(icebergEntries, PropertyEntry::getName));

  // Publish an immutable snapshot so the shared static map cannot be modified afterwards.
  PROPERTIES_METADATA = ImmutableMap.copyOf(merged);
}

/**
 * Returns the catalog property entries assembled by this class's static initializer.
 *
 * @return the immutable {@code PROPERTIES_METADATA} map, keyed by property name
 */
@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return PROPERTIES_METADATA;
}

/**
 * Re-keys catalog properties from their Graviton names to the names Iceberg uses.
 *
 * <p>Only entries whose key is present in {@link #GRAVITON_CONFIG_TO_ICEBERG} are carried over,
 * stored under the mapped Iceberg key; all other entries are left out. The input map is not
 * modified.
 *
 * @param properties catalog properties keyed by Graviton configuration names
 * @return a new mutable map containing the recognised entries under their Iceberg keys
 */
public Map<String, String> transformProperties(Map<String, String> properties) {
  Map<String, String> icebergProperties = Maps.newHashMap();
  for (Map.Entry<String, String> property : properties.entrySet()) {
    // The mapping is an ImmutableMap, so a null result means "key not mapped".
    String icebergKey = GRAVITON_CONFIG_TO_ICEBERG.get(property.getKey());
    if (icebergKey != null) {
      icebergProperties.put(icebergKey, property.getValue());
    }
  }
  return icebergProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,36 @@

package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME;
import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.URI;
import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.WAREHOUSE;

import com.datastrato.graviton.Config;
import com.datastrato.graviton.config.ConfigBuilder;
import com.datastrato.graviton.config.ConfigEntry;

public class IcebergConfig extends Config {

public static final ConfigEntry<String> CATALOG_TYPE =
new ConfigBuilder("catalogType")
public static final ConfigEntry<String> CATALOG_BACKEND =
new ConfigBuilder(CATALOG_BACKEND_NAME)
.doc("Choose the implementation of the Iceberg catalog")
.version(DEFAULT_VERSION)
.stringConf()
.createWithDefault("memory");

public static final ConfigEntry<Boolean> INITIALIZE_JDBC_CATALOG_TABLES =
new ConfigBuilder("initializeJdbcCatalogTables")
.doc("Whether to load the configuration of the jdbc catalog table during initialization")
public static final ConfigEntry<String> CATALOG_WAREHOUSE =
new ConfigBuilder(WAREHOUSE)
.doc("The warehouse config of the Iceberg catalog")
.version(DEFAULT_VERSION)
.booleanConf()
.createWithDefault(true);
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> CATALOG_URI =
new ConfigBuilder(URI)
.doc("The uri config of the Iceberg catalog")
.version(DEFAULT_VERSION)
.stringConf()
.createWithDefault(null);

public IcebergConfig() {
super(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,49 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
public static final String COMMENT = "comment";
public static final String CREATOR = "creator";
public static final String LOCATION = "location";
public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
public static final String CHERRY_PICK_SNAPSHOT_ID = "cherry-pick-snapshot-id";
public static final String SORT_ORDER = "sort-order";
public static final String IDENTIFIER_FIELDS = "identifier-fields";

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

// Registers the reserved Iceberg table properties once at class-load time, indexed by name.
static {
  List<PropertyEntry<?>> propertyEntries =
      ImmutableList.of(
          stringReservedPropertyEntry(COMMENT, "table comment", true),
          stringReservedPropertyEntry(CREATOR, "table creator info", false),
          stringReservedPropertyEntry(LOCATION, "Iceberg location for table storage", false),
          stringReservedPropertyEntry(
              CURRENT_SNAPSHOT_ID,
              "The snapshot representing the current state of the table",
              false),
          stringReservedPropertyEntry(
              CHERRY_PICK_SNAPSHOT_ID,
              "Selecting a specific snapshots in a merge operation",
              false),
          // The description used to be a copy of the cherry-pick text above, which does not
          // describe this property; it now describes the sort order.
          stringReservedPropertyEntry(
              SORT_ORDER, "The sort order applied to the table's data", false),
          stringReservedPropertyEntry(
              IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false));
  // Maps.uniqueIndex keys each entry by its name and returns an immutable map.
  PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
// TODO: support Iceberg table property specs
return Maps.newHashMap();
return PROPERTIES_METADATA;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public class IcebergTableOps {
private SupportsNamespaces asNamespaceCatalog;

public IcebergTableOps(IcebergConfig icebergConfig) {
String catalogType = icebergConfig.get(IcebergConfig.CATALOG_BACKEND);
catalog =
IcebergCatalogUtil.loadCatalogBackend(
icebergConfig.get(IcebergConfig.CATALOG_TYPE), icebergConfig.getConfigsWithPrefix(""));
IcebergCatalogUtil.loadCatalogBackend(catalogType, icebergConfig.getConfigsWithPrefix(""));
if (catalog instanceof SupportsNamespaces) {
asNamespaceCatalog = (SupportsNamespaces) catalog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.utils;

import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergConfig.INITIALIZE_JDBC_CATALOG_TABLES;
import static com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE;

import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogBackend;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.CatalogProperties;
Expand All @@ -33,7 +33,9 @@ private static InMemoryCatalog loadMemoryCatalog(Map<String, String> properties)

private static HiveCatalog loadHiveCatalog(Map<String, String> properties) {
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(new HdfsConfiguration());
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
hiveCatalog.setConf(hdfsConfiguration);
hiveCatalog.initialize("hive", properties);
return hiveCatalog;
}
Expand All @@ -43,11 +45,10 @@ private static JdbcCatalog loadJdbcCatalog(Map<String, String> properties) {
new JdbcCatalog(
null,
null,
Boolean.parseBoolean(
properties.getOrDefault(
INITIALIZE_JDBC_CATALOG_TABLES.getKey(),
String.valueOf(INITIALIZE_JDBC_CATALOG_TABLES.getDefaultValue()))));
jdbcCatalog.setConf(new HdfsConfiguration());
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "false")));
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
jdbcCatalog.setConf(hdfsConfiguration);
jdbcCatalog.initialize("jdbc", properties);
return jdbcCatalog;
}
Expand All @@ -57,16 +58,13 @@ public static Catalog loadCatalogBackend(String catalogType) {
}

public static Catalog loadCatalogBackend(String catalogType, Map<String, String> properties) {
// TODO Organize the configuration properties and adapt them to the lower layer, and map some
// graviton configuration keys.
LOG.info("Load catalog backend of {}", catalogType);
Clearvive marked this conversation as resolved.
Show resolved Hide resolved

switch (catalogType.toLowerCase(Locale.ENGLISH)) {
case "memory":
switch (IcebergCatalogBackend.valueOf(catalogType.toUpperCase())) {
case MEMORY:
return loadMemoryCatalog(properties);
case "hive":
case HIVE:
return loadHiveCatalog(properties);
case "jdbc":
case JDBC:
return loadJdbcCatalog(properties);
default:
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,50 @@ public void testListDatabases() {
icebergTableOps.listNamespace(org.apache.iceberg.catalog.Namespace.empty());
Assertions.assertTrue(listNamespacesResponse.namespaces().isEmpty());
}

/**
 * Checks catalog property validation on create through {@code catalogPropertiesMetadata()}:
 * a map holding only an unrecognised backend value is rejected, a map with a hive backend plus
 * uri and warehouse is accepted, and an empty map is rejected with a message that mentions the
 * backend property name.
 */
@Test
void testCatalogProperty() {
  AuditInfo audit =
      new AuditInfo.Builder().withCreator("creator").withCreateTime(Instant.now()).build();

  CatalogEntity catalogEntity =
      new CatalogEntity.Builder()
          .withId(1L)
          .withName("catalog")
          .withNamespace(Namespace.of("metalake"))
          .withType(IcebergCatalog.Type.RELATIONAL)
          .withProvider("iceberg")
          .withAuditInfo(audit)
          .build();

  Map<String, String> initConf = Maps.newHashMap();

  try (IcebergCatalogOperations catalogOps = new IcebergCatalogOperations(catalogEntity)) {
    catalogOps.initialize(initConf);

    // Case 1: only the backend key is set, with a value that is not an IcebergCatalogBackend.
    Map<String, String> unknownBackend = Maps.newHashMap();
    unknownBackend.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, "test");
    Assertions.assertThrows(
        IllegalArgumentException.class,
        () -> catalogOps.catalogPropertiesMetadata().validatePropertyForCreate(unknownBackend));

    // Case 2: backend, uri and warehouse are all supplied.
    Map<String, String> completeHive = Maps.newHashMap();
    completeHive.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, "hive");
    completeHive.put(IcebergCatalogPropertiesMetadata.URI, "127.0.0.1");
    completeHive.put(IcebergCatalogPropertiesMetadata.WAREHOUSE, "test");
    Assertions.assertDoesNotThrow(
        () -> catalogOps.catalogPropertiesMetadata().validatePropertyForCreate(completeHive));

    // Case 3: nothing is supplied; the failure message must name the backend property.
    Throwable missingRequired =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () ->
                catalogOps
                    .catalogPropertiesMetadata()
                    .validatePropertyForCreate(Maps.newHashMap()));

    Assertions.assertTrue(
        missingRequired
            .getMessage()
            .contains(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME));
  }
}
}
Loading