Skip to content

Commit

Permalink
[cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix multiple clus…
Browse files Browse the repository at this point in the history
…ter group commit BE select strategy (apache#38644)" (apache#39010)

## Proposed changes

Pick apache#38644 

<!--Describe your changes.-->
  • Loading branch information
Yukang-Lian authored Aug 7, 2024
1 parent 749c9f7 commit e083dc2
Showing 1 changed file with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public class GroupCommitManager {

// Table id to BE id map. Only for group commit.
private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
// BE id to pressure map. Only for group commit.
private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>();
// Table id to pressure map. Only for group commit.
private Map<Long, SlidingWindowCounter> tableToPressureMap = new ConcurrentHashMap<>();

public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
Expand Down Expand Up @@ -236,8 +236,8 @@ public long selectBackendForGroupCommitInternal(long tableId)
}

private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(),
tablePressureMap.toString());
LOG.debug("group commit select be info, tableToBeMap {}, tableToPressureMap {}", tableToBeMap.toString(),
tableToPressureMap.toString());
Long cachedBackendId = getCachedBackend(tableId);
if (cachedBackendId != null) {
return cachedBackendId;
Expand All @@ -264,8 +264,18 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE
private Long getCachedBackend(long tableId) {
OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
if (tableToBeMap.containsKey(tableId)) {
if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
// There are multiple threads getting cached backends for the same table.
// Maybe one thread removes the tableId from the tableToBeMap.
// Another thread gets the same tableId but can not find this tableId.
// So another thread needs to get the random backend.
Long backendId = tableToBeMap.get(tableId);
Backend backend;
if (backendId != null) {
backend = Env.getCurrentSystemInfo().getBackend(backendId);
} else {
return null;
}
if (backend.isAlive() && !backend.isDecommissioned()) {
return backend.getId();
} else {
Expand All @@ -285,7 +295,7 @@ private Long getRandomBackend(long tableId, List<Backend> backends) {
for (Backend backend : backends) {
if (backend.isAlive() && !backend.isDecommissioned()) {
tableToBeMap.put(tableId, backend.getId());
tablePressureMap.put(tableId,
tableToPressureMap.put(tableId,
new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
return backend.getId();
}
Expand Down Expand Up @@ -315,10 +325,10 @@ public void updateLoadData(long tableId, long receiveData) {
}

public void updateLoadDataInternal(long tableId, long receiveData) {
if (tablePressureMap.containsKey(tableId)) {
tablePressureMap.get(tableId).add(receiveData);
if (tableToPressureMap.containsKey(tableId)) {
tableToPressureMap.get(tableId).add(receiveData);
LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData,
tablePressureMap.toString());
tableToPressureMap.toString());
} else {
LOG.warn("can not find backend id: {}", tableId);
}
Expand Down

0 comments on commit e083dc2

Please sign in to comment.