Skip to content

Commit

Permalink
Handling Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk committed Oct 7, 2024
1 parent 0ca8b28 commit a833914
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@Getter
public class JdbcExternalCatalog extends ExternalCatalog {
Expand Down Expand Up @@ -245,12 +246,16 @@ protected void initLocalObjectsImpl() {

@Override
protected List<String> listDatabaseNames() {
return identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
return getMappedDatabaseNames();
}

@Override
protected void buildDatabaseMapping() {
identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
getMappedDatabaseNames();
}

private List<String> getMappedDatabaseNames() {
return identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
}

protected String getRemoteDatabaseName(String dbName) {
Expand Down Expand Up @@ -287,9 +292,20 @@ public List<Column> listColumns(String dbName, String tblName) {
makeSureInitialized();
String remoteDbName = getRemoteDatabaseName(dbName);
String remoteTblName = getRemoteTableName(dbName, tblName);
return identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName,
jdbcClient.getColumnsFromJdbc(remoteDbName,
remoteTblName));

List<Column> remoteColumns = jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName);

List<String> remoteColumnNames = remoteColumns.stream()
.map(Column::getName)
.collect(Collectors.toList());
List<String> localColumnNames = identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName,
remoteColumnNames);

for (int i = 0; i < remoteColumns.size(); i++) {
remoteColumns.get(i).setName(localColumnNames.get(i));
}

return remoteColumns;
}

protected Map<String, String> getRemoteColumnNames(String dbName, String tblName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,10 @@ protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String c
}

@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remoteTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class DefaultIdentifierMapping implements IdentifierMapping {
private static final Logger LOG = LogManager.getLogger(DefaultIdentifierMapping.class);
Expand Down Expand Up @@ -125,12 +126,12 @@ public List<String> fromRemoteTableName(String remoteDbName, List<String> remote
}

@Override
public List<Column> fromRemoteColumnName(String remoteDatabaseName, String remoteTableName,
List<Column> remoteColumns) {
// If mapping is not required, return the original input
public List<String> fromRemoteColumnName(String remoteDatabaseName, String remoteTableName,
List<String> remoteColumnNames) {
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return remoteColumns;
return remoteColumnNames;
}

JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");

Map<String, String> columnNameMapping = Maps.newTreeMap();
Expand All @@ -145,19 +146,17 @@ public List<Column> fromRemoteColumnName(String remoteDatabaseName, String remot
}
}
}

localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>());
localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>());

List<String> remoteColumnNames = Lists.newArrayList();
for (Column remoteColumn : remoteColumns) {
remoteColumnNames.add(remoteColumn.getName());
}

Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName),
columnNameMapping, isLowerCaseMetaNames);

List<String> localColumnNames = result.get("localNames");
List<String> conflictNames = result.get("conflictNames");

if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict column names found in remote database/schema: " + remoteDatabaseName
Expand All @@ -166,10 +165,7 @@ public List<Column> fromRemoteColumnName(String remoteDatabaseName, String remot
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the column names.");
}
for (int i = 0; i < remoteColumns.size(); i++) {
remoteColumns.get(i).setName(localColumnNames.get(i));
}
return remoteColumns;
return localColumnNames;
}

@Override
Expand All @@ -178,7 +174,7 @@ public String toRemoteDatabaseName(String localDatabaseName) {
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return localDatabaseName;
}
return getRequiredMapping(localDBToRemoteDB, localDatabaseName, "database", localDatabaseName);
return getRequiredMapping(localDBToRemoteDB, localDatabaseName, "database");
}

@Override
Expand All @@ -189,7 +185,7 @@ public String toRemoteTableName(String remoteDatabaseName, String localTableName
}
Map<String, String> tableMap = localTableToRemoteTable.computeIfAbsent(remoteDatabaseName,
k -> new ConcurrentHashMap<>());
return getRequiredMapping(tableMap, localTableName, "table", localTableName);
return getRequiredMapping(tableMap, localTableName, "table");
}

@Override
Expand All @@ -209,12 +205,11 @@ public Map<String, String> toRemoteColumnNames(String remoteDatabaseName, String
return columnMap;
}

private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, String entityName) {
private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName) {
V value = map.get(key);
if (value == null) {
LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName);
throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName
+ ". Please refresh this catalog.");
LOG.warn("No remote {} found for: {}. Please refresh this catalog.", typeName, key);
throw new RuntimeException("No remote" + typeName + "found for: " + key + ". Please refresh this catalog.");
}
return value;
}
Expand All @@ -232,7 +227,7 @@ private JsonNode readAndParseJson(String jsonPath, String nodeName) {
private Map<String, List<String>> nameListToMapping(List<String> remoteNames,
ConcurrentHashMap<String, String> localNameToRemoteName,
Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
List<String> filteredDatabaseNames = Lists.newArrayList();
List<String> filteredNames = Lists.newArrayList();
Set<String> lowerCaseNames = Sets.newHashSet();
Map<String, List<String>> nameMap = Maps.newHashMap();
List<String> conflictNames = Lists.newArrayList();
Expand All @@ -244,14 +239,12 @@ private Map<String, List<String>> nameListToMapping(List<String> remoteNames,
localNameToRemoteName.computeIfAbsent(localName, k -> name);

if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
if (nameMap.containsKey(localName)) {
nameMap.get(localName).add(mappedName);
}
nameMap.get(localName).add(mappedName);
} else {
nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName)));
}

filteredDatabaseNames.add(localName);
filteredNames.add(localName);
}

for (List<String> conflictNameList : nameMap.values()) {
Expand All @@ -261,7 +254,7 @@ private Map<String, List<String>> nameListToMapping(List<String> remoteNames,
}

Map<String, List<String>> result = Maps.newConcurrentMap();
result.put("localNames", filteredDatabaseNames);
result.put("localNames", filteredNames);
result.put("conflictNames", conflictNames);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,59 @@
import java.util.Map;

public interface IdentifierMapping {

/**
* Maps a list of remote database names to their corresponding local database names.
*
* @param remoteDatabaseNames the list of remote database names to be mapped
* @return a list of corresponding local database names
*/
List<String> fromRemoteDatabaseName(List<String> remoteDatabaseNames);

/**
* Maps a list of remote table names in a specified remote database to their corresponding local table names.
*
* @param remoteDatabaseName the name of the remote database where the tables reside
* @param remoteTableNames the list of remote table names to be mapped
* @return a list of corresponding local table names
*/
List<String> fromRemoteTableName(String remoteDatabaseName, List<String> remoteTableNames);

List<Column> fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, List<Column> remoteColumns);
/**
* Maps a list of remote columns in a specified remote database and table to their corresponding local columns.
*
* @param remoteDatabaseName the name of the remote database
* @param remoteTableName the name of the remote table where the columns reside
* @param remoteColumnNames the list of remote column names to be mapped
* @return a list of corresponding local columns
*/
List<String> fromRemoteColumnName(String remoteDatabaseName, String remoteTableName,
List<String> remoteColumnNames);

/**
* Maps a local database name to its corresponding remote database name.
*
* @param localDatabaseName the name of the local database to be mapped
* @return the corresponding remote database name
*/
String toRemoteDatabaseName(String localDatabaseName);

/**
* Maps a local table name in a specified remote database to its corresponding remote table name.
*
* @param remoteDatabaseName the name of the remote database where the table resides
* @param localTableName the name of the local table to be mapped
* @return the corresponding remote table name
*/
String toRemoteTableName(String remoteDatabaseName, String localTableName);

/**
* Maps local column names in a specified remote database and table to their corresponding remote column names.
*
* @param remoteDatabaseName the name of the remote database
* @param remoteTableName the name of the remote table
* @return a map of local column names to corresponding remote column names
*/
Map<String, String> toRemoteColumnNames(String remoteDatabaseName, String remoteTableName);
}

0 comments on commit a833914

Please sign in to comment.