diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ff4a96e5d7..f4c7fd8749 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -88,6 +88,7 @@ iceberg-api = { group = "org.apache.iceberg", name = "iceberg-api", version.ref iceberg-hive-metastore = { group = "org.apache.iceberg", name = "iceberg-hive-metastore", version.ref = "iceberg" } trino-spi= { group = "io.trino", name = "trino-spi", version.ref = "trino" } trino-toolkit= { group = "io.trino", name = "trino-plugin-toolkit", version.ref = "trino" } +trino-testing= { group = "io.trino", name = "trino-testing", version.ref = "trino" } iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.13", version.ref = "iceberg" } spark-sql = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" } diff --git a/trino-connector/build.gradle.kts b/trino-connector/build.gradle.kts index b70c47bd7e..acb7929b69 100644 --- a/trino-connector/build.gradle.kts +++ b/trino-connector/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { testImplementation(libs.junit.jupiter.api) testImplementation(libs.junit.jupiter.params) testRuntimeOnly(libs.junit.jupiter.engine) + testImplementation(libs.trino.testing) testImplementation(libs.mockito.core) testImplementation(libs.mockserver.netty) testImplementation(libs.mockserver.client.java) diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonColumnHandle.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonColumnHandle.java new file mode 100644 index 0000000000..817879ff07 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonColumnHandle.java @@ -0,0 +1,64 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector; + +import static com.google.common.base.MoreObjects.toStringHelper; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.trino.spi.connector.ColumnHandle; + +/** + * The GravitonColumnHandle is used to transform column information between Trino and Graviton, as + * well as to wrap the inner connector column handle for data access. + */ +public final class GravitonColumnHandle implements ColumnHandle { + private final String columnName; + private final ColumnHandle internalColumnHandler; + + @JsonCreator + public GravitonColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("internalColumnHandler") ColumnHandle columnHandle) { + Preconditions.checkArgument(columnName != null, "columnName is not null"); + Preconditions.checkArgument(columnHandle != null, "columnHandle is not null"); + this.columnName = columnName; + this.internalColumnHandler = columnHandle; + } + + @JsonProperty + public String getColumnName() { + return columnName; + } + + @JsonProperty + public ColumnHandle getInternalColumnHandler() { + return internalColumnHandler; + } + + @Override + public int hashCode() { + return columnName.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + GravitonColumnHandle other = (GravitonColumnHandle) obj; + return columnName.equals(other.columnName); + } + + @Override + public String toString() { + return toStringHelper(this).add("columnName", columnName).toString(); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnector.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnector.java index 4f2b3080e9..81b2f75fa8 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnector.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnector.java @@ -5,7 +5,10 @@ package com.datastrato.graviton.trino.connector; import com.datastrato.graviton.NameIdentifier; +import com.datastrato.graviton.client.GravitonMetaLake; import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorContext; +import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadata; +import com.google.common.base.Preconditions; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorPageSinkProvider; @@ -37,13 +40,34 @@ public GravitonConnector( @Override public ConnectorTransactionHandle beginTransaction( IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit) { - throw new NotImplementedException(); + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + + ConnectorTransactionHandle internalTransactionHandler = + internalConnector.beginTransaction(isolationLevel, readOnly, autoCommit); + Preconditions.checkNotNull(internalConnector); + + return new GravitonTransactionHandle(internalTransactionHandler); } @Override public ConnectorMetadata getMetadata( ConnectorSession session, ConnectorTransactionHandle transactionHandle) { - throw new NotImplementedException(); + GravitonTransactionHandle gravitonTransactionHandle = + (GravitonTransactionHandle) transactionHandle; + + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + ConnectorMetadata internalMetadata = + internalConnector.getMetadata( + session, gravitonTransactionHandle.getInternalTransactionHandle()); + Preconditions.checkNotNull(internalMetadata); + + GravitonMetaLake metalake = catalogConnectorContext.getMetalake(); + + CatalogConnectorMetadata catalogConnectorMetadata = + new CatalogConnectorMetadata(metalake, catalogIdentifier); + + return new GravitonMetadata( + catalogConnectorMetadata, catalogConnectorContext.getMetadataAdapter(), internalMetadata); } @Override diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnectorFactory.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnectorFactory.java index 69dae946f3..20646e62c7 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnectorFactory.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonConnectorFactory.java @@ -9,15 +9,16 @@ import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorManager; import com.datastrato.graviton.trino.connector.catalog.CatalogInjector; import com.google.common.base.Preconditions; -import io.airlift.log.Logger; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class GravitonConnectorFactory implements ConnectorFactory { - private static final Logger LOG = Logger.get(GravitonConnectorFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(GravitonConnectorFactory.class); private static final String DEFAULT_CONNECTOR_NAME = "graviton"; private CatalogConnectorManager catalogConnectorManager; @@ -45,7 +46,7 @@ public Connector create( if (catalogConnectorManager == null) { try { CatalogInjector catalogInjector = new CatalogInjector(); - catalogInjector.bindCatalogManager(context); + catalogInjector.init(context); CatalogConnectorFactory catalogConnectorFactory = new CatalogConnectorFactory(catalogInjector); diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonErrorCode.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonErrorCode.java index 9e1e22ad01..6c6e06e877 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonErrorCode.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonErrorCode.java @@ -15,6 +15,15 @@ public enum GravitonErrorCode implements ErrorCodeSupplier { GRAVITON_METALAKE_NOT_EXISTS(1, EXTERNAL), GRAVITON_MISSING_CONFIG(2, EXTERNAL), GRAVITON_CREATE_INNER_CONNECTOR_FAILED(3, EXTERNAL), + GRAVITON_UNSUPPORTED_CATALOG_PROVIDER(4, EXTERNAL), + GRAVITON_CREATE_INTERNAL_CONNECTOR_ERROR(5, EXTERNAL), + GRAVITON_SCHEMA_NOT_EXISTS(6, EXTERNAL), + GRAVITON_CATALOG_NOT_EXISTS(7, EXTERNAL), + GRAVITON_TABLE_NOT_EXISTS(8, EXTERNAL), + GRAVITON_UNSUPPORTED_TRINO_DATATYPE(9, EXTERNAL), + GRAVITON_UNSUPPORTED_GRAVITON_DATATYPE(10, EXTERNAL), + GRAVITON_UNSUPPORTED_OPERATION(11, EXTERNAL), + GRAVITON_COLUMN_NOT_EXISTS(12, EXTERNAL), ; private final ErrorCode errorCode; diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonMetadata.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonMetadata.java new file mode 100644 index 0000000000..8eb30607ef --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonMetadata.java @@ -0,0 +1,152 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector; + +import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadata; +import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import com.datastrato.graviton.trino.connector.metadata.GravitonColumn; +import com.datastrato.graviton.trino.connector.metadata.GravitonSchema; +import com.datastrato.graviton.trino.connector.metadata.GravitonTable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableVersion; +import io.trino.spi.connector.SchemaTableName; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * The GravitonMetadata class provides operations for Graviton metadata on the Graviton server. It + * also transforms the different metadata formats between Trino and Graviton. Additionally, it wraps + * the internal connector metadata for accessing data. + */ +public class GravitonMetadata implements ConnectorMetadata { + // Handling metadata operations on graviton server + private final CatalogConnectorMetadata catalogConnectorMetadata; + + // Transform different metadata format + private final CatalogConnectorMetadataAdapter metadataAdapter; + + private final ConnectorMetadata internalMetadata; + + public GravitonMetadata( + CatalogConnectorMetadata catalogConnectorMetadata, + CatalogConnectorMetadataAdapter metadataAdapter, + ConnectorMetadata internalMetadata) { + this.catalogConnectorMetadata = catalogConnectorMetadata; + this.metadataAdapter = metadataAdapter; + this.internalMetadata = internalMetadata; + } + + @Override + public List listSchemaNames(ConnectorSession session) { + return catalogConnectorMetadata.listSchemaNames(); + } + + @Override + public Map getSchemaProperties(ConnectorSession session, String schemaName) { + GravitonSchema schema = catalogConnectorMetadata.getSchema(schemaName); + return metadataAdapter.getSchemaProperties(schema); + } + + @Override + public ConnectorTableProperties getTableProperties( + ConnectorSession session, ConnectorTableHandle tableHandle) { + GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle; + GravitonTable table = + catalogConnectorMetadata.getTable( + gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName()); + return metadataAdapter.getTableProperties(table); + } + + @Override + public GravitonTableHandle getTableHandle( + ConnectorSession session, + SchemaTableName tableName, + Optional startVersion, + Optional endVersion) { + boolean tableExists = + catalogConnectorMetadata.tableExists(tableName.getSchemaName(), tableName.getTableName()); + if (!tableExists) return null; + + ConnectorTableHandle internalTableHandle = + internalMetadata.getTableHandle(session, tableName, startVersion, endVersion); + return new GravitonTableHandle( + tableName.getSchemaName(), tableName.getTableName(), internalTableHandle); + } + + @Override + public ConnectorTableMetadata getTableMetadata( + ConnectorSession session, ConnectorTableHandle tableHandle) { + GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle; + GravitonTable table = + catalogConnectorMetadata.getTable( + gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName()); + return metadataAdapter.getTableMetadata(table); + } + + @Override + public List listTables( + ConnectorSession session, Optional optionalSchemaName) { + Set schemaNames = + optionalSchemaName + .map(ImmutableSet::of) + .orElseGet(() -> ImmutableSet.copyOf(listSchemaNames(session))); + + ImmutableList.Builder builder = ImmutableList.builder(); + for (String schemaName : schemaNames) { + List tableNames = catalogConnectorMetadata.listTables(schemaName); + for (String tableName : tableNames) { + builder.add(new SchemaTableName(schemaName, tableName)); + } + } + return builder.build(); + } + + @Override + public Map getColumnHandles( + ConnectorSession session, ConnectorTableHandle tableHandle) { + GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle; + + GravitonTable table = + catalogConnectorMetadata.getTable( + gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName()); + + ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + + Map internalColumnHandles = + internalMetadata.getColumnHandles(session, gravitonTableHandle.getInternalTableHandle()); + for (GravitonColumn column : table.getColumns()) { + GravitonColumnHandle columnHandle = + new GravitonColumnHandle(column.getName(), internalColumnHandles.get(column.getName())); + columnHandles.put(column.getName(), columnHandle); + } + return columnHandles.buildOrThrow(); + } + + @Override + public ColumnMetadata getColumnMetadata( + ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { + GravitonTableHandle gravitonTableHandle = (GravitonTableHandle) tableHandle; + GravitonTable table = + catalogConnectorMetadata.getTable( + gravitonTableHandle.getSchemaName(), gravitonTableHandle.getTableName()); + + GravitonColumnHandle gravitonColumnHandle = (GravitonColumnHandle) columnHandle; + String columName = gravitonColumnHandle.getColumnName(); + + GravitonColumn column = table.getColumn(columName); + return metadataAdapter.getColumnMetadata(column); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonTableHandle.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonTableHandle.java new file mode 100644 index 0000000000..3578ee6094 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonTableHandle.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.SchemaTableName; +import java.util.Objects; + +/** + * The GravitonTableHandle is used to transform table information between Trino and Graviton, as + * well as to wrap the inner connector table handle for data access. + */ +public final class GravitonTableHandle implements ConnectorTableHandle { + + private final String schemaName; + private final String tableName; + + private final ConnectorTableHandle internalTableHandle; + + @JsonCreator + public GravitonTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("internalTableHandle") ConnectorTableHandle internalTableHandle) { + Preconditions.checkArgument(schemaName != null, "schemaName is not null"); + Preconditions.checkArgument(tableName != null, "tableName is not null"); + Preconditions.checkArgument(internalTableHandle != null, "internalTableHandle is not null"); + + this.schemaName = schemaName; + this.tableName = tableName; + this.internalTableHandle = internalTableHandle; + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + @JsonProperty + public ConnectorTableHandle getInternalTableHandle() { + return internalTableHandle; + } + + public SchemaTableName toSchemaTableName() { + return new SchemaTableName(schemaName, tableName); + } + + @Override + public int hashCode() { + return Objects.hash(schemaName, tableName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + GravitonTableHandle other = (GravitonTableHandle) obj; + return Objects.equals(this.schemaName, other.schemaName) + && Objects.equals(this.tableName, other.tableName); + } + + @Override + public String toString() { + return schemaName + ":" + tableName; + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonTransactionHandle.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonTransactionHandle.java new file mode 100644 index 0000000000..47f0854819 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/GravitonTransactionHandle.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ConnectorTransactionHandle; + +/** + * The GravitonTransactionHandle is used to make Graviton metadata operations transactional and wrap + * the inner connector transaction for data access. + */ +public class GravitonTransactionHandle implements ConnectorTransactionHandle { + ConnectorTransactionHandle internalTransactionHandle; + + @JsonCreator + public GravitonTransactionHandle( + @JsonProperty("internalTransactionHandle") + ConnectorTransactionHandle internalTransactionHandler) { + this.internalTransactionHandle = internalTransactionHandler; + } + + @JsonProperty + public ConnectorTransactionHandle getInternalTransactionHandle() { + return internalTransactionHandle; + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorAdapter.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorAdapter.java index 2a355ba415..5082865510 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorAdapter.java @@ -6,8 +6,10 @@ import static java.util.Collections.emptyList; +import com.datastrato.graviton.trino.connector.metadata.GravitonCatalog; import io.trino.spi.session.PropertyMetadata; import java.util.List; +import java.util.Map; /** * This interface is used to handle different parts of connectors from different catalog connectors. @@ -18,4 +20,10 @@ public interface CatalogConnectorAdapter { default List> getTableProperties() { return emptyList(); } + + /** @return Return internal connector config with trino. */ + Map buildInternalConnectorConfig(GravitonCatalog catalog); + + /** @return Return MetadataAdapter for special catalog connector. */ + CatalogConnectorMetadataAdapter getMetadataAdapter(); } diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorContext.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorContext.java index db0a1cb490..57db2e3cdc 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorContext.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorContext.java @@ -7,9 +7,12 @@ import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.client.GravitonMetaLake; import com.datastrato.graviton.trino.connector.GravitonConnector; +import com.datastrato.graviton.trino.connector.metadata.GravitonCatalog; +import com.google.common.base.Preconditions; import io.trino.spi.connector.Connector; import io.trino.spi.session.PropertyMetadata; import java.util.List; +import java.util.Map; /** * The CatalogConnector serves as a communication bridge between the Graviton connector and its @@ -21,10 +24,10 @@ public class CatalogConnectorContext { private final NameIdentifier catalogName; private final GravitonMetaLake metalake; - // connector communicates with trino + // Connector communicates with trino private final GravitonConnector connector; - // internal connector communicates with data storage + // Internal connector communicates with data storage private final Connector internalConnector; private final CatalogConnectorAdapter adapter; @@ -61,4 +64,50 @@ public List> getTableProperties() { public void close() { this.internalConnector.shutdown(); } + + public CatalogConnectorMetadataAdapter getMetadataAdapter() { + return adapter.getMetadataAdapter(); + } + + static class Builder { + private final CatalogConnectorAdapter connectorAdapter; + private NameIdentifier catalogName; + private GravitonMetaLake metalake; + private Connector internalConnector; + + Builder(CatalogConnectorAdapter connectorAdapter) { + this.connectorAdapter = connectorAdapter; + } + + public Builder clone() { + return new Builder(connectorAdapter); + } + + public Map buildConfig(GravitonCatalog catalog) { + return connectorAdapter.buildInternalConnectorConfig(catalog); + } + + Builder withMetalake(GravitonMetaLake metalake) { + this.metalake = metalake; + return this; + } + + Builder withCatalogName(NameIdentifier catalogName) { + this.catalogName = catalogName; + return this; + } + + Builder withInternalConnector(Connector internalConnector) { + this.internalConnector = internalConnector; + return this; + } + + CatalogConnectorContext build() { + Preconditions.checkArgument(catalogName != null, "catalogName is not null"); + Preconditions.checkArgument(metalake != null, "metalake is not null"); + Preconditions.checkArgument(internalConnector != null, "internalConnector is not null"); + return new CatalogConnectorContext( + catalogName, metalake, internalConnector, connectorAdapter); + } + } } diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorFactory.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorFactory.java index 9389c71a60..0bcc994e13 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorFactory.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorFactory.java @@ -4,21 +4,60 @@ */ package com.datastrato.graviton.trino.connector.catalog; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_CREATE_INTERNAL_CONNECTOR_ERROR; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_UNSUPPORTED_CATALOG_PROVIDER; + import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.client.GravitonMetaLake; +import com.datastrato.graviton.trino.connector.catalog.hive.HiveConnectorAdapter; import com.datastrato.graviton.trino.connector.metadata.GravitonCatalog; -import org.apache.commons.lang3.NotImplementedException; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.Connector; +import java.util.HashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** This class use to create CatalogConnectorContext instance by given catalog. */ public class CatalogConnectorFactory { + private static final Logger LOG = LoggerFactory.getLogger(CatalogConnectorFactory.class); private final CatalogInjector catalogInjector; + private final HashMap catalogBuilders = new HashMap<>(); public CatalogConnectorFactory(CatalogInjector catalogInjector) { this.catalogInjector = catalogInjector; + + catalogBuilders.put("hive", new CatalogConnectorContext.Builder(new HiveConnectorAdapter())); } public CatalogConnectorContext loadCatalogConnector( NameIdentifier nameIdentifier, GravitonMetaLake metalake, GravitonCatalog catalog) { - throw new NotImplementedException(); + String catalogProvider = catalog.getProvider(); + CatalogConnectorContext.Builder builder = catalogBuilders.get(catalogProvider); + if (builder == null) { + String message = String.format("Unsupported catalog provider %s.", catalogProvider); + LOG.error(message); + throw new TrinoException(GRAVITON_UNSUPPORTED_CATALOG_PROVIDER, message); + } + + // Avoid using the same builder object to prevent catalog creation errors. + builder = builder.clone(); + + try { + Connector internalConnector = + catalogInjector.createConnector(nameIdentifier.toString(), builder.buildConfig(catalog)); + + return builder + .withMetalake(metalake) + .withCatalogName(nameIdentifier) + .withInternalConnector(internalConnector) + .build(); + + } catch (Exception e) { + String message = + String.format("Failed to create internal catalog connector. The catalog is: %s", catalog); + LOG.error(message, e); + throw new TrinoException(GRAVITON_CREATE_INTERNAL_CONNECTOR_ERROR, message, e); + } } } diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorManager.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorManager.java index ac5b38374f..69800417d6 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorManager.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorManager.java @@ -17,7 +17,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.airlift.log.Logger; import io.trino.spi.TrinoException; import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +24,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.NotImplementedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class has the following main functions: @@ -37,7 +38,7 @@ * */ public class CatalogConnectorManager { - private static final Logger LOG = Logger.get(CatalogConnectorManager.class); + private static final Logger LOG = LoggerFactory.getLogger(CatalogConnectorManager.class); private static final int CATALOG_LOAD_FREQUENCY_SECOND = 30; private static final int NUMBER_EXECUTOR_THREAD = 1; @@ -68,7 +69,7 @@ private static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() { .setNameFormat("graviton-connector-schedule-%d") .setUncaughtExceptionHandler( (thread, throwable) -> - LOG.warn("%s uncaught exception:", thread.getName(), throwable)) + LOG.warn("{} uncaught exception:", thread.getName(), throwable)) .build()); } @@ -78,14 +79,13 @@ public void config(GravitonConfig config) { public void start() { gravitonClient = GravitonClient.builder(config.getURI()).build(); - String metalake = config.getMetalake(); if (Strings.isNullOrEmpty(metalake)) { throw new TrinoException(GRAVITON_METALAKE_NOT_EXISTS, "No graviton metalake selected"); } this.usedMetalake = metalake; - // schedule a task to load catalog from graviton server. + // Schedule a task to load catalog from graviton server. executorService.execute(this::loadMetalake); LOG.info("Graviton CatalogConnectorManager started."); } @@ -99,7 +99,7 @@ void loadMetalake() { LOG.warn("Metalake {} does not exist.", usedMetalake); return; } catch (Exception e) { - LOG.error("Load Metalake {} failed.", e); + LOG.error("Load Metalake {} failed.", usedMetalake, e); return; } @@ -107,7 +107,7 @@ void loadMetalake() { loadCatalogs(metalake); // TODO (yuhui) need to handle metalake dropped. } finally { - // load metalake for handling catalog in the metalake updates. + // Load metalake for handling catalog in the metalake updates. executorService.schedule(this::loadMetalake, CATALOG_LOAD_FREQUENCY_SECOND, TimeUnit.SECONDS); } } diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorMetadata.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorMetadata.java new file mode 100644 index 0000000000..f29c86019e --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorMetadata.java @@ -0,0 +1,105 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.catalog; + +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_CATALOG_NOT_EXISTS; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_SCHEMA_NOT_EXISTS; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_TABLE_NOT_EXISTS; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_UNSUPPORTED_OPERATION; + +import com.datastrato.graviton.Catalog; +import com.datastrato.graviton.NameIdentifier; +import com.datastrato.graviton.Namespace; +import com.datastrato.graviton.client.GravitonMetaLake; +import com.datastrato.graviton.exceptions.NoSuchCatalogException; +import com.datastrato.graviton.exceptions.NoSuchSchemaException; +import com.datastrato.graviton.exceptions.NoSuchTableException; +import com.datastrato.graviton.rel.Schema; +import com.datastrato.graviton.rel.SupportsSchemas; +import com.datastrato.graviton.rel.Table; +import com.datastrato.graviton.rel.TableCatalog; +import com.datastrato.graviton.trino.connector.metadata.GravitonSchema; +import com.datastrato.graviton.trino.connector.metadata.GravitonTable; +import io.trino.spi.TrinoException; +import java.util.Arrays; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** This class implements graviton metadata operators. */ +public class CatalogConnectorMetadata { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogConnectorMetadata.class); + + private final GravitonMetaLake metalake; + private final String catalogName; + private final SupportsSchemas schemaCatalog; + private final TableCatalog tableCatalog; + + public CatalogConnectorMetadata(GravitonMetaLake metalake, NameIdentifier catalogIdentifier) { + try { + this.catalogName = catalogIdentifier.name(); + this.metalake = metalake; + Catalog catalog = metalake.loadCatalog(catalogIdentifier); + + // Make sure the catalog support schema operations. + this.schemaCatalog = catalog.asSchemas(); + this.tableCatalog = catalog.asTableCatalog(); + } catch (NoSuchCatalogException e) { + throw new TrinoException(GRAVITON_CATALOG_NOT_EXISTS, "Catalog does not exist", e); + } catch (UnsupportedOperationException e) { + throw new TrinoException( + GRAVITON_UNSUPPORTED_OPERATION, "Catalog does not support schema or table operations", e); + } + } + + public List listSchemaNames() { + try { + return Arrays.stream( + schemaCatalog.listSchemas(Namespace.ofSchema(metalake.name(), catalogName))) + .map(NameIdentifier::name) + .toList(); + } catch (NoSuchCatalogException e) { + throw new TrinoException(GRAVITON_CATALOG_NOT_EXISTS, "Catalog does not exist", e); + } + } + + public GravitonSchema getSchema(String schemaName) { + try { + Schema schema = + schemaCatalog.loadSchema( + NameIdentifier.ofSchema(metalake.name(), catalogName, schemaName)); + return new GravitonSchema(schema); + } catch (NoSuchSchemaException e) { + throw new TrinoException(GRAVITON_SCHEMA_NOT_EXISTS, "Schema does not exist", e); + } + } + + public GravitonTable getTable(String schemaName, String tableName) { + try { + Table table = + tableCatalog.loadTable( + NameIdentifier.ofTable(metalake.name(), catalogName, schemaName, tableName)); + return new GravitonTable(schemaName, table); + } catch (NoSuchTableException e) { + throw new TrinoException(GRAVITON_TABLE_NOT_EXISTS, "Table does not exist", e); + } + } + + public List listTables(String schemaName) { + try { + NameIdentifier[] tables = + tableCatalog.listTables(Namespace.ofTable(metalake.name(), catalogName, schemaName)); + return Arrays.stream(tables).map(NameIdentifier::name).toList(); + } catch (NoSuchSchemaException e) { + throw new TrinoException(GRAVITON_SCHEMA_NOT_EXISTS, "Schema does not exist", e); + } + } + + public boolean tableExists(String schemaName, String tableName) { + return tableCatalog.tableExists( + NameIdentifier.ofTable(metalake.name(), catalogName, schemaName, tableName)); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorMetadataAdapter.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorMetadataAdapter.java new file mode 100644 index 0000000000..515c6c1824 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogConnectorMetadataAdapter.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.catalog; + +import com.datastrato.graviton.trino.connector.metadata.GravitonColumn; +import com.datastrato.graviton.trino.connector.metadata.GravitonSchema; +import com.datastrato.graviton.trino.connector.metadata.GravitonTable; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import java.util.Map; + +/** + * This interface is used to handle different parts of catalog metadata from different catalog + * connectors. + */ +public interface CatalogConnectorMetadataAdapter { + Map getSchemaProperties(GravitonSchema schema); + + /** Transform graviton table metadata to trino ConnectorTableMetadata */ + ConnectorTableMetadata getTableMetadata(GravitonTable gravitonTable); + + /** Transform trino ConnectorTableMetadata to graviton table metadata */ + GravitonTable createTable(ConnectorTableMetadata tableMetadata); + + /** Transform trino schema metadata to graviton schema metadata */ + GravitonSchema createSchema(String schemaName, Map properties); + + /** Transform graviton column metadata to trino ColumnMetadata */ + ColumnMetadata getColumnMetadata(GravitonColumn column); + + /** Transform graviton table properties to trino ConnectorTableProperties */ + ConnectorTableProperties getTableProperties(GravitonTable table); +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogInjector.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogInjector.java index 09d477e536..1049512f49 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogInjector.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/CatalogInjector.java @@ -10,7 +10,6 @@ import com.datastrato.graviton.trino.connector.GravitonErrorCode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import io.airlift.log.Logger; import io.trino.spi.TrinoException; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; @@ -20,6 +19,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class dynamically injects the Catalog managed by Graviton into Trino using reflection @@ -29,12 +30,15 @@ */ public class CatalogInjector { - private static final Logger LOG = Logger.get(CatalogInjector.class); + private static final Logger LOG = LoggerFactory.getLogger(CatalogInjector.class); private static final int MIN_TRINO_SPI_VERSION = 360; - private ConcurrentHashMap catalogs; - private Object catalogFactoryObject; + // It is used to inject catalogs to trino + private InjectCatalogHandle injectHandle; + + // It is used to create internal catalogs. + private CreateCatalogHandle createHandle; private String trinoVersion; private void checkTrinoSpiVersion(ConnectorContext context) { @@ -50,60 +54,122 @@ private void checkTrinoSpiVersion(ConnectorContext context) { } } - public void bindCatalogManager(ConnectorContext context) { - // injector trino catalog need NodeManager support allCatalogsOnAllNodes; - checkTrinoSpiVersion(context); - - // Try to get trino CatalogFactory instance, normally we can get the catalog from - // CatalogFactory, then add catalog to it that loaded from graviton. - - try { - // set NodeManager allCatalogsOnAllNodes = true; - Object nodeManager = context.getNodeManager(); - Field field = nodeManager.getClass().getDeclaredField("nodeManager"); - field.setAccessible(true); - nodeManager = field.get(nodeManager); + private static Field getField(Object targetObject, String fieldName) throws NoSuchFieldException { + Field field = targetObject.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field; + } - field = nodeManager.getClass().getDeclaredField("allCatalogsOnAllNodes"); - field.setAccessible(true); - field.setBoolean(nodeManager, true); - Preconditions.checkState( - field.getBoolean(nodeManager), "allCatalogsOnAllNodes should be true"); + private static Object getFiledObject(Object targetObject, String fieldName) + throws NoSuchFieldException, IllegalAccessException { + return getField(targetObject, fieldName).get(targetObject); + } - // find CatalogManager - field = nodeManager.getClass().getDeclaredField("activeNodesByCatalogHandle"); - field.setAccessible(true); - field.set(nodeManager, Optional.empty()); + private static boolean isClassObject(Object targetObject, String className) { + return targetObject.getClass().getName().endsWith(className); + } - MetadataProvider metadataProvider = context.getMetadataProvider(); + private static Class getClass(ClassLoader classLoader, String className) + throws ClassNotFoundException { + return classLoader.loadClass(className); + } - field = metadataProvider.getClass().getDeclaredField("metadata"); - field.setAccessible(true); - Object metadata = field.get(metadataProvider); + /** + * @param context + *
+   *  This function does the following tasks by ConnectorContext:
+   *  1. Retrieve the DiscoveryNodeManager object.
+   *  2. To enable Trino to handle tables on every node,
+   *  set 'allCatalogsOnAllNodes' to 'true' and 'activeNodesByCatalogHandle' to empty.
+   *  3. Retrieve the catalogManager object.
+   *  4. Get createCatalog function in catalogFactory
+   *  5. Create a CreateCatalogHandle for the Graviton connector's internal connector.
+   *  6. Create InjectCatalogHandle for injection catalogs to trino.
+   *
+   *  A runtime ConnectorContext hierarchy:
+   *  context (ConnectorContext)
+   *  --nodeManager (ConnectorAwareNodeManager)
+   *  ----nodeManager (DiscoveryNodeManager)
+   *  ------nodeManager (DiscoveryNodeManager)
+   *  ------allCatalogsOnAllNodes (boolean)
+   *  ------activeNodesByCatalogHandle (Optional)
+   *  --metadataProvider(InternalMetadataProvider)
+   *  ----metadata (TracingMetadata)
+   *  ------delegate (MetadataManager)
+   *  --------transactionManager (InMemoryTransactionManager)
+   *  ----------catalogManager (StaticCatalogManager)
+   *  ------------catalogFactory (LazyCatalogFactory)
+   *  --------------createCatalog() (Function)
+   *  ------------catalogs (ConcurrentHashMap)
+   * 
+ */ + public void init(ConnectorContext context) { + // Injector trino catalog need NodeManager support allCatalogsOnAllNodes; + checkTrinoSpiVersion(context); - field = metadata.getClass().getDeclaredField("delegate"); - field.setAccessible(true); - Object metadataManager = field.get(metadata); + try { + // 1. Retrieve the DiscoveryNodeManager object. + Object nodeManager = context.getNodeManager(); + nodeManager = getFiledObject(nodeManager, "nodeManager"); - field = metadataManager.getClass().getDeclaredField("transactionManager"); - field.setAccessible(true); - Object transactionManager = field.get(metadataManager); + if (isClassObject(nodeManager, "DiscoveryNodeManager")) { + // 2. To enable Trino to handle tables on every node + Field allCatalogsOnAllNodes = getField(nodeManager, "allCatalogsOnAllNodes"); + allCatalogsOnAllNodes.setBoolean(nodeManager, true); - field = transactionManager.getClass().getDeclaredField("catalogManager"); - field.setAccessible(true); - Object catalogManager = field.get(transactionManager); + Field activeNodesByCatalogHandle = getField(nodeManager, "activeNodesByCatalogHandle"); + activeNodesByCatalogHandle.set(nodeManager, Optional.empty()); + } - // find CatalogManager.catalogs - field = catalogManager.getClass().getDeclaredField("catalogs"); - field.setAccessible(true); - catalogs = (ConcurrentHashMap) field.get(catalogManager); - Preconditions.checkNotNull(catalogs, "catalogs should not be null"); + // 3. Retrieve the catalogManager object. + MetadataProvider metadataProvider = context.getMetadataProvider(); - // find catalog factory - field = catalogManager.getClass().getDeclaredField("catalogFactory"); - field.setAccessible(true); - catalogFactoryObject = field.get(catalogManager); - Preconditions.checkNotNull(catalogFactoryObject, "catalogFactoryObject should not be null"); + Object metadata = getFiledObject(metadataProvider, "metadata"); + Object metadataManager = metadata; + if (isClassObject(metadata, "TracingMetadata")) { + metadataManager = getFiledObject(metadata, "delegate"); + } + Preconditions.checkNotNull(metadataManager, "metadataManager should not be null"); + + Object transactionManager = getFiledObject(metadataManager, "transactionManager"); + Object catalogManager = getFiledObject(transactionManager, "catalogManager"); + Preconditions.checkNotNull(catalogManager, "catalogManager should not be null"); + + // 4. Get createCatalog function in catalogFactory + Object catalogFactory = getFiledObject(catalogManager, "catalogFactory"); + Preconditions.checkNotNull(catalogFactory, "catalogFactory should not be null"); + + Class catalogPropertiesClass = + getClass( + catalogManager.getClass().getClassLoader(), "io.trino.connector.CatalogProperties"); + Method createCatalogMethod = + catalogFactory.getClass().getDeclaredMethod("createCatalog", catalogPropertiesClass); + Preconditions.checkNotNull(createCatalogMethod, "createCatalogMethod should not be null"); + + // 5. Create a CreateCatalogHandle + createHandle = + (catalogName, catalogProperties) -> { + ObjectMapper objectMapper = new ObjectMapper(); + Object catalogPropertiesObject = + objectMapper.readValue(catalogProperties, catalogPropertiesClass); + + // Call catalogFactory.createCatalog() return CatalogConnector + Object catalogConnector = + createCatalogMethod.invoke(catalogFactory, catalogPropertiesObject); + + // The catalogConnector hierarchy: + // --catalogConnector (CatalogConnector) + // ----catalogConnector (ConnectorServices) + // ------connector (Connector) + + // Get a connector object from trino CatalogConnector. + Object catalogConnectorObject = getFiledObject(catalogConnector, "catalogConnector"); + return getFiledObject(catalogConnectorObject, "connector"); + }; + + // 6. Create InjectCatalogHandle + createInjectHandler( + catalogManager, catalogFactory, createCatalogMethod, catalogPropertiesClass); LOG.info("Bind Trino catalog manager successfully."); } catch (Exception e) { @@ -115,27 +181,66 @@ public void bindCatalogManager(ConnectorContext context) { } } + private void createInjectHandler( + Object catalogManager, + Object catalogFactory, + Method createCatalogMethod, + Class catalogPropertiesClass) + throws NoSuchFieldException, IllegalAccessException { + // The catalogManager is an instance of CoordinatorDynamicCatalogManager + if (isClassObject(catalogManager, "CoordinatorDynamicCatalogManager")) { + ConcurrentHashMap activeCatalogs = + (ConcurrentHashMap) getFiledObject(catalogManager, "activeCatalogs"); + Preconditions.checkNotNull(activeCatalogs, "activeCatalogs should not be null"); + + ConcurrentHashMap allCatalogs = + (ConcurrentHashMap) getFiledObject(catalogManager, "allCatalogs"); + Preconditions.checkNotNull(allCatalogs, "allCatalogs should not be null"); + + injectHandle = + (catalogName, catalogProperties) -> { + // Call CatalogFactory:createCatalog and add the catalog to + // CoordinatorDynamicCatalogManager + ObjectMapper objectMapper = new ObjectMapper(); + Object catalogPropertiesObject = + objectMapper.readValue(catalogProperties, catalogPropertiesClass); + Object catalogConnector = + createCatalogMethod.invoke(catalogFactory, catalogPropertiesObject); + + Field catelogField = catalogConnector.getClass().getDeclaredField("catalog"); + catelogField.setAccessible(true); + Object catalog = catelogField.get(catalogConnector); + activeCatalogs.put(catalogName, catalog); + + Field catelogHandleField = + catalogConnector.getClass().getDeclaredField("catalogHandle"); + catelogHandleField.setAccessible(true); + Object catalogHandle = catelogHandleField.get(catalogConnector); + allCatalogs.put(catalogHandle, catalogConnector); + }; + } else { + // The catalogManager is an instance of StaticCatalogManager + ConcurrentHashMap catalogs = (ConcurrentHashMap) getFiledObject(catalogManager, "catalogs"); + Preconditions.checkNotNull(catalogs, "catalogs should not be null"); + + injectHandle = + (catalogName, catalogProperties) -> { + // call CatalogFactory:createCatalog and add the catalog to StaticCatalogManager + ObjectMapper objectMapper = new ObjectMapper(); + Object catalogPropertiesObject = + objectMapper.readValue(catalogProperties, catalogPropertiesClass); + + Object catalogConnector = + createCatalogMethod.invoke(catalogFactory, catalogPropertiesObject); + catalogs.put(catalogName, catalogConnector); + }; + } + } + void injectCatalogConnector(String catalogName) { try { - Class catalogConnectorClass = - catalogFactoryObject - .getClass() - .getClassLoader() - .loadClass("io.trino.connector.CatalogProperties"); - String catalogProperties = createCatalogProperties(catalogName); - - ObjectMapper objectMapper = new ObjectMapper(); - Object catalogPropertiesObject = - objectMapper.readValue(catalogProperties, catalogConnectorClass); - - // call CatalogFactory:createCatalog - Method method = - catalogFactoryObject.getClass().getDeclaredMethod("createCatalog", catalogConnectorClass); - Object catalogConnector = method.invoke(catalogFactoryObject, catalogPropertiesObject); - - // put catalog to CatalogManager.catalogs - catalogs.put(catalogName, catalogConnector); + injectHandle.invoke(catalogName, catalogProperties); LOG.info("Inject trino catalog {} successfully.", catalogName); } catch (Exception e) { @@ -157,31 +262,15 @@ Connector createConnector(String connectorName, Map properties) try { ObjectMapper objectMapper = new ObjectMapper(); connectorProperties = objectMapper.writeValueAsString(properties); + LOG.debug( + "Create internal catalog connector {}. The config:{} .", + connectorName, + connectorProperties); - // call CatalogFactory:createCatalog - Class catalogConnectorClass = - catalogFactoryObject - .getClass() - .getClassLoader() - .loadClass("io.trino.connector.CatalogProperties"); - Method method = - catalogFactoryObject.getClass().getDeclaredMethod("createCatalog", catalogConnectorClass); - - Object catalogPropertyObject = - objectMapper.readValue(connectorProperties, catalogConnectorClass); - Object catalogConnector = method.invoke(catalogFactoryObject, catalogPropertyObject); - - // get a connector object from trino CatalogConnector. - Field field = catalogConnector.getClass().getDeclaredField("catalogConnector"); - field.setAccessible(true); - Object connectorService = field.get(catalogConnector); - - field = connectorService.getClass().getDeclaredField("connector"); - field.setAccessible(true); - Object connector = field.get(connectorService); + Object catalogConnector = createHandle.invoke(connectorName, connectorProperties); LOG.info("Create internal catalog connector {} successfully.", connectorName); - return (Connector) connector; + return (Connector) catalogConnector; } catch (Exception e) { LOG.error( "Create internal catalog connector {} failed. Connector properties: {} ", @@ -191,4 +280,12 @@ Connector createConnector(String connectorName, Map properties) throw new TrinoException(GRAVITON_CREATE_INNER_CONNECTOR_FAILED, e); } } + + interface InjectCatalogHandle { + void invoke(String name, String properties) throws Exception; + } + + interface CreateCatalogHandle { + Object invoke(String name, String properties) throws Exception; + } } diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveConnectorAdapter.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveConnectorAdapter.java new file mode 100644 index 0000000000..c5d61dbd16 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveConnectorAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.catalog.hive; + +import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorAdapter; +import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import com.datastrato.graviton.trino.connector.metadata.GravitonCatalog; +import io.trino.spi.session.PropertyMetadata; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Transforming Hive connector configuration and components into Graviton connector. */ +public class HiveConnectorAdapter implements CatalogConnectorAdapter { + + private final HiveTableProperties tableProperties = new HiveTableProperties(); + + public HiveConnectorAdapter() {} + + public Map buildInternalConnectorConfig(GravitonCatalog catalog) { + Map config = new HashMap<>(); + config.put("catalogHandle", catalog.getName() + ":normal:default"); + config.put("connectorName", "hive"); + + Map properties = new HashMap<>(); + properties.put("hive.metastore.uri", catalog.getProperties("hive.metastore.uris", "")); + config.put("properties", properties); + return config; + } + + @Override + public List> getTableProperties() { + return tableProperties.getTableProperties(); + } + + public CatalogConnectorMetadataAdapter getMetadataAdapter() { + // TODO yuhui Need to improve schema table and column properties + return new HiveMetadataAdapter(null, getTableProperties(), null); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveMetadataAdapter.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveMetadataAdapter.java new file mode 100644 index 0000000000..52a43e360d --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveMetadataAdapter.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.catalog.hive; + +import com.datastrato.graviton.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import com.datastrato.graviton.trino.connector.metadata.GravitonColumn; +import com.datastrato.graviton.trino.connector.metadata.GravitonSchema; +import com.datastrato.graviton.trino.connector.metadata.GravitonTable; +import com.datastrato.graviton.trino.connector.util.DataTypeTransformer; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.session.PropertyMetadata; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.lang3.NotImplementedException; + +/** Transforming graviton hive metadata to trino. */ +public class HiveMetadataAdapter implements CatalogConnectorMetadataAdapter { + private final List> schemaProperties; + private final List> tableProperties; + private final List> columnProperties; + + public HiveMetadataAdapter( + List> schemaProperties, + List> tableProperties, + List> columnProperties) { + this.schemaProperties = schemaProperties; + this.tableProperties = tableProperties; + this.columnProperties = columnProperties; + } + + public ConnectorTableMetadata getTableMetadata(GravitonTable gravitonTable) { + SchemaTableName schemaTableName = + new SchemaTableName(gravitonTable.getSchemaName(), gravitonTable.getName()); + ArrayList columnMetadataList = new ArrayList<>(); + for (GravitonColumn column : gravitonTable.getColumns()) { + columnMetadataList.add(getColumnMetadata(column)); + } + + Map properties = + gravitonTable.getProperties().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return new ConnectorTableMetadata( + schemaTableName, columnMetadataList, properties, Optional.of(gravitonTable.getComment())); + } + + @Override + public ColumnMetadata getColumnMetadata(GravitonColumn column) { + return new ColumnMetadata(column.getName(), DataTypeTransformer.getTrinoType(column.getType())); + } + + @Override + public ConnectorTableProperties getTableProperties(GravitonTable table) { + throw new NotImplementedException(); + } + + @Override + public Map getSchemaProperties(GravitonSchema schema) { + return normalizeProperties(schema.properties(), schemaProperties); + } + + @Override + public GravitonTable createTable(ConnectorTableMetadata tableMetadata) { + throw new NotImplementedException(); + } + + @Override + public GravitonSchema createSchema(String schemaName, Map properties) { + throw new NotImplementedException(); + } + + private Map normalizeProperties( + Map properties, List> propertyTemplate) { + // TODO yuhui redo this function on graviton table properties supported. + Map validProperties = new HashMap<>(); + for (PropertyMetadata propertyMetadata : propertyTemplate) { + String name = propertyMetadata.getName(); + if (properties.containsKey(name)) { + if (propertyMetadata.getJavaType() == String.class) + validProperties.put(name, properties.get(name)); + + if (name.equals("format")) validProperties.put(name, properties.get(name)); + } + } + return validProperties; + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveTableProperties.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveTableProperties.java new file mode 100644 index 0000000000..ac018e57b1 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/catalog/hive/HiveTableProperties.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.catalog.hive; + +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +import com.google.common.collect.ImmutableList; +import io.trino.spi.session.PropertyMetadata; +import java.util.List; + +public class HiveTableProperties { + + private final List> tableProperties; + + // TODO yuhui Need to add table properties + HiveTableProperties() { + tableProperties = + ImmutableList.of( + stringProperty("format", "Hive storage format for the table", "TEXTFILE", false)); + } + + public List> getTableProperties() { + return tableProperties; + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonCatalog.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonCatalog.java index 09a216e1f4..6f52a21b38 100644 --- a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonCatalog.java +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonCatalog.java @@ -6,6 +6,7 @@ import com.datastrato.graviton.Catalog; +/** Help Graviton connector access CatalogMetadata from graviton client. */ public class GravitonCatalog { private final Catalog catalog; diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonColumn.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonColumn.java new file mode 100644 index 0000000000..2cd8658acc --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonColumn.java @@ -0,0 +1,42 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.metadata; + +import com.datastrato.graviton.rel.Column; +import com.google.common.base.Preconditions; +import io.substrait.type.Type; +import java.util.Map; + +/** Help Graviton connector access ColumnMetadata from graviton client. */ +public final class GravitonColumn { + private final Column column; + private final int index; + + public GravitonColumn(Column column, int columnIndex) { + this.column = column; + this.index = columnIndex; + Preconditions.checkArgument(column != null, "column is not null"); + } + + public int getIndex() { + return index; + } + + public Map getProperties() { + return Map.of(); + } + + public String getName() { + return column.name(); + } + + public Type getType() { + return column.dataType(); + } + + public String getComment() { + return column.comment(); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonSchema.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonSchema.java new file mode 100644 index 0000000000..86e2ffb0b6 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonSchema.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.metadata; + +import com.datastrato.graviton.rel.Schema; +import java.util.Map; + +/** Help Graviton connector access SchemaMetadata from graviton client. */ +public class GravitonSchema { + + private final Schema schema; + + public GravitonSchema(Schema schema) { + this.schema = schema; + } + + public Map properties() { + return schema.properties(); + } + + public String name() { + return schema.name(); + } + + public String comment() { + return schema.comment(); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonTable.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonTable.java new file mode 100644 index 0000000000..4073d838be --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/metadata/GravitonTable.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.metadata; + +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_COLUMN_NOT_EXISTS; + +import com.datastrato.graviton.rel.Table; +import com.google.common.collect.ImmutableList; +import io.trino.spi.TrinoException; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** Help Graviton connector access TableMetadata from graviton client. */ +public class GravitonTable { + private final String schemaName; + private final Table tableMetadata; + private final ImmutableList columns; + private final Map properties; + + public GravitonTable(String schemaName, Table tableMetadata) { + this.schemaName = schemaName; + this.tableMetadata = tableMetadata; + + ImmutableList.Builder tableColumns = ImmutableList.builder(); + for (int i = 0; i < tableMetadata.columns().length; i++) { + tableColumns.add(new GravitonColumn(tableMetadata.columns()[i], i)); + } + this.columns = tableColumns.build(); + + properties = tableMetadata.properties(); + } + + public String getName() { + return tableMetadata.name(); + } + + public List getColumns() { + return columns; + } + + public Map getProperties() { + return properties; + } + + public GravitonColumn getColumn(String columName) { + Optional entry = + columns.stream().filter((column -> column.getName().equals(columName))).findFirst(); + if (entry.isEmpty()) { + throw new TrinoException( + GRAVITON_COLUMN_NOT_EXISTS, String.format("Column %s does not exist", columName)); + } + + return entry.get(); + } + + public String getSchemaName() { + return schemaName; + } + + public String getComment() { + return tableMetadata.comment(); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/util/DataTypeTransformer.java b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/util/DataTypeTransformer.java new file mode 100644 index 0000000000..30eafe2a61 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/graviton/trino/connector/util/DataTypeTransformer.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.util; + +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_UNSUPPORTED_GRAVITON_DATATYPE; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_UNSUPPORTED_TRINO_DATATYPE; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS; +import static io.trino.spi.type.TimestampType.createTimestampType; +import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; + +import io.substrait.type.TypeCreator; +import io.trino.spi.TrinoException; +import io.trino.spi.type.Type; + +/** This class is used to transform datatype between graviton and trino */ +public class DataTypeTransformer { + + public static Type getTrinoType(io.substrait.type.Type type) { + if (type.equals(TypeCreator.REQUIRED.STRING) || type.equals(TypeCreator.NULLABLE.STRING)) { + return createUnboundedVarcharType(); + } else if (type.equals(TypeCreator.REQUIRED.BOOLEAN) + || type.equals(TypeCreator.NULLABLE.BOOLEAN)) { + return BOOLEAN; + } else if (type.equals(TypeCreator.REQUIRED.I32) || type.equals(TypeCreator.NULLABLE.I32)) { + return INTEGER; + } else if (type.equals(TypeCreator.REQUIRED.I64) || type.equals(TypeCreator.NULLABLE.I64)) { + return BIGINT; + } else if (type.equals(TypeCreator.REQUIRED.FP64) || type.equals(TypeCreator.NULLABLE.FP64)) { + return DOUBLE; + } else if (type.equals(TypeCreator.REQUIRED.BINARY) + || type.equals(TypeCreator.NULLABLE.BINARY)) { + return VARBINARY; + } else if (type.equals(TypeCreator.REQUIRED.DATE) || type.equals(TypeCreator.NULLABLE.DATE)) { + return DATE; + } else if (type.equals(TypeCreator.REQUIRED.TIMESTAMP) + || type.equals(TypeCreator.NULLABLE.TIMESTAMP)) { + return createTimestampType(TIMESTAMP_SECONDS.getPrecision()); + } + throw new TrinoException( + GRAVITON_UNSUPPORTED_GRAVITON_DATATYPE, "Unsupported graviton datatype: " + type); + } + + public static io.substrait.type.Type getGravitonType(Type type, boolean nullable) { + if (type.equals(VARCHAR)) { + return nullable ? TypeCreator.NULLABLE.STRING : TypeCreator.REQUIRED.STRING; + } else if (type.equals(BOOLEAN)) { + return nullable ? TypeCreator.NULLABLE.BOOLEAN : TypeCreator.REQUIRED.BOOLEAN; + } else if (type.equals(INTEGER)) { + return nullable ? TypeCreator.NULLABLE.I32 : TypeCreator.REQUIRED.I32; + } else if (type.equals(BIGINT)) { + return nullable ? TypeCreator.NULLABLE.I64 : TypeCreator.REQUIRED.I64; + } else if (type.equals(DOUBLE)) { + return nullable ? TypeCreator.NULLABLE.FP64 : TypeCreator.REQUIRED.FP64; + } else if (type.equals(VARBINARY)) { + return nullable ? TypeCreator.NULLABLE.BINARY : TypeCreator.REQUIRED.BINARY; + } else if (type.equals(DATE)) { + return nullable ? TypeCreator.NULLABLE.DATE : TypeCreator.REQUIRED.DATE; + } else if (type.equals(TIMESTAMP_SECONDS)) { + return nullable ? TypeCreator.NULLABLE.TIMESTAMP : TypeCreator.REQUIRED.TIMESTAMP; + } + throw new TrinoException( + GRAVITON_UNSUPPORTED_TRINO_DATATYPE, "Unsupported trino datatype: " + type); + } +} diff --git a/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/TestGravitonConnector.java b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/TestGravitonConnector.java new file mode 100644 index 0000000000..fa8f4ba1d5 --- /dev/null +++ b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/TestGravitonConnector.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector; + +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; + +import io.trino.Session; +import io.trino.testing.BaseConnectorTest; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import java.util.HashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestGravitonConnector extends BaseConnectorTest { + + private static final Logger LOG = LoggerFactory.getLogger(TestGravitonConnector.class); + + @Override + protected QueryRunner createQueryRunner() throws Exception { + Session session = testSessionBuilder().setCatalog("graviton").build(); + + QueryRunner queryRunner = null; + try { + // queryRunner = LocalQueryRunner.builder(session).build(); + queryRunner = DistributedQueryRunner.builder(session).setNodeCount(1).build(); + + queryRunner.installPlugin(new GravitonPlugin()); + + HashMap properties = new HashMap<>(); + properties.put("graviton.metalake", "test"); + queryRunner.createCatalog("graviton", "graviton", properties); + } catch (Exception e) { + throw new RuntimeException(e); + } + return queryRunner; + } + + @Override + public void testCreateSchema() { + assertThat(computeActual("SHOW catalogs").getOnlyColumnAsSet()).contains("graviton"); + } +} diff --git a/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/TestGravitonTableHandle.java b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/TestGravitonTableHandle.java new file mode 100644 index 0000000000..b025a2daab --- /dev/null +++ b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/TestGravitonTableHandle.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.json.JsonCodec; +import io.trino.spi.connector.ConnectorTableHandle; +import org.junit.jupiter.api.Test; + +public class TestGravitonTableHandle { + private final JsonCodec codec = + JsonCodec.jsonCodec(GravitonTableHandle.class); + + @Test + public void testCreateFromJson() { + GravitonTableHandle expected = + new GravitonTableHandle("db1", "t1", new MockConnectorTableHandle("mock")); + + codec.toJson(expected); + } + + public static class MockConnectorTableHandle implements ConnectorTableHandle { + + private final String name; + + @JsonCreator + public MockConnectorTableHandle(@JsonProperty("name") String name) { + this.name = name; + } + + @JsonProperty + public String getName() { + return name; + } + } +} diff --git a/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonColumn.java b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonColumn.java new file mode 100644 index 0000000000..2b5a65308b --- /dev/null +++ b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonColumn.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.metadata; + +import com.datastrato.graviton.dto.rel.ColumnDTO; +import io.substrait.type.TypeCreator; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestGravitonColumn { + + @Test + void testGravitonColumn() { + ColumnDTO columnDTO = + new ColumnDTO.Builder() + .withName("f1") + .withComment("test column") + .withDataType(TypeCreator.NULLABLE.STRING) + .build(); + + GravitonColumn column = new GravitonColumn(columnDTO, 0); + + Assertions.assertEquals(column.getName(), columnDTO.name()); + Assertions.assertEquals(column.getIndex(), 0); + Assertions.assertEquals(column.getComment(), columnDTO.comment()); + Assertions.assertEquals(column.getType(), columnDTO.dataType()); + } +} diff --git a/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonSchema.java b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonSchema.java new file mode 100644 index 0000000000..bd970e1558 --- /dev/null +++ b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonSchema.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.metadata; + +import com.datastrato.graviton.dto.AuditDTO; +import com.datastrato.graviton.dto.rel.SchemaDTO; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestGravitonSchema { + + @Test + void testGravitonSchema() { + Map properties = new HashMap<>(); + properties.put("prop1", "test prop1"); + + SchemaDTO schemaDTO = + new SchemaDTO.Builder() + .withName("db1") + .withComment("test schema") + .withProperties(properties) + .withAudit( + new AuditDTO.Builder().withCreator("creator").withCreateTime(Instant.now()).build()) + .build(); + + GravitonSchema schema = new GravitonSchema(schemaDTO); + + Assertions.assertEquals(schema.name(), schemaDTO.name()); + Assertions.assertEquals(schema.comment(), schemaDTO.comment()); + Assertions.assertEquals(schema.properties(), schemaDTO.properties()); + } +} diff --git a/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonTable.java b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonTable.java new file mode 100644 index 0000000000..c61c43b290 --- /dev/null +++ b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/metadata/TestGravitonTable.java @@ -0,0 +1,57 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.metadata; + +import com.datastrato.graviton.dto.AuditDTO; +import com.datastrato.graviton.dto.rel.ColumnDTO; +import com.datastrato.graviton.dto.rel.TableDTO; +import io.substrait.type.TypeCreator; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestGravitonTable { + + @Test + void testGravitonTable() { + ColumnDTO[] columns = + new ColumnDTO[] { + new ColumnDTO.Builder() + .withName("f1") + .withDataType(TypeCreator.NULLABLE.STRING) + .withComment("f1 column") + .build(), + new ColumnDTO.Builder() + .withName("f2") + .withDataType(TypeCreator.NULLABLE.I32) + .withComment("f2 column") + .build() + }; + Map properties = new HashMap<>(); + properties.put("format", "TEXTFILE"); + TableDTO tableDTO = + new TableDTO.Builder() + .withName("table1") + .withColumns(columns) + .withComment("test table") + .withProperties(properties) + .withAudit( + new AuditDTO.Builder().withCreator("creator").withCreateTime(Instant.now()).build()) + .build(); + + GravitonTable table = new GravitonTable("db1", tableDTO); + + Assertions.assertEquals(table.getName(), tableDTO.name()); + Assertions.assertEquals(table.getSchemaName(), "db1"); + Assertions.assertEquals(table.getColumns().size(), tableDTO.columns().length); + for (int i = 0; i < table.getColumns().size(); i++) { + Assertions.assertEquals(table.getColumns().get(i).getName(), tableDTO.columns()[i].name()); + } + Assertions.assertEquals(table.getComment(), tableDTO.comment()); + Assertions.assertEquals(table.getProperties(), tableDTO.properties()); + } +} diff --git a/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/util/TestDataTypeTransformer.java b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/util/TestDataTypeTransformer.java new file mode 100644 index 0000000000..29e7ec2e11 --- /dev/null +++ b/trino-connector/src/test/java/com/datastrato/graviton/trino/connector/util/TestDataTypeTransformer.java @@ -0,0 +1,117 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.trino.connector.util; + +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_UNSUPPORTED_GRAVITON_DATATYPE; +import static com.datastrato.graviton.trino.connector.GravitonErrorCode.GRAVITON_UNSUPPORTED_TRINO_DATATYPE; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.HyperLogLogType.HYPER_LOG_LOG; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS; +import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.VARCHAR; + +import io.substrait.type.TypeCreator; +import io.trino.spi.TrinoException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestDataTypeTransformer { + + @Test + public void testGetGravitonType() { + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(VARCHAR, true), TypeCreator.NULLABLE.STRING); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(VARCHAR, false), TypeCreator.REQUIRED.STRING); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(BOOLEAN, true), TypeCreator.NULLABLE.BOOLEAN); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(BOOLEAN, false), TypeCreator.REQUIRED.BOOLEAN); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(INTEGER, true), TypeCreator.NULLABLE.I32); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(INTEGER, false), TypeCreator.REQUIRED.I32); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(BIGINT, true), TypeCreator.NULLABLE.I64); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(BIGINT, false), TypeCreator.REQUIRED.I64); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(DOUBLE, true), TypeCreator.NULLABLE.FP64); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(DOUBLE, false), TypeCreator.REQUIRED.FP64); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(VARBINARY, true), TypeCreator.NULLABLE.BINARY); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(VARBINARY, false), TypeCreator.REQUIRED.BINARY); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(DATE, true), TypeCreator.NULLABLE.DATE); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(DATE, false), TypeCreator.REQUIRED.DATE); + + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(TIMESTAMP_SECONDS, true), + TypeCreator.NULLABLE.TIMESTAMP); + Assertions.assertEquals( + DataTypeTransformer.getGravitonType(TIMESTAMP_SECONDS, false), + TypeCreator.REQUIRED.TIMESTAMP); + + try { + DataTypeTransformer.getGravitonType(HYPER_LOG_LOG, true); + } catch (TrinoException e) { + if (e.getErrorCode() != GRAVITON_UNSUPPORTED_TRINO_DATATYPE.toErrorCode()) { + throw e; + } + } + } + + @Test + public void testGetTrinoType() { + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.STRING), VARCHAR); + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.STRING), VARCHAR); + + Assertions.assertEquals( + DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.BOOLEAN), BOOLEAN); + Assertions.assertEquals( + DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.BOOLEAN), BOOLEAN); + + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.I32), INTEGER); + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.I32), INTEGER); + + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.I64), BIGINT); + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.I64), BIGINT); + + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.I64), BIGINT); + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.I64), BIGINT); + + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.FP64), DOUBLE); + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.FP64), DOUBLE); + + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.DATE), DATE); + Assertions.assertEquals(DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.DATE), DATE); + + Assertions.assertEquals( + DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.TIMESTAMP), TIMESTAMP_SECONDS); + Assertions.assertEquals( + DataTypeTransformer.getTrinoType(TypeCreator.REQUIRED.TIMESTAMP), TIMESTAMP_SECONDS); + + try { + DataTypeTransformer.getTrinoType(TypeCreator.NULLABLE.BINARY); + } catch (TrinoException e) { + if (e.getErrorCode() != GRAVITON_UNSUPPORTED_GRAVITON_DATATYPE.toErrorCode()) { + throw e; + } + } + } +}