Skip to content

Commit

Permalink
[#420] feat(trino-connector): Implement list and show metadata in tri…
Browse files Browse the repository at this point in the history
…no. (#421)


### What changes were proposed in this pull request?

Support trino using graviton to manager catalogs and query data.

1. Implemented graviton hive catalog connector creation.
2. Implemented show graviton catalogs.
3. Implemented list/show schemas from graviton catalogs.
4. Implemented list/show tables from graviton schema.


### Why are the changes needed?

Fix: #420 Implement list/show catalogs schemas and tables on
graviton-connector

### Does this PR introduce _any_ user-facing change?


### How was this patch tested?
1. UT
  • Loading branch information
diqiu50 authored Oct 10, 2023
1 parent 78959a3 commit f78a102
Show file tree
Hide file tree
Showing 30 changed files with 1,504 additions and 105 deletions.
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ iceberg-api = { group = "org.apache.iceberg", name = "iceberg-api", version.ref
iceberg-hive-metastore = { group = "org.apache.iceberg", name = "iceberg-hive-metastore", version.ref = "iceberg" }
trino-spi= { group = "io.trino", name = "trino-spi", version.ref = "trino" }
trino-toolkit= { group = "io.trino", name = "trino-plugin-toolkit", version.ref = "trino" }
trino-testing= { group = "io.trino", name = "trino-testing", version.ref = "trino" }
iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.13", version.ref = "iceberg" }
spark-sql = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" }

Expand Down
1 change: 1 addition & 0 deletions trino-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.trino.testing)
testImplementation(libs.mockito.core)
testImplementation(libs.mockserver.netty)
testImplementation(libs.mockserver.client.java)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.trino.connector;

import static com.google.common.base.MoreObjects.toStringHelper;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.trino.spi.connector.ColumnHandle;

/**
* The GravitonColumnHandle is used to transform column information between Trino and Graviton, as
* well as to wrap the inner connector column handle for data access.
*/
public final class GravitonColumnHandle implements ColumnHandle {
private final String columnName;
private final ColumnHandle internalColumnHandler;

@JsonCreator
public GravitonColumnHandle(
@JsonProperty("columnName") String columnName,
@JsonProperty("internalColumnHandler") ColumnHandle columnHandle) {
Preconditions.checkArgument(columnName != null, "columnName is not null");
Preconditions.checkArgument(columnHandle != null, "columnHandle is not null");
this.columnName = columnName;
this.internalColumnHandler = columnHandle;
}

@JsonProperty
public String getColumnName() {
return columnName;
}

@JsonProperty
public ColumnHandle getInternalColumnHandler() {
return internalColumnHandler;
}

@Override
public int hashCode() {
return columnName.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}

GravitonColumnHandle other = (GravitonColumnHandle) obj;
return columnName.equals(other.columnName);
}

@Override
public String toString() {
return toStringHelper(this).add("columnName", columnName).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
package com.datastrato.graviton.trino.connector;

import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.client.GravitonMetaLake;
import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorContext;
import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadata;
import com.google.common.base.Preconditions;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand Down Expand Up @@ -37,13 +40,34 @@ public GravitonConnector(
@Override
public ConnectorTransactionHandle beginTransaction(
IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit) {
throw new NotImplementedException();
Connector internalConnector = catalogConnectorContext.getInternalConnector();

ConnectorTransactionHandle internalTransactionHandler =
internalConnector.beginTransaction(isolationLevel, readOnly, autoCommit);
Preconditions.checkNotNull(internalConnector);

return new GravitonTransactionHandle(internalTransactionHandler);
}

@Override
public ConnectorMetadata getMetadata(
ConnectorSession session, ConnectorTransactionHandle transactionHandle) {
throw new NotImplementedException();
GravitonTransactionHandle gravitonTransactionHandle =
(GravitonTransactionHandle) transactionHandle;

Connector internalConnector = catalogConnectorContext.getInternalConnector();
ConnectorMetadata internalMetadata =
internalConnector.getMetadata(
session, gravitonTransactionHandle.getInternalTransactionHandle());
Preconditions.checkNotNull(internalMetadata);

GravitonMetaLake metalake = catalogConnectorContext.getMetalake();

CatalogConnectorMetadata catalogConnectorMetadata =
new CatalogConnectorMetadata(metalake, catalogIdentifier);

return new GravitonMetadata(
catalogConnectorMetadata, catalogConnectorContext.getMetadataAdapter(), internalMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorManager;
import com.datastrato.graviton.trino.connector.catalog.CatalogInjector;
import com.google.common.base.Preconditions;
import io.airlift.log.Logger;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GravitonConnectorFactory implements ConnectorFactory {

private static final Logger LOG = Logger.get(GravitonConnectorFactory.class);
private static final Logger LOG = LoggerFactory.getLogger(GravitonConnectorFactory.class);
private static final String DEFAULT_CONNECTOR_NAME = "graviton";

private CatalogConnectorManager catalogConnectorManager;
Expand Down Expand Up @@ -45,7 +46,7 @@ public Connector create(
if (catalogConnectorManager == null) {
try {
CatalogInjector catalogInjector = new CatalogInjector();
catalogInjector.bindCatalogManager(context);
catalogInjector.init(context);
CatalogConnectorFactory catalogConnectorFactory =
new CatalogConnectorFactory(catalogInjector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ public enum GravitonErrorCode implements ErrorCodeSupplier {
GRAVITON_METALAKE_NOT_EXISTS(1, EXTERNAL),
GRAVITON_MISSING_CONFIG(2, EXTERNAL),
GRAVITON_CREATE_INNER_CONNECTOR_FAILED(3, EXTERNAL),
GRAVITON_UNSUPPORTED_CATALOG_PROVIDER(4, EXTERNAL),
GRAVITON_CREATE_INTERNAL_CONNECTOR_ERROR(5, EXTERNAL),
GRAVITON_SCHEMA_NOT_EXISTS(6, EXTERNAL),
GRAVITON_CATALOG_NOT_EXISTS(7, EXTERNAL),
GRAVITON_TABLE_NOT_EXISTS(8, EXTERNAL),
GRAVITON_UNSUPPORTED_TRINO_DATATYPE(9, EXTERNAL),
GRAVITON_UNSUPPORTED_GRAVITON_DATATYPE(10, EXTERNAL),
GRAVITON_UNSUPPORTED_OPERATION(11, EXTERNAL),
GRAVITON_COLUMN_NOT_EXISTS(12, EXTERNAL),
;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.trino.connector;

import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadata;
import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import com.datastrato.graviton.trino.connector.metadata.GravitonColumn;
import com.datastrato.graviton.trino.connector.metadata.GravitonSchema;
import com.datastrato.graviton.trino.connector.metadata.GravitonTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.SchemaTableName;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* The GravitonMetadata class provides operations for Graviton metadata on the Graviton server. It
* also transforms the different metadata formats between Trino and Graviton. Additionally, it wraps
* the internal connector metadata for accessing data.
*/
public class GravitonMetadata implements ConnectorMetadata {
// Handling metadata operations on graviton server
private final CatalogConnectorMetadata catalogConnectorMetadata;

// Transform different metadata format
private final CatalogConnectorMetadataAdapter metadataAdapter;

private final ConnectorMetadata internalMetadata;

public GravitonMetadata(
CatalogConnectorMetadata catalogConnectorMetadata,
CatalogConnectorMetadataAdapter metadataAdapter,
ConnectorMetadata internalMetadata) {
this.catalogConnectorMetadata = catalogConnectorMetadata;
this.metadataAdapter = metadataAdapter;
this.internalMetadata = internalMetadata;
}

@Override
public List<String> listSchemaNames(ConnectorSession session) {
return catalogConnectorMetadata.listSchemaNames();
}

@Override
public Map<String, Object> getSchemaProperties(ConnectorSession session, String schemaName) {
GravitonSchema schema = catalogConnectorMetadata.getSchema(schemaName);
return metadataAdapter.getSchemaProperties(schema);
}

@Override
public ConnectorTableProperties getTableProperties(
ConnectorSession session, ConnectorTableHandle tableHandle) {
GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle;
GravitonTable table =
catalogConnectorMetadata.getTable(
gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName());
return metadataAdapter.getTableProperties(table);
}

@Override
public GravitonTableHandle getTableHandle(
ConnectorSession session,
SchemaTableName tableName,
Optional<ConnectorTableVersion> startVersion,
Optional<ConnectorTableVersion> endVersion) {
boolean tableExists =
catalogConnectorMetadata.tableExists(tableName.getSchemaName(), tableName.getTableName());
if (!tableExists) return null;

ConnectorTableHandle internalTableHandle =
internalMetadata.getTableHandle(session, tableName, startVersion, endVersion);
return new GravitonTableHandle(
tableName.getSchemaName(), tableName.getTableName(), internalTableHandle);
}

@Override
public ConnectorTableMetadata getTableMetadata(
ConnectorSession session, ConnectorTableHandle tableHandle) {
GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle;
GravitonTable table =
catalogConnectorMetadata.getTable(
gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName());
return metadataAdapter.getTableMetadata(table);
}

@Override
public List<SchemaTableName> listTables(
ConnectorSession session, Optional<String> optionalSchemaName) {
Set<String> schemaNames =
optionalSchemaName
.map(ImmutableSet::of)
.orElseGet(() -> ImmutableSet.copyOf(listSchemaNames(session)));

ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
for (String schemaName : schemaNames) {
List<String> tableNames = catalogConnectorMetadata.listTables(schemaName);
for (String tableName : tableNames) {
builder.add(new SchemaTableName(schemaName, tableName));
}
}
return builder.build();
}

@Override
public Map<String, ColumnHandle> getColumnHandles(
ConnectorSession session, ConnectorTableHandle tableHandle) {
GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle;

GravitonTable table =
catalogConnectorMetadata.getTable(
gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName());

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();

Map<String, ColumnHandle> internalColumnHandles =
internalMetadata.getColumnHandles(session, gravitonTableHandle.getInternalTableHandle());
for (GravitonColumn column : table.getColumns()) {
GravitonColumnHandle columnHandle =
new GravitonColumnHandle(column.getName(), internalColumnHandles.get(column.getName()));
columnHandles.put(column.getName(), columnHandle);
}
return columnHandles.buildOrThrow();
}

@Override
public ColumnMetadata getColumnMetadata(
ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle;
GravitonTable table =
catalogConnectorMetadata.getTable(
gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName());

GravitonColumnHandle gravitonColumnHandle = (GravitonColumnHandle) columnHandle;
String columName = gravitonColumnHandle.getColumnName();

GravitonColumn column = table.getColumn(columName);
return metadataAdapter.getColumnMetadata(column);
}
}
Loading

0 comments on commit f78a102

Please sign in to comment.