Skip to content

Commit

Permalink
[Enhancement](external catalog) Added status reset when jdbc name map…
Browse files Browse the repository at this point in the history
…ping is abnormal
  • Loading branch information
zy-kkk committed Apr 22, 2024
1 parent 06c9fb6 commit 206cf00
Showing 1 changed file with 56 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
Expand All @@ -35,6 +37,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class IdentifierMapping {
private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class);

private final ObjectMapper mapper = new ObjectMapper();
private final ConcurrentHashMap<String, String> localDBToRemoteDB = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -179,51 +182,58 @@ public List<Column> setColumnNameMapping(String remoteDbName, String remoteTable
}

public String getRemoteDatabaseName(String localDbName) {
if (localDBToRemoteDB.isEmpty() || !localDBToRemoteDB.containsKey(localDbName)) {
loadDatabaseNamesIfNeeded();
}
return localDBToRemoteDB.get(localDbName);
return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded,
localDbName);
}

public String getRemoteTableName(String localDbName, String localTableName) {
String remoteDbName = getRemoteDatabaseName(localDbName);
if (localTableToRemoteTable.isEmpty()
|| !localTableToRemoteTable.containsKey(remoteDbName)
|| localTableToRemoteTable.get(remoteDbName) == null
|| localTableToRemoteTable.get(remoteDbName).isEmpty()
|| !localTableToRemoteTable.get(remoteDbName).containsKey(localTableName)
|| localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) {
loadTableNamesIfNeeded(localDbName);
}

return localTableToRemoteTable.get(remoteDbName).get(localTableName);
Map<String, String> tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName,
k -> new ConcurrentHashMap<>());
return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName),
localTableName);
}

public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
if (localColumnToRemoteColumn.isEmpty()
|| !localColumnToRemoteColumn.containsKey(remoteDbName)
|| localColumnToRemoteColumn.get(remoteDbName) == null
|| localColumnToRemoteColumn.get(remoteDbName).isEmpty()
|| !localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName)
|| localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null
|| localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName).isEmpty()) {
ConcurrentHashMap<String, ConcurrentHashMap<String, String>> tableColumnMap
= localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
Map<String, String> columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
if (columnMap.isEmpty()) {
LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}",
localDbName, localTableName);
loadColumnNamesIfNeeded(localDbName, localTableName);
}
return localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName);
if (columnMap.isEmpty()) {
LOG.error("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName);
throw new RuntimeException(
"No remote column found for localTableName: " + localTableName + ". Please refresh this catalog.");
}
return columnMap;
}


private void loadDatabaseNamesIfNeeded() {
if (dbNamesLoaded.compareAndSet(false, true)) {
loadDatabaseNames();
try {
loadDatabaseNames();
} catch (Exception e) {
dbNamesLoaded.set(false); // Reset on failure
LOG.error("Error loading database names", e);
}
}
}

private void loadTableNamesIfNeeded(String localDbName) {
AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
loadTableNames(localDbName);
try {
loadTableNames(localDbName);
} catch (Exception e) {
tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure
LOG.error("Error loading table names for localDbName: {}", localDbName, e);
}
}
}

Expand All @@ -232,8 +242,29 @@ private void loadColumnNamesIfNeeded(String localDbName, String localTableName)
AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
.computeIfAbsent(localTableName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
loadColumnNames(localDbName, localTableName);
try {
loadColumnNames(localDbName, localTableName);
} catch (Exception e) {
columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure
LOG.error("Error loading column names for localDbName: {}, localTableName: {}", localDbName,
localTableName, e);
}
}
}

private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, Runnable loadIfNeeded,
String entityName) {
if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName);
loadIfNeeded.run();
}
V value = map.get(key);
if (value == null) {
LOG.error("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName);
throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName
+ ". Please refresh this catalog.");
}
return value;
}

// Load the database name from the data source.
Expand Down

0 comments on commit 206cf00

Please sign in to comment.