[#480] test(lakehouse-iceberg): add hive catalog to IcebergRESTServiceIT (#493)

### What changes were proposed in this pull request?

Add a Hive catalog to IcebergRESTServiceIT:
1. Customize the Graviton config file with different Iceberg catalog types.
2. The Hive catalog warehouse location uses the local filesystem to bypass HDFS.
3. Unify test namespaces under the `iceberg_rest_` prefix so that all test
namespaces and tables can be dropped before each test (see the sketch below).
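
A minimal sketch of the cleanup this prefix enables, using the `sql` and `convertToStringList` helpers added later in this diff; the catalog name `rest` matches `initSparkEnv()`, and the exact cleanup code in the PR may differ:

```java
// Hedged sketch: drop every leftover test namespace before running a test.
// Relies on the sql(...) and convertToStringList(...) helpers from
// IcebergRESTServiceBaseIT and on the Spark catalog being registered as "rest".
List<String> namespaces = convertToStringList(sql("SHOW NAMESPACES IN rest"), 0);
for (String ns : namespaces) {
  if (ns.startsWith("iceberg_rest_")) {
    // SHOW TABLES returns (namespace, tableName, isTemporary); index 1 is the name.
    for (String table : convertToStringList(sql("SHOW TABLES IN rest." + ns), 1)) {
      sql("DROP TABLE IF EXISTS rest." + ns + "." + table);
    }
    sql("DROP NAMESPACE IF EXISTS rest." + ns);
  }
}
```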

### Why are the changes needed?


Part of: #480

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
1. Existing UTs
2. HiveCatalog UTs
FANNG1 authored Oct 17, 2023
1 parent fa7889f commit fc49d30
Showing 9 changed files with 480 additions and 159 deletions.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -28,6 +28,7 @@ rocksdbjni = "7.7.3"
iceberg = '1.3.1'
trino = '426'
spark = "3.4.1"
scala-collection-compat = "2.7.0"


protobuf-plugin = "0.9.2"
@@ -91,6 +92,7 @@ trino-toolkit= { group = "io.trino", name = "trino-plugin-toolkit", version.ref = "trino" }
trino-testing= { group = "io.trino", name = "trino-testing", version.ref = "trino" }
iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.13", version.ref = "iceberg" }
spark-sql = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" }
scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-collection-compat_2.13", version.ref = "scala-collection-compat" }


[bundles]
1 change: 1 addition & 0 deletions integration-test/build.gradle.kts
@@ -105,6 +105,7 @@ dependencies {
exclude("org.apache.zookeeper")
exclude("io.dropwizard.metrics")
}
testImplementation(libs.scala.collection.compat)
testImplementation(libs.slf4j.jdk14)
}

integration-test/src/test/java/com/datastrato/graviton/integration/test/MiniGraviton.java
@@ -23,8 +23,6 @@
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
@@ -41,6 +39,7 @@

public class MiniGraviton {
private static final Logger LOG = LoggerFactory.getLogger(MiniGraviton.class);
private MiniGravitonContext context;
private RESTClient restClient;
private final File mockConfDir;
private final ServerConfig serverConfig = new ServerConfig();
Expand All @@ -50,7 +49,8 @@ public class MiniGraviton {

private int port;

public MiniGraviton() throws IOException {
public MiniGraviton(MiniGravitonContext context) throws IOException {
this.context = context;
this.mockConfDir = Files.createTempDirectory("MiniGraviton").toFile();
mockConfDir.mkdirs();
}
@@ -154,52 +154,43 @@ public Config getServerConfig() {
return serverConfig;
}

// Customize the config file
private void customizeConfigFile(String configTempFileName, String configFileName)
throws IOException {
Map<String, String> configMap = new HashMap<>();
configMap.put(
GravitonServer.WEBSERVER_CONF_PREFIX + JettyServerConfig.WEBSERVER_HTTP_PORT.getKey(),
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
configMap.put(
Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH.getKey(), "/tmp/graviton-" + UUID.randomUUID());

configMap.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ AuxiliaryServiceManager.AUX_SERVICE_NAMES,
IcebergRESTService.SERVICE_NAME);
Map<String, String> getIcebergRestServiceConfigs() throws IOException {
Map<String, String> customConfigs = new HashMap<>();

String icebergJarPath =
Paths.get("catalogs", "catalog-lakehouse-iceberg", "build", "libs").toString();
String icebergConfigPath =
Paths.get("catalogs", "catalog-lakehouse-iceberg", "src", "main", "resources").toString();
configMap.put(
customConfigs.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ AuxiliaryServiceManager.AUX_SERVICE_CLASSPATH,
String.join(",", icebergJarPath, icebergConfigPath));
configMap.put(

customConfigs.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ JettyServerConfig.WEBSERVER_HTTP_PORT.getKey(),
String.valueOf(RESTUtils.findAvailablePort(3000, 4000)));
return customConfigs;
}

Properties props = new Properties();

try (InputStream inputStream = Files.newInputStream(Paths.get(configTempFileName));
OutputStream outputStream = Files.newOutputStream(Paths.get(configFileName))) {
props.load(inputStream);
// Customize the config file
private void customizeConfigFile(String configTempFileName, String configFileName)
throws IOException {
Map<String, String> configMap = new HashMap<>();
configMap.put(
GravitonServer.WEBSERVER_CONF_PREFIX + JettyServerConfig.WEBSERVER_HTTP_PORT.getKey(),
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
configMap.put(
Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH.getKey(), "/tmp/graviton-" + UUID.randomUUID());

for (Map.Entry<String, String> entry : configMap.entrySet()) {
props.setProperty(entry.getKey(), entry.getValue());
}
configMap.putAll(getIcebergRestServiceConfigs());
configMap.putAll(context.customConfig);

props.store(outputStream, null);
} catch (IOException e) {
LOG.error("Exception in customizeConfigFile ", e);
}
ITUtils.rewriteConfigFile(configTempFileName, configFileName, configMap);
}

private boolean checkIfServerIsRunning() {
integration-test/src/test/java/com/datastrato/graviton/integration/test/MiniGravitonContext.java
@@ -0,0 +1,16 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.integration.test;

import java.util.Map;

public class MiniGravitonContext {
Map<String, String> customConfig;

public MiniGravitonContext(Map<String, String> customConfig) {
this.customConfig = customConfig;
}
}
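
A sketch of how this context is meant to flow, assuming `AbstractIT` (not shown in this diff) collects configs registered via `registerCustomConfigs` into a `MiniGravitonContext` before starting the server. Because `MiniGraviton#customizeConfigFile` applies `context.customConfig` last, these entries override the generated defaults:

```java
// Hedged wiring sketch; the config key below is hypothetical, and the
// AbstractIT plumbing is assumed rather than shown in this diff.
Map<String, String> custom = new HashMap<>();
custom.put("graviton.example.key", "from-test"); // hypothetical key
// The constructor declares IOException; callers handle it.
MiniGraviton miniGraviton = new MiniGraviton(new MiniGravitonContext(custom));
// customizeConfigFile merges: base settings, then the Iceberg REST service
// configs, then custom entries -- later putAll calls win on key collisions.
```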
integration-test/src/test/java/com/datastrato/graviton/integration/test/util/ITUtils.java
@@ -5,9 +5,31 @@
package com.datastrato.graviton.integration.test.util;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;

public class ITUtils {
public static final String TEST_MODE = "testMode";
public static final String EMBEDDED_TEST_MODE = "embedded";

public static String joinDirPath(String... dirs) {
return String.join(File.separator, dirs);
}

public static void rewriteConfigFile(
String configTempFileName, String configFileName, Map<String, String> configMap)
throws IOException {
Properties props = new Properties();
try (InputStream inputStream = Files.newInputStream(Paths.get(configTempFileName));
OutputStream outputStream = Files.newOutputStream(Paths.get(configFileName))) {
props.load(inputStream);
props.putAll(configMap);
props.store(outputStream, null);
}
}
}
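
`rewriteConfigFile` is a plain `Properties` round-trip: load the template, overlay `configMap` (overlay entries win on key collisions), and write the result. A usage sketch with hypothetical file names and key:

```java
// Hypothetical usage of ITUtils.rewriteConfigFile: overlay test-specific
// settings onto a config template; callers handle the declared IOException.
Map<String, String> overrides = new HashMap<>();
overrides.put("graviton.server.webserver.httpPort", "8090"); // hypothetical key
ITUtils.rewriteConfigFile(
    "conf/graviton.conf.template", // hypothetical template path
    "conf/graviton.conf",          // hypothetical output path
    overrides);
```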
integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/IcebergRESTHiveCatalogIT.java
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.integration.test.catalog.lakehouse.iceberg;

import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogBackend;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

// Hive & JDBC catalogs must be tested with the graviton-docker-it env,
// so we create a separate class instead of using a JUnit `parameterized test`
// to auto-generate the catalog type.
@Tag("graviton-docker-it")
@TestInstance(Lifecycle.PER_CLASS)
public class IcebergRESTHiveCatalogIT extends IcebergRESTServiceIT {
public IcebergRESTHiveCatalogIT() {
catalogType = IcebergCatalogBackend.HIVE;
}
}
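
The same pattern would extend to the JDBC backend once `getIcebergJdbcCatalogConfigs` is filled in (it is still an empty stub in this PR); a hypothetical variant for illustration:

```java
// Hypothetical JDBC counterpart, mirroring the Hive class above; it does not
// exist in this commit.
@Tag("graviton-docker-it")
@TestInstance(Lifecycle.PER_CLASS)
public class IcebergRESTJdbcCatalogIT extends IcebergRESTServiceIT {
  public IcebergRESTJdbcCatalogIT() {
    catalogType = IcebergCatalogBackend.JDBC;
  }
}
```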
integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceBaseIT.java
@@ -7,6 +7,7 @@

import com.datastrato.graviton.Config;
import com.datastrato.graviton.aux.AuxiliaryServiceManager;
import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogBackend;
import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergConfig;
import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergRESTService;
import com.datastrato.graviton.integration.test.util.AbstractIT;
@@ -15,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -23,16 +25,119 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * <p>Adapted from spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
*/

@TestInstance(Lifecycle.PER_CLASS)
public class IcebergRESTServiceBaseIT extends AbstractIT {
public static final Logger LOG = LoggerFactory.getLogger(IcebergRESTServiceBaseIT.class);
private SparkSession sparkSession;
protected IcebergCatalogBackend catalogType = IcebergCatalogBackend.MEMORY;

public IcebergRESTServiceBaseIT() {
@BeforeAll
void initIcebergTestEnv() throws Exception {
registerIcebergCatalogConfig();
AbstractIT.startIntegrationTest();
initSparkEnv();
LOG.info("graviton and spark env started,{}", catalogType);
}

@AfterAll
void stopIcebergTestEnv() throws Exception {
stopSparkEnv();
AbstractIT.stopIntegrationTest();
LOG.info("graviton and spark env stopped,{}", catalogType);
}

// AbstractIT#startIntegrationTest() is static, so we can't inject catalog info
// when startIntegrationTest() is invoked automatically by JUnit. We therefore
// override startIntegrationTest() here with an empty body to disable the
// automatic JUnit invocation; the real startup happens in initIcebergTestEnv().
@BeforeAll
public static void startIntegrationTest() {}

@AfterAll
public static void stopIntegrationTest() {}

boolean catalogTypeNotMemory() {
return !catalogType.equals(IcebergCatalogBackend.MEMORY);
}

private void registerIcebergCatalogConfig() {
Map<String, String> icebergConfigs;

switch (catalogType) {
case HIVE:
icebergConfigs = getIcebergHiveCatalogConfigs();
break;
case JDBC:
icebergConfigs = getIcebergJdbcCatalogConfigs();
break;
case MEMORY:
icebergConfigs = getIcebergMemoryCatalogConfigs();
break;
default:
throw new RuntimeException("Not support Iceberg catalog type:" + catalogType);
}

AbstractIT.registerCustomConfigs(icebergConfigs);
LOG.info("Iceberg REST service config registered," + StringUtils.join(icebergConfigs));
}

private static Map<String, String> getIcebergMemoryCatalogConfigs() {
Map<String, String> configMap = new HashMap<>();
configMap.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ IcebergConfig.CATALOG_BACKEND.getKey(),
IcebergCatalogBackend.MEMORY.toString().toLowerCase());

configMap.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ IcebergConfig.CATALOG_WAREHOUSE.getKey(),
"/tmp/");
return configMap;
}

private static Map<String, String> getIcebergJdbcCatalogConfigs() {
Map<String, String> configMap = new HashMap<>();
return configMap;
}

private static Map<String, String> getIcebergHiveCatalogConfigs() {
Map<String, String> customConfigs = new HashMap<>();
customConfigs.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ IcebergConfig.CATALOG_BACKEND.getKey(),
IcebergCatalogBackend.HIVE.toString().toLowerCase());

customConfigs.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ IcebergConfig.CATALOG_URI.getKey(),
"thrift://127.0.0.1:9083");

customConfigs.put(
AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX
+ IcebergRESTService.SERVICE_NAME
+ "."
+ IcebergConfig.CATALOG_WAREHOUSE.getKey(),
"file:///tmp/user/hive/warehouse-hive/");
return customConfigs;
}
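
// Sketch of the resulting key layout (the literal prefix, service name, and
// key names below are assumptions; only the concatenation pattern comes from
// the methods above):
//   "graviton.auxService." + "iceberg-rest" + "." + "catalog-backend"
//     -> graviton.auxService.iceberg-rest.catalog-backend = hive
//     -> graviton.auxService.iceberg-rest.uri             = thrift://127.0.0.1:9083
//     -> graviton.auxService.iceberg-rest.warehouse       = file:///tmp/user/hive/warehouse-hive/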

private static IcebergConfig buildIcebergConfig(Config config) {
@@ -59,8 +164,13 @@ private void initSparkEnv() {
.config("spark.sql.catalog.rest.type", "rest")
.config("spark.sql.catalog.rest.uri", IcebergRESTUri)
.getOrCreate();
}

sparkSession.sparkContext().setLogLevel("WARN");
private void stopSparkEnv() {
if (sparkSession != null) {
sparkSession.close();
sparkSession = null;
}
}

protected List<Object[]> sql(String query, Object... args) {
@@ -139,6 +249,10 @@ protected Set<String> convertToStringSet(List<Object[]> objects, int index) {
return objects.stream().map(row -> String.valueOf(row[index])).collect(Collectors.toSet());
}

protected List<String> convertToStringList(List<Object[]> objects, int index) {
return objects.stream().map(row -> String.valueOf(row[index])).collect(Collectors.toList());
}

protected Map<String, String> convertToStringMap(List<Object[]> objects) {
return objects.stream()
.collect(
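
Taken together, the per-class lifecycle for a catalog-specific subclass such as `IcebergRESTHiveCatalogIT` runs roughly as follows (a hedged trace; the plumbing inside `AbstractIT` is not shown in this diff):

```java
// Hedged lifecycle trace for the Hive variant; names not shown in this diff
// are assumptions inferred from the calls above.
// 1. new IcebergRESTHiveCatalogIT()      -> sets catalogType = HIVE
// 2. @BeforeAll initIcebergTestEnv():
//      registerIcebergCatalogConfig()    -> getIcebergHiveCatalogConfigs()
//      AbstractIT.startIntegrationTest() -> new MiniGraviton(new MiniGravitonContext(customConfigs))
//      initSparkEnv()                    -> Spark catalog "rest" pointed at the REST endpoint
// 3. test methods issue SQL through the "rest" catalog
// 4. @AfterAll stopIcebergTestEnv():
//      stopSparkEnv(); AbstractIT.stopIntegrationTest();
```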