Skip to content

Commit

Permalink
[#523] test (trino-connector): Add SQL interface tests for trino-conn…
Browse files Browse the repository at this point in the history
…ector. (#524)

### What changes were proposed in this pull request?

Add SQL interface end-to-end UT for trino-connector. It uses the
Gravitino connector to load the memory catalog and execute basic SQL
tests.

### Why are the changes needed?

Fix: #523 Add SQL interface end to end UT for trino-connector
  • Loading branch information
diqiu50 authored Oct 19, 2023
1 parent eaa950c commit c956e43
Show file tree
Hide file tree
Showing 26 changed files with 797 additions and 236 deletions.
10 changes: 9 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ java {
}
}

// Root-project test dependency: TestNG is required to drive the
// trino-connector SQL tests (see the test-framework switch in
// tasks.configureEach<Test> below).
dependencies {
testImplementation(libs.testng)
}

subprojects {
apply(plugin = "jacoco")

Expand All @@ -52,7 +56,11 @@ subprojects {
// Configure every Test task across subprojects. Tests can be skipped
// globally with -PskipTests.
tasks.configureEach<Test> {
  val skipTests = project.hasProperty("skipTests")
  if (!skipTests) {
    // trino-connector tests run on TestNG (Trino's testing framework is
    // TestNG-based); every other module stays on the JUnit 5 platform.
    if (project.name == "trino-connector") {
      useTestNG()
    } else {
      useJUnitPlatform()
    }
    // Always produce a coverage report after the tests finish.
    finalizedBy(tasks.getByName("jacocoTestReport"))
  }
}
Expand Down
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trino = '426'
spark = "3.4.1"
scala-collection-compat = "2.7.0"
sqlite-jdbc = "3.42.0.0"
testng = "7.7.1"


protobuf-plugin = "0.9.2"
Expand Down Expand Up @@ -91,10 +92,12 @@ iceberg-hive-metastore = { group = "org.apache.iceberg", name = "iceberg-hive-me
trino-spi= { group = "io.trino", name = "trino-spi", version.ref = "trino" }
trino-toolkit= { group = "io.trino", name = "trino-plugin-toolkit", version.ref = "trino" }
trino-testing= { group = "io.trino", name = "trino-testing", version.ref = "trino" }
trino-memory= { group = "io.trino", name = "trino-memory", version.ref = "trino" }
iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.13", version.ref = "iceberg" }
spark-sql = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" }
scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-collection-compat_2.13", version.ref = "scala-collection-compat" }
sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-jdbc" }
testng = { group = "org.testng", name = "testng", version.ref = "testng" }
spark-hive = { group = "org.apache.spark", name = "spark-hive_2.13", version.ref = "spark" }


Expand Down
31 changes: 14 additions & 17 deletions trino-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,26 @@ repositories {
}

dependencies {
  implementation(project(":api"))
  // log4j is excluded throughout: Trino's runtime ships its own logging
  // backend, and a second log4j on the classpath causes binding conflicts.
  implementation(project(":common")) {
    exclude("org.apache.logging.log4j")
  }
  implementation(project(":clients:client-java")) {
    exclude("org.apache.logging.log4j")
  }
  implementation(project(":clients:client-java-runtime", configuration = "shadow"))
  implementation(libs.jackson.databind)
  implementation(libs.jackson.annotations)
  implementation(libs.guava)
  implementation(libs.httpclient5)
  implementation(libs.commons.lang3)
  implementation(libs.substrait.java.core)
  implementation(libs.trino.spi) {
    exclude("org.apache.logging.log4j")
  }
  implementation(libs.trino.toolkit) {
    exclude("org.apache.logging.log4j")
  }

  testImplementation(libs.junit.jupiter.api)
  testImplementation(libs.junit.jupiter.params)
  testRuntimeOnly(libs.junit.jupiter.engine)
  testImplementation(libs.mockito.core)
  testImplementation(libs.mockserver.netty)
  testImplementation(libs.mockserver.client.java)
  testImplementation(libs.trino.testing) {
    exclude("org.apache.logging.log4j")
  }
  // trino-memory provides the in-memory catalog used by the SQL
  // end-to-end tests; antlr is excluded to avoid a version clash.
  testImplementation(libs.trino.memory) {
    exclude("org.antlr")
    exclude("org.apache.logging.log4j")
  }
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public Connector create(
catalogConnectorManager.getCatalogConnector(catalogName);
Preconditions.checkNotNull(catalogConnectorContext, "catalogConnector is not null");

// For testing
GravitinoPlugin.internalTestingConnector = catalogConnectorContext.getInternalConnector();

return catalogConnectorContext.getConnector();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -68,16 +67,6 @@ public Map<String, Object> getSchemaProperties(ConnectorSession session, String
return metadataAdapter.getSchemaProperties(schema);
}

// Resolves Trino table properties for the given handle: looks the table up
// through the catalog metadata, then delegates the property mapping to the
// catalog-specific metadata adapter.
@Override
public ConnectorTableProperties getTableProperties(
ConnectorSession session, ConnectorTableHandle tableHandle) {
// NOTE(review): unchecked cast — assumes Trino only ever hands back handles
// this connector created; confirm no foreign handle can reach here.
GravitinoTableHandle gravitinoTableHandle = (GravitinoTableHandle) tableHandle;
GravitinoTable table =
catalogConnectorMetadata.getTable(
gravitinoTableHandle.getSchemaName(), gravitinoTableHandle.getTableName());
return metadataAdapter.getTableProperties(table);
}

@Override
public GravitinoTableHandle getTableHandle(
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

import com.google.common.collect.ImmutableList;
import io.trino.spi.Plugin;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorFactory;

/** Trino plugin endpoint, using java spi mechanism */
public class GravitinoPlugin implements Plugin {

// For testing.
public static Connector internalTestingConnector;

@Override
public Iterable<ConnectorFactory> getConnectorFactories() {
return ImmutableList.of(new GravitinoConnectorFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ default List<PropertyMetadata<?>> getTableProperties() {
Map<String, Object> buildInternalConnectorConfig(GravitinoCatalog catalog);

/** @return the PropertyMetadata list used to validate schema properties; empty by default. */
default List<PropertyMetadata<?>> getSchemaProperties() {
  return emptyList();
}

/** @return Return MetadataAdapter for special catalog connector. */
CatalogConnectorMetadataAdapter getMetadataAdapter();

/** @return the PropertyMetadata list used to validate column properties; empty by default. */
default List<PropertyMetadata<?>> getColumnProperties() {
  return emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.client.GravitinoMetaLake;
import com.datastrato.gravitino.trino.connector.catalog.hive.HiveConnectorAdapter;
import com.datastrato.gravitino.trino.connector.catalog.memory.MemoryConnectorAdapter;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Connector;
Expand All @@ -28,6 +29,8 @@ public CatalogConnectorFactory(CatalogInjector catalogInjector) {
this.catalogInjector = catalogInjector;

catalogBuilders.put("hive", new CatalogConnectorContext.Builder(new HiveConnectorAdapter()));
catalogBuilders.put(
"memory", new CatalogConnectorContext.Builder(new MemoryConnectorAdapter()));
}

public CatalogConnectorContext loadCatalogConnector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,112 @@
import com.datastrato.gravitino.trino.connector.metadata.GravitinoColumn;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoSchema;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoTable;
import com.datastrato.gravitino.trino.connector.util.DataTypeTransformer;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.session.PropertyMetadata;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;

/**
* This interface is used to handle different parts of catalog metadata from different catalog
* connectors.
*/
public interface CatalogConnectorMetadataAdapter {
Map<String, Object> getSchemaProperties(GravitinoSchema schema);
public class CatalogConnectorMetadataAdapter {

protected final List<PropertyMetadata<?>> schemaProperties;
protected final List<PropertyMetadata<?>> tableProperties;
protected final List<PropertyMetadata<?>> columnProperties;

protected CatalogConnectorMetadataAdapter(
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
List<PropertyMetadata<?>> columnProperties) {
this.schemaProperties = schemaProperties;
this.tableProperties = tableProperties;
this.columnProperties = columnProperties;
}

public Map<String, Object> getSchemaProperties(GravitinoSchema schema) {
return normalizeProperties(schema.getProperties(), schemaProperties);
}

/** Transform gravitino table metadata to trino ConnectorTableMetadata */
ConnectorTableMetadata getTableMetadata(GravitinoTable gravitinoTable);
public ConnectorTableMetadata getTableMetadata(GravitinoTable gravitinoTable) {
SchemaTableName schemaTableName =
new SchemaTableName(gravitinoTable.getSchemaName(), gravitinoTable.getName());
ArrayList<ColumnMetadata> columnMetadataList = new ArrayList<>();
for (GravitinoColumn column : gravitinoTable.getColumns()) {
columnMetadataList.add(getColumnMetadata(column));
}

Map<String, Object> properties =
normalizeProperties(gravitinoTable.getProperties(), tableProperties);
return new ConnectorTableMetadata(
schemaTableName, columnMetadataList, properties, Optional.of(gravitinoTable.getComment()));
}

/** Transform trino ConnectorTableMetadata to gravitino table metadata */
GravitinoTable createTable(ConnectorTableMetadata tableMetadata);
public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) {
String tableName = tableMetadata.getTableSchema().getTable().getTableName();
String schemaName = tableMetadata.getTableSchema().getTable().getSchemaName();
String comment = tableMetadata.getComment().orElse("");
Map<String, String> properties = removeUnsetProperties(tableMetadata.getProperties());

List<GravitinoColumn> columns = new ArrayList<>();
for (int i = 0; i < tableMetadata.getColumns().size(); i++) {
ColumnMetadata column = tableMetadata.getColumns().get(i);
columns.add(
new GravitinoColumn(
column.getName(),
DataTypeTransformer.getGravitinoType(column.getType(), column.isNullable()),
i,
column.getComment()));
}
return new GravitinoTable(schemaName, tableName, columns, comment, properties);
}

/** Transform trino schema metadata to gravitino schema metadata */
GravitinoSchema createSchema(String schemaName, Map<String, Object> properties);
public GravitinoSchema createSchema(String schemaName, Map<String, Object> properties) {
return new GravitinoSchema(schemaName, removeUnsetProperties(properties), "");
}

/** Transform gravitino column metadata to trino ColumnMetadata */
ColumnMetadata getColumnMetadata(GravitinoColumn column);
public ColumnMetadata getColumnMetadata(GravitinoColumn column) {
return new ColumnMetadata(column.getName(), DataTypeTransformer.getTrinoType(column.getType()));
}

/** Transform gravitino table properties to trino ConnectorTableProperties */
ConnectorTableProperties getTableProperties(GravitinoTable table);
public ConnectorTableProperties getTableProperties(GravitinoTable table) {
throw new NotImplementedException();
}

/** Normalize gravitino attributes for trino */
protected Map<String, Object> normalizeProperties(
Map<String, String> properties, List<PropertyMetadata<?>> propertyTemplate) {
// TODO yuhui redo this function once gravitino table properties are supported..
// Trino only supports properties defined in the propertyTemplate.
Map<String, Object> validProperties = new HashMap<>();
for (PropertyMetadata<?> propertyMetadata : propertyTemplate) {
String name = propertyMetadata.getName();
if (properties.containsKey(name)) {
validProperties.put(name, properties.get(name));
}
}
return validProperties;
}

/** Remove trino unset attributes fro gravitino */
protected Map<String, String> removeUnsetProperties(Map<String, Object> properties) {
return properties.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Map<String, Object> buildInternalConnectorConfig(GravitinoCatalog catalog
config.put("connectorName", "hive");

Map<String, Object> properties = new HashMap<>();
properties.put("hive.metastore.uri", catalog.getProperties("hive.metastore.uris", ""));
properties.put("hive.metastore.uri", catalog.getProperties("metastore.uris", ""));
config.put("properties", properties);
return config;
}
Expand Down
Loading

0 comments on commit c956e43

Please sign in to comment.