diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 1ec6a06179e443..b6cf6cbb0a84a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -55,8 +55,8 @@ public class GroupCommitManager { // Table id to BE id map. Only for group commit. private Map tableToBeMap = new ConcurrentHashMap<>(); - // BE id to pressure map. Only for group commit. - private Map tablePressureMap = new ConcurrentHashMap<>(); + // Table id to pressure map. Only for group commit. + private Map tableToPressureMap = new ConcurrentHashMap<>(); public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); @@ -236,8 +236,8 @@ public long selectBackendForGroupCommitInternal(long tableId) } private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { - LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); + LOG.debug("group commit select be info, tableToBeMap {}, tableToPressureMap {}", tableToBeMap.toString(), + tableToPressureMap.toString()); Long cachedBackendId = getCachedBackend(tableId); if (cachedBackendId != null) { return cachedBackendId; @@ -264,8 +264,18 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE private Long getCachedBackend(long tableId) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { - Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); + if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + // There are multiple threads getting cached backends for the same table. + // Maybe one thread removes the tableId from the tableToBeMap. + // Another thread gets the same tableId but can not find this tableId. + // So another thread needs to get the random backend. + Long backendId = tableToBeMap.get(tableId); + Backend backend; + if (backendId != null) { + backend = Env.getCurrentSystemInfo().getBackend(backendId); + } else { + return null; + } if (backend.isAlive() && !backend.isDecommissioned()) { return backend.getId(); } else { @@ -285,7 +295,7 @@ private Long getRandomBackend(long tableId, List backends) { for (Backend backend : backends) { if (backend.isAlive() && !backend.isDecommissioned()) { tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, + tableToPressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); } @@ -315,10 +325,10 @@ public void updateLoadData(long tableId, long receiveData) { } public void updateLoadDataInternal(long tableId, long receiveData) { - if (tablePressureMap.containsKey(tableId)) { - tablePressureMap.get(tableId).add(receiveData); + if (tableToPressureMap.containsKey(tableId)) { + tableToPressureMap.get(tableId).add(receiveData); LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, - tablePressureMap.toString()); + tableToPressureMap.toString()); } else { LOG.warn("can not find backend id: {}", tableId); }