Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Sep 12, 2024
1 parent 0037bcd commit c0bb86f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
18 changes: 16 additions & 2 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ public class Config extends ConfigBase {
"The log roll size of BDBJE. When the number of log entries exceeds this value, the log will be rolled"})
public static int edit_log_roll_num = 50000;

// Maximum number of journal entries collected into one BDBJE batch.
// logEdit(op, entries) writes out the current batch once it holds at least this many
// entries, and also uses this value to bound the initial capacity of each batch.
@ConfField(mutable = true, masterOnly = true, description = {
"批量 BDBJE 日志包含的最大条目数", "The max number of log entries for batching BDBJE"})
public static int batch_edit_log_max_item_num = 1000;

// Maximum accumulated size of one BDBJE batch (default 640 * 1024, i.e. 640KB).
// logEdit(op, entries) writes out the current batch once batch.getSize() reaches this value.
@ConfField(mutable = true, masterOnly = true, description = {
"批量 BDBJE 日志包含的最大长度", "The max size for batching BDBJE"})
public static long batch_edit_log_max_byte_size = 640 * 1024L;

@ConfField(description = {"元数据同步的容忍延迟时间,单位为秒。如果元数据的延迟超过这个值,非主 FE 会停止提供服务",
"The toleration delay time of meta data synchronization, in seconds. "
+ "If the delay of meta data exceeds this value, non-master FE will stop offering service"})
Expand Down Expand Up @@ -3030,8 +3038,14 @@ public static int metaServiceRpcRetryTimes() {
// In storage-compute separation (cloud) mode: whether large-transaction commit is enabled.
// Defaults to false.
@ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"})
public static boolean enable_cloud_txn_lazy_commit = false;

@ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
public static boolean enable_immediate_be_assign = true;
// In storage-compute separation (cloud) mode: when the BE that a tablet is placed on becomes
// abnormal, whether the tablet is immediately mapped to a new BE. Defaults to false.
@ConfField(mutable = true, masterOnly = true,
description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认false"})
public static boolean enable_immediate_be_assign = false;

@ConfField(mutable = true, masterOnly = true,
description = { "存算分离模式下,一个BE挂掉多长时间后,它的tablet彻底转移到其他BE上" }
public static int rehash_tablet_after_be_dead_seconds = 3600;


// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,24 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) {
.getBackendsByClusterId(clusterId);
// use alive be to exec sql
List<Backend> availableBes = new ArrayList<>();
List<Backend> decommissionAvailBes = new ArrayList<>();
for (Backend be : clusterBes) {
long lastUpdateMs = be.getLastUpdateMs();
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
// a BE that is not alive still counts as available if its last update is within heartbeat_interval_second (e.g. it just crashed or is restarting)
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
&& !be.isSmoothUpgradeSrc()) {
availableBes.add(be);
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
} else {
availableBes.add(be);
}
}
}
if (availableBes == null || availableBes.size() == 0) {
if (availableBes.isEmpty()) {
availableBes = decommissionAvailBes;
}
if (availableBes.isEmpty()) {
if (!isBackGround) {
LOG.warn("failed to get available be, clusterId: {}", clusterId);
}
Expand Down Expand Up @@ -309,16 +317,24 @@ public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int r
.getBackendsByClusterId(clusterId);
// use alive be to exec sql
List<Backend> availableBes = new ArrayList<>();
List<Backend> decommissionAvailBes = new ArrayList<>();
for (Backend be : clusterBes) {
long lastUpdateMs = be.getLastUpdateMs();
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
// a BE that is not alive still counts as available if its last update is within heartbeat_interval_second (e.g. it just crashed or is restarting)
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
&& !be.isSmoothUpgradeSrc()) {
availableBes.add(be);
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
} else {
availableBes.add(be);
}
}
}
if (availableBes == null || availableBes.size() == 0) {
if (availableBes.isEmpty()) {
availableBes = decommissionAvailBes;
}
if (availableBes.isEmpty()) {
if (!isBackGround) {
LOG.warn("failed to get available be, clusterId: {}", clusterId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,21 @@ public void checkInflghtWarmUpCacheAsync() {
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
if (destBackend == null) {
if (destBackend == null || (!destBe.isAlive() && destBe.getLastUpdateMs() < needRehashDeadTime)) {
for (InfightTask task : entry.getValue()) {
for (InfightTablet key : tabletToInfightTask.keySet()) {
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
}
}
continue;
}
if (!destBe.isAlive()) {
continue;
}
List<Long> tablets = entry.getValue().stream()
.map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
Expand Down Expand Up @@ -489,7 +493,7 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
private List<UpdateCloudReplicaInfo> completeRouteInfo() {
List<UpdateCloudReplicaInfo> updateReplicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
int assignedErrNum = 0L;
long needRehashDeadTime = System.currentTimeMillis() - Config.cloud_rehash_after_be_dead_seconds * 1000L;
long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
boolean assigned = false;
List<Long> beIds = new ArrayList<Long>();
Expand Down Expand Up @@ -585,16 +589,11 @@ public void statRouteInfo() {
fillBeToTablets(beId, table.getId(), partition.getId(), index.getId(), tablet,
tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);

InfightTask task = tabletToInfightTask
.getOrDefault(new InfightTablet(tablet.getId(), cluster), null);
InfightTask task = tabletToInfightTask.get(new InfightTablet(tablet.getId(), cluster));
long futureBeId = task == null ? beId : task.destBe;

if (task != null) {
fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
} else {
fillBeToTablets(beId, table.getId(), partition.getId(), index.getId(), tablet,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
}
fillBeToTablets(futureBeId, table.getId(), partition.getId(), index.getId(), tablet,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
}
}
});
Expand Down Expand Up @@ -949,14 +948,8 @@ public void addTabletMigrationTask(Long srcBe, Long dstBe) {
*/
private void migrateTablets(Long srcBe, Long dstBe) {
// get tablets
List<Tablet> tablets = new ArrayList<>();
if (!beToTabletsGlobal.containsKey(srcBe)) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
tablets = beToTabletsGlobal.get(srcBe);
if (tablets.isEmpty()) {
List<Tablet> tablets = beToTabletsGlobal.get(srcBe);
if (tablets == null || tablets.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
Expand All @@ -972,10 +965,6 @@ private void migrateTablets(Long srcBe, Long dstBe) {
}
String clusterId = be.getCloudClusterId();
String clusterName = be.getCloudClusterName();
// update replica location info
cloudReplica.updateClusterToBe(clusterId, dstBe, true);
LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
tablet.getId(), srcBe, dstBe, clusterId, clusterName);

// populate to followers
Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
Expand All @@ -994,13 +983,18 @@ private void migrateTablets(Long srcBe, Long dstBe) {
if (db.getTableNullable(cloudReplica.getTableId()) == null) {
continue;
}
// update replica location info
cloudReplica.updateClusterToBe(clusterId, dstBe, true);
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
infos.add(info);
} finally {
table.readUnlock();
}

LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
tablet.getId(), srcBe, dstBe, clusterId, clusterName);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,12 +1269,13 @@ public void rollEditLog() {
}

private synchronized <T extends Writable> void logEdit(short op, List<T> entries) throws IOException {
JournalBatch batch = new JournalBatch(35);
int itemNum = Math.max(1, Math.min(Config.batch_edit_log_max_item_num, entries.size()));
JournalBatch batch = new JournalBatch(itemNum);
for (T entry : entries) {
// write out the current batch once it reaches the max entry count or the max data size
if (batch.getJournalEntities().size() >= 32 || batch.getSize() >= 640 * 1024) {
if (batch.getJournalEntities().size() >= Config.batch_edit_log_max_item_num
|| batch.getSize() >= Config.batch_edit_log_max_byte_size) {
journal.write(batch);
batch = new JournalBatch(35);
batch = new JournalBatch(itemNum);
}
batch.addJournal(op, entry);
}
Expand Down

0 comments on commit c0bb86f

Please sign in to comment.