From ade86c0600768e8022fedf5a1c93bb073c15ed5d Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Wed, 9 Oct 2024 20:51:58 +0800 Subject: [PATCH] Revert "[branch-2.1][improvement](jdbc catalog) Optimize JdbcCatalog case mapping stability" (#41588) Reverts apache/doris#41330 --- .../doris/datasource/ExternalCatalog.java | 18 - .../doris/datasource/ExternalDatabase.java | 7 - .../datasource/jdbc/JdbcExternalCatalog.java | 56 +--- .../datasource/jdbc/JdbcExternalTable.java | 25 +- .../jdbc/JdbcIdentifierMapping.java | 45 +++ .../datasource/jdbc/client/JdbcClient.java | 46 ++- .../jdbc/client/JdbcMySQLClient.java | 4 +- .../jdbc/client/JdbcOracleClient.java | 4 +- .../mapping/DefaultIdentifierMapping.java | 268 --------------- .../datasource/mapping/IdentifierMapping.java | 307 +++++++++++++++++- 10 files changed, 409 insertions(+), 371 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 1b3cd9c33a1bc9..f6e5a570cc9cc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -45,7 +45,6 @@ import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase; import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; -import org.apache.doris.datasource.mapping.IdentifierMapping; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; import org.apache.doris.datasource.metacache.MetaCache; import org.apache.doris.datasource.operations.ExternalMetadataOps; @@ -143,9 +142,6 @@ public abstract class ExternalCatalog protected Optional useMetaCache = Optional.empty(); protected MetaCache> metaCache; - protected IdentifierMapping identifierMapping; - private boolean mappingsInitialized = false; - public ExternalCatalog() { } @@ -178,10 +174,6 @@ protected List listDatabaseNames() { } } - // only for forward to master - protected void buildDatabaseMapping() { - } - // Will be called when creating catalog(so when as replaying) // to add some default properties if missing. public void setDefaultPropsIfMissing(boolean isReplay) { @@ -210,10 +202,6 @@ public void checkWhenCreating() throws DdlException { */ public abstract List listTableNames(SessionContext ctx, String dbName); - // only for forward to master - protected void buildTableMapping(SessionContext ctx, String dbName) { - } - /** * check if the specified table exist. * @@ -278,10 +266,6 @@ public final synchronized void makeSureInitialized() { } initialized = true; } - if (!mappingsInitialized) { - buildDatabaseMapping(); - mappingsInitialized = true; - } } protected final void initLocalObjects() { @@ -407,7 +391,6 @@ private List getFilteredDatabaseNames() { public void onRefresh(boolean invalidCache) { this.objectCreated = false; this.initialized = false; - this.mappingsInitialized = false; synchronized (this.propLock) { this.convertedProperties = null; } @@ -733,7 +716,6 @@ public void gsonPostProcess() throws IOException { } this.propLock = new byte[0]; this.initialized = false; - this.mappingsInitialized = false; setDefaultPropsIfMissing(true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index cf65a5f0a48de6..d653a5a178e484 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -91,8 +91,6 @@ public abstract class ExternalDatabase private MetaCache metaCache; - private boolean mappingsInitialized = false; - /** * Create external database. * @@ -119,7 +117,6 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) { public void setUnInitialized(boolean invalidCache) { this.initialized = false; - this.mappingsInitialized = false; this.invalidCacheInInit = invalidCache; if (extCatalog.getUseMetaCache().isPresent()) { if (extCatalog.getUseMetaCache().get() && metaCache != null) { @@ -173,10 +170,6 @@ public final synchronized void makeSureInitialized() { } initialized = true; } - if (!mappingsInitialized) { - extCatalog.buildTableMapping(null, name); - mappingsInitialized = true; - } } public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index d25f97e70bd568..80cc0f554f637e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -31,7 +31,6 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.jdbc.client.JdbcClientException; -import org.apache.doris.datasource.mapping.DefaultIdentifierMapping; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; @@ -119,16 +118,19 @@ public void onRefresh(boolean invalidCache) { super.onRefresh(invalidCache); if (jdbcClient != null) { jdbcClient.closeClient(); - jdbcClient = null; } } + @Override + public void onRefreshCache(boolean invalidCache) { + onRefresh(invalidCache); + } + @Override public void onClose() { super.onClose(); if (jdbcClient != null) { jdbcClient.closeClient(); - jdbcClient = null; } } @@ -229,6 +231,8 @@ protected void initLocalObjectsImpl() { .setDriverUrl(getDriverUrl()) .setDriverClass(getDriverClass()) .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase()) + .setIsLowerCaseMetaNames(getLowerCaseMetaNames()) + .setMetaNamesMapping(getMetaNamesMapping()) .setIncludeDatabaseMap(getIncludeDatabaseMap()) .setExcludeDatabaseMap(getExcludeDatabaseMap()) .setConnectionPoolMinSize(getConnectionPoolMinSize()) @@ -238,62 +242,22 @@ protected void initLocalObjectsImpl() { .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); - identifierMapping = new DefaultIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()), - getMetaNamesMapping()); } - @Override protected List listDatabaseNames() { - return identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList()); - } - - @Override - protected void buildDatabaseMapping() { - identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList()); - } - - protected String getRemoteDatabaseName(String dbName) { - return identifierMapping.toRemoteDatabaseName(dbName); + return jdbcClient.getDatabaseNameList(); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - String remoteDbName = getRemoteDatabaseName(dbName); - return identifierMapping.fromRemoteTableName(remoteDbName, jdbcClient.getTablesNameList(remoteDbName)); - } - - @Override - protected void buildTableMapping(SessionContext ctx, String dbName) { - String remoteDbName = getRemoteDatabaseName(dbName); - identifierMapping.fromRemoteTableName(getRemoteDatabaseName(dbName), - jdbcClient.getTablesNameList(remoteDbName)); - } - - protected String getRemoteTableName(String dbName, String tblName) { - return identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName); + return jdbcClient.getTablesNameList(dbName); } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - String remoteDbName = getRemoteDatabaseName(dbName); - String remoteTblName = getRemoteTableName(dbName, tblName); - return jdbcClient.isTableExist(remoteDbName, remoteTblName); - } - - public List listColumns(String dbName, String tblName) { - makeSureInitialized(); - String remoteDbName = getRemoteDatabaseName(dbName); - String remoteTblName = getRemoteTableName(dbName, tblName); - return identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName, - jdbcClient.getColumnsFromJdbc(remoteDbName, - remoteTblName)); - } - - protected Map getRemoteColumnNames(String dbName, String tblName) { - return identifierMapping.toRemoteColumnNames(getRemoteDatabaseName(dbName), - getRemoteTableName(dbName, tblName)); + return jdbcClient.isTableExist(dbName, tblName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index a3af7f5b820e05..07ce183a589671 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -32,7 +32,6 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; -import com.google.common.collect.Maps; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -87,29 +86,21 @@ public TTableDescriptor toThrift() { @Override public Optional initSchema() { - return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).listColumns(dbName, name))); + return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient() + .getColumnsFromJdbc(dbName, name))); } private JdbcTable toJdbcTable() { List schema = getFullSchema(); JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog; - String fullTableName = this.dbName + "." + this.name; - JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE); - jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName); + String fullDbName = this.dbName + "." + this.name; + JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE); + jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName); // Set remote properties - jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName)); - jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, this.name)); - Map remoteColumnNames = jdbcCatalog.getRemoteColumnNames(this.dbName, this.name); - if (!remoteColumnNames.isEmpty()) { - jdbcTable.setRemoteColumnNames(remoteColumnNames); - } else { - remoteColumnNames = Maps.newHashMap(); - for (Column column : schema) { - remoteColumnNames.put(column.getName(), column.getName()); - } - jdbcTable.setRemoteColumnNames(remoteColumnNames); - } + jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName)); + jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name)); + jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name)); return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java new file mode 100644 index 00000000000000..20a74724b3e496 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.jdbc; + +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.datasource.mapping.IdentifierMapping; + +public class JdbcIdentifierMapping extends IdentifierMapping { + private final JdbcClient jdbcClient; + + public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping, JdbcClient jdbcClient) { + super(isLowerCaseMetaNames, metaNamesMapping); + this.jdbcClient = jdbcClient; + } + + @Override + protected void loadDatabaseNames() { + jdbcClient.getDatabaseNameList(); + } + + @Override + protected void loadTableNames(String localDbName) { + jdbcClient.getTablesNameList(localDbName); + } + + @Override + protected void loadColumnNames(String localDbName, String localTableName) { + jdbcClient.getColumnsFromJdbc(localDbName, localTableName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 2b82c074809e8c..0e57f989df34cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping; import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema; import com.google.common.collect.ImmutableSet; @@ -61,8 +62,11 @@ public abstract class JdbcClient { protected ClassLoader classLoader = null; protected HikariDataSource dataSource = null; protected boolean isOnlySpecifiedDatabase; + protected boolean isLowerCaseMetaNames; + protected String metaNamesMapping; protected Map includeDatabaseMap; protected Map excludeDatabaseMap; + protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching; public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) { String dbType = parseDbType(jdbcClientConfig.getJdbcUrl()); @@ -97,6 +101,8 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.catalogName = jdbcClientConfig.getCatalog(); this.jdbcUser = jdbcClientConfig.getUser(); this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase()); + this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames()); + this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping(); this.includeDatabaseMap = Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap()); this.excludeDatabaseMap = @@ -105,6 +111,7 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.dbType = parseDbType(jdbcUrl); initializeClassLoader(jdbcClientConfig); initializeDataSource(jdbcClientConfig); + this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this); } // Initialize DataSource @@ -287,9 +294,10 @@ public List getDatabaseNameList() { /** * get all tables of one database */ - public List getTablesNameList(String remoteDbName) { + public List getTablesNameList(String localDbName) { List remoteTablesNames = Lists.newArrayList(); String[] tableTypes = getTableTypes(); + String remoteDbName = getRemoteDatabaseName(localDbName); processTable(remoteDbName, null, tableTypes, (rs) -> { try { while (rs.next()) { @@ -299,12 +307,14 @@ public List getTablesNameList(String remoteDbName) { throw new JdbcClientException("failed to get all tables for remote database: `%s`", e, remoteDbName); } }); - return remoteTablesNames; + return filterTableNames(remoteDbName, remoteTablesNames); } - public boolean isTableExist(String remoteDbName, String remoteTableName) { + public boolean isTableExist(String localDbName, String localTableName) { final boolean[] isExist = {false}; String[] tableTypes = getTableTypes(); + String remoteDbName = getRemoteDatabaseName(localDbName); + String remoteTableName = getRemoteTableName(localDbName, localTableName); processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> { try { if (rs.next()) { @@ -321,10 +331,12 @@ public boolean isTableExist(String remoteDbName, String remoteTableName) { /** * get all columns of one table */ - public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { + public List getJdbcColumnsInfo(String localDbName, String localTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); + String remoteDbName = getRemoteDatabaseName(localDbName); + String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); @@ -350,7 +362,21 @@ public List getColumnsFromJdbc(String localDbName, String localTableName field.isAllowNull(), field.getRemarks(), true, -1)); } - return dorisTableSchema; + String remoteDbName = getRemoteDatabaseName(localDbName); + String remoteTableName = getRemoteTableName(localDbName, localTableName); + return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema); + } + + public String getRemoteDatabaseName(String localDbname) { + return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname); + } + + public String getRemoteTableName(String localDbName, String localTableName) { + return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName); + } + + public Map getRemoteColumnNames(String localDbName, String localTableName) { + return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName); } // protected methods,for subclass to override @@ -408,7 +434,7 @@ protected List filterDatabaseNames(List remoteDbNames) { } filteredDatabaseNames.add(databaseName); } - return filteredDatabaseNames; + return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames); } protected Set getFilterInternalDatabases() { @@ -419,6 +445,14 @@ protected Set getFilterInternalDatabases() { .build(); } + protected List filterTableNames(String remoteDbName, List remoteTableNames) { + return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, remoteTableNames); + } + + protected List filterColumnName(String remoteDbName, String remoteTableName, List remoteColumns) { + return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns); + } + protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema); protected Type createDecimalOrStringType(int precision, int scale) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java index 3baa2ce9d911df..5624392de14c39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java @@ -129,10 +129,12 @@ protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String c * get all columns of one table */ @Override - public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { + public List getJdbcColumnsInfo(String localDbName, String localTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); + String remoteDbName = getRemoteDatabaseName(localDbName); + String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index dc367e8ea6e6b6..d37b36cbf3de15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -49,10 +49,12 @@ public String getTestQuery() { } @Override - public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { + public List getJdbcColumnsInfo(String localDbName, String localTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); + String remoteDbName = getRemoteDatabaseName(localDbName); + String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java deleted file mode 100644 index 4847cd86e6d79c..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java +++ /dev/null @@ -1,268 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.mapping; - -import org.apache.doris.catalog.Column; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public class DefaultIdentifierMapping implements IdentifierMapping { - private static final Logger LOG = LogManager.getLogger(DefaultIdentifierMapping.class); - - private final ObjectMapper mapper = new ObjectMapper(); - private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> localTableToRemoteTable - = new ConcurrentHashMap<>(); - private final ConcurrentHashMap>> - localColumnToRemoteColumn = new ConcurrentHashMap<>(); - - private final boolean isLowerCaseMetaNames; - private final String metaNamesMapping; - - public DefaultIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { - this.isLowerCaseMetaNames = isLowerCaseMetaNames; - this.metaNamesMapping = metaNamesMapping; - } - - private boolean isMappingInvalid() { - return metaNamesMapping == null || metaNamesMapping.isEmpty(); - } - - @Override - public List fromRemoteDatabaseName(List remoteDatabaseNames) { - // If mapping is not required, return the original input - if (!isLowerCaseMetaNames && isMappingInvalid()) { - return remoteDatabaseNames; - } - JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); - - Map databaseNameMapping = Maps.newTreeMap(); - if (databasesNode.isArray()) { - for (JsonNode node : databasesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String mapping = node.path("mapping").asText(); - databaseNameMapping.put(remoteDatabase, mapping); - } - } - - Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, - databaseNameMapping, isLowerCaseMetaNames); - List localDatabaseNames = result.get("localNames"); - List conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the names."); - } - return localDatabaseNames; - } - - @Override - public List fromRemoteTableName(String remoteDbName, List remoteTableNames) { - // If mapping is not required, return the original input - if (!isLowerCaseMetaNames && isMappingInvalid()) { - return remoteTableNames; - } - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); - - Map tableNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - if (remoteDbName.equals(remoteDatabase)) { - String remoteTable = node.path("remoteTable").asText(); - String mapping = node.path("mapping").asText(); - tableNameMapping.put(remoteTable, mapping); - } - } - } - - localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, isLowerCaseMetaNames); - List localTableNames = result.get("localNames"); - List conflictNames = result.get("conflictNames"); - - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + " when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the table names."); - } - return localTableNames; - } - - @Override - public List fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, - List remoteColumns) { - // If mapping is not required, return the original input - if (!isLowerCaseMetaNames && isMappingInvalid()) { - return remoteColumns; - } - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); - - Map columnNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String remoteTable = node.path("remoteTable").asText(); - if (remoteDatabaseName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { - String remoteColumn = node.path("remoteColumn").asText(); - String mapping = node.path("mapping").asText(); - columnNameMapping.put(remoteColumn, mapping); - } - } - } - localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>()); - localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); - - List remoteColumnNames = Lists.newArrayList(); - for (Column remoteColumn : remoteColumns) { - remoteColumnNames.add(remoteColumn.getName()); - } - - Map> result = nameListToMapping(remoteColumnNames, - localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName), - columnNameMapping, isLowerCaseMetaNames); - List localColumnNames = result.get("localNames"); - List conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict column names found in remote database/schema: " + remoteDatabaseName - + " in remote table: " + remoteTableName - + " when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the column names."); - } - for (int i = 0; i < remoteColumns.size(); i++) { - remoteColumns.get(i).setName(localColumnNames.get(i)); - } - return remoteColumns; - } - - @Override - public String toRemoteDatabaseName(String localDatabaseName) { - // If mapping is not required, return the original input - if (!isLowerCaseMetaNames && isMappingInvalid()) { - return localDatabaseName; - } - return getRequiredMapping(localDBToRemoteDB, localDatabaseName, "database", localDatabaseName); - } - - @Override - public String toRemoteTableName(String remoteDatabaseName, String localTableName) { - // If mapping is not required, return the original input - if (!isLowerCaseMetaNames && isMappingInvalid()) { - return localTableName; - } - Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDatabaseName, - k -> new ConcurrentHashMap<>()); - return getRequiredMapping(tableMap, localTableName, "table", localTableName); - } - - @Override - public Map toRemoteColumnNames(String remoteDatabaseName, String remoteTableName) { - // If mapping is not required, return an empty map (since there's no mapping) - if (!isLowerCaseMetaNames && isMappingInvalid()) { - return Collections.emptyMap(); - } - ConcurrentHashMap> tableColumnMap - = localColumnToRemoteColumn.computeIfAbsent(remoteDatabaseName, k -> new ConcurrentHashMap<>()); - Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); - if (columnMap.isEmpty()) { - LOG.warn("No remote column found for: {}. Please refresh this catalog.", remoteTableName); - throw new RuntimeException( - "No remote column found for: " + remoteTableName + ". Please refresh this catalog."); - } - return columnMap; - } - - private V getRequiredMapping(Map map, K key, String typeName, String entityName) { - V value = map.get(key); - if (value == null) { - LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); - throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName - + ". Please refresh this catalog."); - } - return value; - } - - private JsonNode readAndParseJson(String jsonPath, String nodeName) { - JsonNode rootNode; - try { - rootNode = mapper.readTree(jsonPath); - return rootNode.path(nodeName); - } catch (JsonProcessingException e) { - throw new RuntimeException("parse meta_names_mapping property error", e); - } - } - - private Map> nameListToMapping(List remoteNames, - ConcurrentHashMap localNameToRemoteName, - Map nameMapping, boolean isLowerCaseMetaNames) { - List filteredDatabaseNames = Lists.newArrayList(); - Set lowerCaseNames = Sets.newHashSet(); - Map> nameMap = Maps.newHashMap(); - List conflictNames = Lists.newArrayList(); - - for (String name : remoteNames) { - String mappedName = nameMapping.getOrDefault(name, name); - String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; - - localNameToRemoteName.computeIfAbsent(localName, k -> name); - - if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { - if (nameMap.containsKey(localName)) { - nameMap.get(localName).add(mappedName); - } - } else { - nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); - } - - filteredDatabaseNames.add(localName); - } - - for (List conflictNameList : nameMap.values()) { - if (conflictNameList.size() > 1) { - conflictNames.addAll(conflictNameList); - } - } - - Map> result = Maps.newConcurrentMap(); - result.put("localNames", filteredDatabaseNames); - result.put("conflictNames", conflictNames); - return result; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java index 7745a25d27da47..363ef351152a39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java @@ -18,20 +18,313 @@ package org.apache.doris.datasource.mapping; import org.apache.doris.catalog.Column; +import org.apache.doris.qe.GlobalVariable; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class IdentifierMapping { + private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class); + + private final ObjectMapper mapper = new ObjectMapper(); + private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> localTableToRemoteTable + = new ConcurrentHashMap<>(); + private final ConcurrentHashMap>> + localColumnToRemoteColumn = new ConcurrentHashMap<>(); + + private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false); + private final ConcurrentHashMap tableNamesLoadedMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> columnNamesLoadedMap + = new ConcurrentHashMap<>(); + + private final boolean isLowerCaseMetaNames; + private final String metaNamesMapping; + + public IdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { + this.isLowerCaseMetaNames = isLowerCaseMetaNames; + this.metaNamesMapping = metaNamesMapping; + } + + public List setDatabaseNameMapping(List remoteDatabaseNames) { + JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); + + Map databaseNameMapping = Maps.newTreeMap(); + if (databasesNode.isArray()) { + for (JsonNode node : databasesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String mapping = node.path("mapping").asText(); + databaseNameMapping.put(remoteDatabase, mapping); + } + } + + Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, + databaseNameMapping, isLowerCaseMetaNames); + List localDatabaseNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the names."); + } + return localDatabaseNames; + } + + public List setTableNameMapping(String remoteDbName, List remoteTableNames) { + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); + + Map tableNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + if (remoteDbName.equals(remoteDatabase)) { + String remoteTable = node.path("remoteTable").asText(); + String mapping = node.path("mapping").asText(); + tableNameMapping.put(remoteTable, mapping); + } + } + } + + localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); + + List localTableNames; + List conflictNames; + + if (GlobalVariable.lowerCaseTableNames == 1) { + Map> result = nameListToMapping(remoteTableNames, + localTableToRemoteTable.get(remoteDbName), + tableNameMapping, true); + localTableNames = result.get("localNames"); + conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict table names found in remote database/schema: " + remoteDbName + + " when lower_case_table_names is 1: " + conflictNames + + ". Please use meta_name_mapping to specify the names."); + } + } else { + Map> result = nameListToMapping(remoteTableNames, + localTableToRemoteTable.get(remoteDbName), + tableNameMapping, isLowerCaseMetaNames); + localTableNames = result.get("localNames"); + conflictNames = result.get("conflictNames"); + + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict table names found in remote database/schema: " + remoteDbName + + "when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the table names."); + } + } + return localTableNames; + } + + public List setColumnNameMapping(String remoteDbName, String remoteTableName, List remoteColumns) { + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); + + Map columnNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String remoteTable = node.path("remoteTable").asText(); + if (remoteDbName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { + String remoteColumn = node.path("remoteColumn").asText(); + String mapping = node.path("mapping").asText(); + columnNameMapping.put(remoteColumn, mapping); + } + } + } + localColumnToRemoteColumn.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); + localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); + + List localColumnNames; + List conflictNames; + + // Get the name from localColumns and save it to List + List remoteColumnNames = Lists.newArrayList(); + for (Column remoteColumn : remoteColumns) { + remoteColumnNames.add(remoteColumn.getName()); + } + + Map> result = nameListToMapping(remoteColumnNames, + localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName), + columnNameMapping, isLowerCaseMetaNames); + localColumnNames = result.get("localNames"); + conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict column names found in remote database/schema: " + remoteDbName + + " in remote table: " + remoteTableName + + " when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the column names."); + } + // Replace the name in remoteColumns with localColumnNames + for (int i = 0; i < remoteColumns.size(); i++) { + remoteColumns.get(i).setName(localColumnNames.get(i)); + } + return remoteColumns; + } + + public String getRemoteDatabaseName(String localDbName) { + return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded, + localDbName); + } + + public String getRemoteTableName(String localDbName, String localTableName) { + String remoteDbName = getRemoteDatabaseName(localDbName); + Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName, + k -> new ConcurrentHashMap<>()); + return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName), + localTableName); + } + + public Map getRemoteColumnNames(String localDbName, String localTableName) { + String remoteDbName = getRemoteDatabaseName(localDbName); + String remoteTableName = getRemoteTableName(localDbName, localTableName); + ConcurrentHashMap> tableColumnMap + = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>()); + Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); + if (columnMap.isEmpty()) { + LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}", + localDbName, localTableName); + loadColumnNamesIfNeeded(localDbName, localTableName); + columnMap = tableColumnMap.get(remoteTableName); + } + if (columnMap.isEmpty()) { + LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName); + throw new RuntimeException( + "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog."); + } + return columnMap; + } + + + private void loadDatabaseNamesIfNeeded() { + if (dbNamesLoaded.compareAndSet(false, true)) { + try { + loadDatabaseNames(); + } catch (Exception e) { + dbNamesLoaded.set(false); // Reset on failure + LOG.warn("Error loading database names", e); + } + } + } + + private void loadTableNamesIfNeeded(String localDbName) { + AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false)); + if (isLoaded.compareAndSet(false, true)) { + try { + loadTableNames(localDbName); + } catch (Exception e) { + tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure + LOG.warn("Error loading table names for localDbName: {}", localDbName, e); + } + } + } + + private void loadColumnNamesIfNeeded(String localDbName, String localTableName) { + columnNamesLoadedMap.putIfAbsent(localDbName, new ConcurrentHashMap<>()); + AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName) + .computeIfAbsent(localTableName, k -> new AtomicBoolean(false)); + if (isLoaded.compareAndSet(false, true)) { + try { + loadColumnNames(localDbName, localTableName); + } catch (Exception e) { + columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure + LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName, + localTableName, e); + } + } + } + + private V getRequiredMapping(Map map, K key, String typeName, Runnable loadIfNeeded, + String entityName) { + if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) { + LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName); + loadIfNeeded.run(); + } + V value = map.get(key); + if (value == null) { + LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); + throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName + + ". Please refresh this catalog."); + } + return value; + } + + // Load the database name from the data source. + // In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping. + protected abstract void loadDatabaseNames(); + + // Load the table names for the specified database from the data source. + // In the corresponding getTableNameList(), setTableNameMapping() must be used to update the mapping. + protected abstract void loadTableNames(String localDbName); + + // Load the column names for a specified table in a database from the data source. + // In the corresponding getColumnNameList(), setColumnNameMapping() must be used to update the mapping. + protected abstract void loadColumnNames(String localDbName, String localTableName); + + private JsonNode readAndParseJson(String jsonPath, String nodeName) { + JsonNode rootNode; + try { + rootNode = mapper.readTree(jsonPath); + return rootNode.path(nodeName); + } catch (JsonProcessingException e) { + throw new RuntimeException("parse meta_names_mapping property error", e); + } + } + + private Map> nameListToMapping(List remoteNames, + ConcurrentHashMap localNameToRemoteName, + Map nameMapping, boolean isLowerCaseMetaNames) { + List filteredDatabaseNames = Lists.newArrayList(); + Set lowerCaseNames = Sets.newHashSet(); + Map> nameMap = Maps.newHashMap(); + List conflictNames = Lists.newArrayList(); -public interface IdentifierMapping { - List fromRemoteDatabaseName(List remoteDatabaseNames); + for (String name : remoteNames) { + String mappedName = nameMapping.getOrDefault(name, name); + String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; - List fromRemoteTableName(String remoteDatabaseName, List remoteTableNames); + // Use computeIfAbsent to ensure atomicity + localNameToRemoteName.computeIfAbsent(localName, k -> name); - List fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, List remoteColumns); + if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { + if (nameMap.containsKey(localName)) { + nameMap.get(localName).add(mappedName); + } + } else { + nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); + } - String toRemoteDatabaseName(String localDatabaseName); + filteredDatabaseNames.add(localName); + } - String toRemoteTableName(String remoteDatabaseName, String localTableName); + for (List conflictNameList : nameMap.values()) { + if (conflictNameList.size() > 1) { + conflictNames.addAll(conflictNameList); + } + } - Map toRemoteColumnNames(String remoteDatabaseName, String remoteTableName); + Map> result = Maps.newConcurrentMap(); + result.put("localNames", filteredDatabaseNames); + result.put("conflictNames", conflictNames); + return result; + } }