From d7a152f6ead40f7701c9e25ed9342cc3770626d7 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Tue, 26 Mar 2024 20:29:16 +0800 Subject: [PATCH] [feature](merge-cloud) Add tablet rebalance run in cloud --- .../java/org/apache/doris/common/Config.java | 29 + .../java/org/apache/doris/catalog/Env.java | 28 +- .../apache/doris/cloud/catalog/CloudEnv.java | 15 + .../cloud/catalog/CloudTabletRebalancer.java | 915 ++++++++++++++++++ .../datasource/CloudInternalCatalog.java | 76 +- .../cloud/persist/UpdateCloudReplicaInfo.java | 168 ++++ .../apache/doris/journal/JournalEntity.java | 7 +- .../org/apache/doris/persist/EditLog.java | 12 +- 8 files changed, 1233 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 07f62f52e88ef9..73059c35a031b7 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2635,6 +2635,35 @@ public static boolean isNotCloudMode() { @ConfField public static int drop_user_notify_ms_max_times = 86400; + @ConfField(mutable = true) + public static long cloud_tablet_rebalancer_interval_second = 20; + + @ConfField(mutable = true) + public static boolean enable_cloud_partition_balance = true; + + @ConfField(mutable = true) + public static boolean enable_cloud_table_balance = true; + + @ConfField(mutable = true) + public static boolean enable_cloud_global_balance = true; + + @ConfField(mutable = true) + public static int cloud_pre_heating_time_limit_sec = 300; + + @ConfField(mutable = true) + public static double cloud_rebalance_percent_threshold = 0.05; + + @ConfField(mutable = true) + public static long cloud_rebalance_number_threshold = 2; + + @ConfField(mutable = true) + public static double cloud_balance_tablet_percent_per_run = 0.05; + + @ConfField(mutable = true) + public static int cloud_min_balance_tablet_num_per_run = 2; + + @ConfField(mutable = true) + public static boolean cloud_preheating_enabled = true; //========================================================================== // end of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f5819b9d998dec..55d4c2400abb7f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1646,24 +1646,26 @@ protected void startMasterOnlyDaemonThreads() { loadJobScheduler.start(); loadEtlChecker.start(); loadLoadingChecker.start(); - // Tablet checker and scheduler - tabletChecker.start(); - tabletScheduler.start(); - // Colocate tables checker and balancer - ColocateTableCheckerAndBalancer.getInstance().start(); - // Publish Version Daemon - publishVersionDaemon.start(); - // Start txn cleaner - txnCleaner.start(); + if (Config.isNotCloudMode()) { + // Tablet checker and scheduler + tabletChecker.start(); + tabletScheduler.start(); + // Colocate tables checker and balancer + ColocateTableCheckerAndBalancer.getInstance().start(); + // Publish Version Daemon + publishVersionDaemon.start(); + // Start txn cleaner + txnCleaner.start(); + // 
Consistency checker + getConsistencyChecker().start(); + // Backup handler + getBackupHandler().start(); + } jobManager.start(); // transient task manager transientTaskManager.start(); // Alter getAlterInstance().start(); - // Consistency checker - getConsistencyChecker().start(); - // Backup handler - getBackupHandler().start(); // catalog recycle bin getRecycleBin().start(); // time printer diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 7c37f1dbcffa98..abcd7f99a9c2ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -19,12 +19,15 @@ import org.apache.doris.analysis.ResourceTypeEnum; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.datasource.CloudInternalCatalog; +import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.NodeInfoPB; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.util.HttpURLUtil; @@ -57,10 +60,17 @@ public class CloudEnv extends Env { private CloudInstanceStatusChecker cloudInstanceStatusChecker; private CloudClusterChecker cloudClusterCheck; + private CloudTabletRebalancer cloudTabletRebalancer; + public CloudEnv(boolean isCheckpointCatalog) { super(isCheckpointCatalog); this.cloudClusterCheck = new CloudClusterChecker((CloudSystemInfoService) systemInfo); this.cloudInstanceStatusChecker = new CloudInstanceStatusChecker((CloudSystemInfoService) systemInfo); + this.cloudTabletRebalancer = new CloudTabletRebalancer((CloudSystemInfoService) systemInfo); + } + + public CloudTabletRebalancer getCloudTabletRebalancer() { + return this.cloudTabletRebalancer; } @Override @@ -68,6 +78,7 @@ protected void startMasterOnlyDaemonThreads() { LOG.info("start cloud Master only daemon threads"); super.startMasterOnlyDaemonThreads(); cloudClusterCheck.start(); + cloudTabletRebalancer.start(); } @Override @@ -416,4 +427,8 @@ public String analyzeCloudCluster(String name, ConnectContext ctx) throws DdlExc changeCloudCluster(res[1], ctx); return res[0]; } + + public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNotFoundException { + ((CloudInternalCatalog) getInternalCatalog()).replayUpdateCloudReplica(info); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java new file mode 100644 index 00000000000000..bcaaee800f3621 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -0,0 +1,915 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TCheckPreCacheRequest;
+import org.apache.doris.thrift.TCheckPreCacheResponse;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPreCacheAsyncRequest;
+import org.apache.doris.thrift.TPreCacheAsyncResponse;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class CloudTabletRebalancer extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class);
+
+    private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobal =
+            new ConcurrentHashMap<Long, List<Tablet>>();
+
+    private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
+
+    private Map<String, List<Long>> clusterToBes;
+
+    private Set<Long> allBes;
+
+    private List<UpdateCloudReplicaInfo> replicaInfos;
+
+    // partitionId -> indexId -> be -> tablet
+    private Map<Long, Map<Long, Map<Long, List<Tablet>>>> partitionToTablets;
+
+    private Map<Long, Map<Long, Map<Long, List<Tablet>>>> futurePartitionToTablets;
+
+    // tableId -> be -> tablet
+    private Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable;
+
+    private Map<Long, Map<Long, List<Tablet>>> futureBeToTabletsInTable;
+
+    private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
+
+    private Random rand = new Random();
+
+    private boolean indexBalanced = true;
+
+    private boolean tableBalanced = true;
+
+    private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>();
+
+    private Map<Long, InfightTask> tabletToInfightTask = new HashMap<Long, InfightTask>();
+
+    private long assignedErrNum = 0;
+
+    private CloudSystemInfoService cloudSystemInfoService;
+
+    public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) {
+        super("cloud tablet rebalancer", Config.cloud_tablet_rebalancer_interval_second * 1000);
+        this.cloudSystemInfoService = cloudSystemInfoService;
+    }
+
+    private interface Operator {
+        void op(Database db, Table table, Partition partition, MaterializedIndex index, String cluster);
+    }
+
+    public enum BalanceType {
+        GLOBAL,
+        TABLE,
+        PARTITION
+    }
+
+    private class InfightTask {
+        public Tablet pickedTablet;
+        public long srcBe;
+        public long destBe;
+        public boolean isGlobal;
+        public String clusterId;
+        public Map<Long, List<Tablet>> beToTablets;
+        public long startTimestamp;
+        BalanceType balanceType;
+    }
+
+    private class TransferPairInfo {
+        public long srcBe;
+        public long destBe;
+        public long minTabletsNum;
+        public long maxTabletsNum;
+        public boolean srcDecommissioned;
+    }
+
+    public Set<Long> getSnapshotTabletsByBeId(Long beId) {
+        Set<Long> snapshotTablets = new HashSet<Long>();
+        if (beToTabletsGlobal == null || !beToTabletsGlobal.containsKey(beId)) {
+            LOG.warn("beToTabletsGlobal null or not contain beId {}", beId);
+            return snapshotTablets;
+        }
+
+        beToTabletsGlobal.get(beId).forEach(tablet -> snapshotTablets.add(tablet.getId()));
+        return snapshotTablets;
+    }
+
+    // 1 build cluster to backends info
+    // 2 complete route info
+    // 3 check whether the inflight preheating tasks have completed
+    // 4 migrate tablets for smooth upgrade
+    // 5 collect BE-to-tablets mapping statistics
+    // 6 partition-level balance
+    // 7 if tablets at partition level are already balanced, perform table-level balance
+    // 8 if tablets at partition and table level are already balanced, perform global balance
+    // 9 check whether all tablets of decommissioned nodes have been migrated
+    @Override
+    protected void runAfterCatalogReady() {
+        if (Config.enable_cloud_multi_replica) {
+            LOG.info("Tablet balance is temporarily not supported when multi replica is enabled");
+            return;
+        }
+
+        LOG.info("cloud tablet rebalance begin");
+
+        clusterToBes = new HashMap<String, List<Long>>();
+        allBes = new HashSet<Long>();
+        long start = System.currentTimeMillis();
+
+        // 1 build cluster to backends info
+        for (Long beId : cloudSystemInfoService.getAllBackendIds()) {
+            Backend be = cloudSystemInfoService.getBackend(beId);
+            clusterToBes.putIfAbsent(be.getCloudClusterId(), new ArrayList<Long>());
+            clusterToBes.get(be.getCloudClusterId()).add(beId);
+            allBes.add(beId);
+        }
+        LOG.info("cluster to backends {}", clusterToBes);
+
+        // 2 complete route info
+        replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
+        completeRouteInfo();
+        for (UpdateCloudReplicaInfo info : replicaInfos) {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+        }
+
+        // 3 check whether the inflight preheating tasks have completed
+        checkInflghtPreCache();
+
+        // TODO(merge-cloud): wait add cloud upgrade mgr
+        // 4 migrate tablets for smooth upgrade
+        /*
+        Pair<Long, Long> pair;
+        statRouteInfo();
+        while (!tabletsMigrateTasks.isEmpty()) {
+            try {
+                pair = tabletsMigrateTasks.take();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            LOG.info("begin tablets migration from be {} to be {}", pair.first, pair.second);
+            migrateTablets(pair.first, pair.second);
+        }
+        */
+
+        // 5 collect BE-to-tablets mapping statistics
+        statRouteInfo();
+
+        indexBalanced = true;
+        tableBalanced = true;
+
+        // 6 partition-level balance
+        if (Config.enable_cloud_partition_balance) {
+            balanceAllPartitions();
+        }
+
+        // 7 if tablets at partition level are already balanced, perform table-level balance
+        if (Config.enable_cloud_table_balance && indexBalanced) {
+            balanceAllTables();
+        }
+
+        // 8 if tablets at partition and table level are already balanced, perform global balance
+        if (Config.enable_cloud_global_balance && indexBalanced && tableBalanced) {
+            globalBalance();
+        }
+
+        // 9 check whether all tablets of decommissioned nodes have been migrated
+        checkDecommissionState(clusterToBes);
+
+        LOG.info("finished to rebalance. 
cost: {} ms", (System.currentTimeMillis() - start)); + } + + public void balanceAllPartitions() { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + LOG.info("before partition balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + LOG.info("before partition balance be {} tablet num(current + pre heating inflight) {}", + entry.getKey(), entry.getValue().size()); + } + + // balance in partitions/index + for (Map.Entry> entry : clusterToBes.entrySet()) { + balanceInPartition(entry.getValue(), entry.getKey()); + } + + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + LOG.info("after partition balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + LOG.info("after partition balance be {} tablet num(current + pre heating inflight) {}", + entry.getKey(), entry.getValue().size()); + } + } + + public void balanceAllTables() { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + LOG.info("before table balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + LOG.info("before table balance be {} tablet num(current + pre heating inflight) {}", + entry.getKey(), entry.getValue().size()); + } + + // balance in partitions/index + for (Map.Entry> entry : clusterToBes.entrySet()) { + balanceInTable(entry.getValue(), entry.getKey()); + } + + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + LOG.info("after table balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + LOG.info("after table balance be {} tablet num(current + pre heating inflight) {}", + entry.getKey(), entry.getValue().size()); + } + } + + public void globalBalance() { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + LOG.info("before global balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + LOG.info("before global balance be {} tablet num(current + pre heating inflight) {}", + entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : clusterToBes.entrySet()) { + balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL); + } + + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + LOG.info("after global balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); + } + + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + LOG.info("after global balance be {} tablet num(current + pre heating inflight) {}", + entry.getKey(), entry.getValue().size()); + } + } + + public void checkInflghtPreCache() { + Map> beToTabletIds = new HashMap>(); + + for (Map.Entry entry : tabletToInfightTask.entrySet()) { + beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList()); + beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId()); + } + + for (Map.Entry> entry : beToTabletIds.entrySet()) { + LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); + Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey()); + if (destBackend == null) { + for (long tabletId : entry.getValue()) { + tabletToInfightTask.remove(tabletId); + } + continue; + } + + Map taskDone = sendCheckPreCacheRpc(entry.getValue(), 
entry.getKey()); + if (taskDone == null) { + LOG.warn("sendCheckPreCacheRpc return null be {}, inFight tasks {}", entry.getKey(), entry.getValue()); + continue; + } + + for (Map.Entry result : taskDone.entrySet()) { + InfightTask task = tabletToInfightTask.get(result.getKey()); + if (result.getValue() + || System.currentTimeMillis() / 1000 - task.startTimestamp + > Config.cloud_pre_heating_time_limit_sec) { + if (!result.getValue()) { + LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey()); + } + updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId); + tabletToInfightTask.remove(result.getKey()); + } + } + } + + // recalculate inflight beToTablets, just for print the log + beToTabletIds = new HashMap>(); + for (Map.Entry entry : tabletToInfightTask.entrySet()) { + beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList()); + beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId()); + } + + for (Map.Entry> entry : beToTabletIds.entrySet()) { + LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); + } + } + + public void checkDecommissionState(Map> clusterToBes) { + for (Map.Entry> entry : clusterToBes.entrySet()) { + List beList = entry.getValue(); + long tabletNum = 0L; + for (long beId : beList) { + tabletNum = beToTabletsGlobal.get(beId) == null ? 0 : beToTabletsGlobal.get(beId).size(); + Backend backend = cloudSystemInfoService.getBackend(beId); + if ((backend.isDecommissioned() && tabletNum == 0 && !backend.isActive()) + || (backend.isDecommissioned() && beList.size() == 1)) { + LOG.info("check decommission be {} state {} tabletNum {} isActive {} beList {}", + backend.getId(), backend.isDecommissioned(), tabletNum, backend.isActive(), beList); + if (!beToDecommissionedTime.containsKey(beId)) { + LOG.info("prepare to notify meta service be {} decommissioned", backend.getId()); + Cloud.AlterClusterRequest.Builder builder = + Cloud.AlterClusterRequest.newBuilder(); + builder.setCloudUniqueId(Config.cloud_unique_id); + builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED); + + Cloud.ClusterPB.Builder clusterBuilder = + Cloud.ClusterPB.newBuilder(); + clusterBuilder.setClusterName(backend.getCloudClusterName()); + clusterBuilder.setClusterId(backend.getCloudClusterId()); + clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE); + + Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder(); + nodeBuilder.setIp(backend.getHost()); + nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort()); + nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId()); + nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED); + + clusterBuilder.addNodes(nodeBuilder); + builder.setCluster(clusterBuilder); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(builder.build()); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("notify decommission response: {}", response); + } + LOG.info("notify decommission response: {} ", response); + } catch (RpcException e) { + LOG.info("failed to notify decommission {}", e); + return; + } + beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000); + } + } + } + } + } + + private void completeRouteInfo() { + assignedErrNum = 0L; + loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> { + boolean assigned = false; + List beIds = new ArrayList(); + List tabletIds = 
new ArrayList(); + for (Tablet tablet : index.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + Map> clusterToBackends = + ((CloudReplica) replica).getClusterToBackends(); + if (!clusterToBackends.containsKey(cluster)) { + long beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true); + if (beId <= 0) { + assignedErrNum++; + continue; + } + List bes = new ArrayList(); + bes.add(beId); + clusterToBackends.put(cluster, bes); + + assigned = true; + beIds.add(beId); + tabletIds.add(tablet.getId()); + } else { + beIds.add(clusterToBackends.get(cluster).get(0)); + tabletIds.add(tablet.getId()); + } + } + } + + if (assigned) { + UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(db.getId(), table.getId(), + partition.getId(), index.getId(), cluster, beIds, tabletIds); + replicaInfos.add(info); + } + }); + + if (assignedErrNum > 0) { + LOG.warn("completeRouteInfo error num {}", assignedErrNum); + } + } + + public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet, + Map> globalBeToTablets, + Map>> beToTabletsInTable, + Map>>> partToTablets) { + // global + globalBeToTablets.putIfAbsent(be, new ArrayList()); + globalBeToTablets.get(be).add(tablet); + + // table + beToTabletsInTable.putIfAbsent(tableId, new HashMap>()); + Map> beToTabletsOfTable = beToTabletsInTable.get(tableId); + beToTabletsOfTable.putIfAbsent(be, new ArrayList()); + beToTabletsOfTable.get(be).add(tablet); + + // partition + partToTablets.putIfAbsent(partId, new HashMap>>()); + Map>> indexToTablets = partToTablets.get(partId); + indexToTablets.putIfAbsent(indexId, new HashMap>()); + Map> beToTabletsOfIndex = indexToTablets.get(indexId); + beToTabletsOfIndex.putIfAbsent(be, new ArrayList()); + beToTabletsOfIndex.get(be).add(tablet); + } + + public void statRouteInfo() { + ConcurrentHashMap> tmpBeToTabletsGlobal = new ConcurrentHashMap>(); + futureBeToTabletsGlobal = new HashMap>(); + + partitionToTablets = new HashMap>>>(); + futurePartitionToTablets = new HashMap>>>(); + + beToTabletsInTable = new HashMap>>(); + futureBeToTabletsInTable = new HashMap>>(); + + loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> { + for (Tablet tablet : index.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + Map> clusterToBackends = + ((CloudReplica) replica).getClusterToBackends(); + for (Map.Entry> entry : clusterToBackends.entrySet()) { + if (!cluster.equals(entry.getKey())) { + continue; + } + + List bes = entry.getValue(); + if (!allBes.contains(bes.get(0))) { + continue; + } + + fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet, + tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets); + + if (tabletToInfightTask.containsKey(tablet.getId())) { + InfightTask task = tabletToInfightTask.get(tablet.getId()); + fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet, + futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + } else { + fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet, + futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + } + } + } + } + }); + + beToTabletsGlobal = tmpBeToTabletsGlobal; + } + + public void loopCloudReplica(Operator operator) { + List dbIds = Env.getCurrentInternalCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + List 
tableList = db.getTables(); + for (Table table : tableList) { + if (table.getType() != TableType.OLAP) { + continue; + } + OlapTable olapTable = (OlapTable) table; + if (!table.writeLockIfExist()) { + continue; + } + + try { + for (Partition partition : olapTable.getAllPartitions()) { + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + for (Map.Entry> entry : clusterToBes.entrySet()) { + String cluster = entry.getKey(); + operator.op(db, table, partition, index, cluster); + } + } // end for indices + } // end for partitions + } finally { + table.writeUnlock(); + } + } + } + } + + public void balanceInPartition(List bes, String clusterId) { + // balance all partition + for (Map.Entry>>> partitionEntry : futurePartitionToTablets.entrySet()) { + Map>> indexToTablets = partitionEntry.getValue(); + // balance all index of a partition + for (Map.Entry>> entry : indexToTablets.entrySet()) { + // balance a index + balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION); + } + } + } + + public void balanceInTable(List bes, String clusterId) { + // balance all tables + for (Map.Entry>> entry : futureBeToTabletsInTable.entrySet()) { + balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE); + } + } + + private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) throws Exception { + BackendService.Client client = null; + TNetworkAddress address = null; + Backend srcBackend = cloudSystemInfoService.getBackend(srcBe); + Backend destBackend = cloudSystemInfoService.getBackend(destBe); + try { + address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + TPreCacheAsyncRequest req = new TPreCacheAsyncRequest(); + req.setHost(srcBackend.getHost()); + req.setBrpcPort(srcBackend.getBrpcPort()); + List tablets = new ArrayList(); + tablets.add(pickedTablet.getId()); + req.setTabletIds(tablets); + TPreCacheAsyncResponse result = client.preCacheAsync(req); + if (result.getStatus().getStatusCode() != TStatusCode.OK) { + LOG.warn("pre cache failed status {} {}", result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + } + } catch (Exception e) { + LOG.warn("send pre heating rpc error. backend[{}]", destBackend.getId(), e); + ClientPool.backendPool.invalidateObject(address, client); + throw e; + } finally { + ClientPool.backendPool.returnObject(address, client); + } + } + + private Map sendCheckPreCacheRpc(List tabletIds, long be) { + BackendService.Client client = null; + TNetworkAddress address = null; + Backend destBackend = cloudSystemInfoService.getBackend(be); + try { + address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + TCheckPreCacheRequest req = new TCheckPreCacheRequest(); + req.setTablets(tabletIds); + TCheckPreCacheResponse result = client.checkPreCache(req); + if (result.getStatus().getStatusCode() != TStatusCode.OK) { + LOG.warn("check pre cache status {} {}", result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + } else { + LOG.info("check pre cache succ status {} {}", result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + } + return result.getTaskDone(); + } catch (Exception e) { + LOG.warn("send check pre cache rpc error. 
backend[{}]", destBackend.getId(), e); + ClientPool.backendPool.invalidateObject(address, client); + } finally { + ClientPool.backendPool.returnObject(address, client); + } + return null; + } + + private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, BalanceType balanceType, + Map> globalBeToTablets, + Map>> beToTabletsInTable, + Map>>> partToTablets) { + CloudReplica replica = (CloudReplica) pickedTablet.getReplicas().get(0); + long tableId = replica.getTableId(); + long partId = replica.getPartitionId(); + long indexId = replica.getIndexId(); + + globalBeToTablets.get(srcBe).remove(pickedTablet); + beToTabletsInTable.get(tableId).get(srcBe).remove(pickedTablet); + partToTablets.get(partId).get(indexId).get(srcBe).remove(pickedTablet); + + fillBeToTablets(destBe, tableId, partId, indexId, pickedTablet, globalBeToTablets, beToTabletsInTable, + partToTablets); + } + + private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId) { + CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); + cloudReplica.updateClusterToBe(clusterId, destBe); + Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId()); + if (db == null) { + return; + } + OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId()); + if (table == null) { + return; + } + + table.readLock(); + + try { + if (db.getTableNullable(cloudReplica.getTableId()) == null) { + return; + } + + UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(), + cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(), + pickedTablet.getId(), cloudReplica.getId(), clusterId, destBe); + Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info); + } finally { + table.readUnlock(); + } + } + + private boolean getTransferPair(List bes, Map> beToTablets, long avgNum, + TransferPairInfo pairInfo) { + long destBe = bes.get(0); + long srcBe = bes.get(0); + + long minTabletsNum = Long.MAX_VALUE; + long maxTabletsNum = 0; + boolean srcDecommissioned = false; + + for (Long be : bes) { + long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size(); + if (tabletNum > maxTabletsNum) { + srcBe = be; + maxTabletsNum = tabletNum; + } + + Backend backend = cloudSystemInfoService.getBackend(be); + if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioned() + && !backend.isSmoothUpgradeSrc()) { + destBe = be; + minTabletsNum = tabletNum; + } + } + + for (Long be : bes) { + long tabletNum = beToTablets.get(be) == null ? 
0 : beToTablets.get(be).size(); + Backend backend = cloudSystemInfoService.getBackend(be); + if (backend.isDecommissioned() && tabletNum > 0) { + srcBe = be; + srcDecommissioned = true; + break; + } + } + + if (!srcDecommissioned) { + if ((maxTabletsNum < avgNum * (1 + Config.cloud_rebalance_percent_threshold) + && minTabletsNum > avgNum * (1 - Config.cloud_rebalance_percent_threshold)) + || minTabletsNum > maxTabletsNum - Config.cloud_rebalance_number_threshold) { + return false; + } + } + + pairInfo.srcBe = srcBe; + pairInfo.destBe = destBe; + pairInfo.minTabletsNum = minTabletsNum; + pairInfo.maxTabletsNum = maxTabletsNum; + return true; + } + + private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, BalanceType balanceType, + Map>>> beToTabletsInParts, + Map>> beToTabletsInTables) { + if (balanceType == balanceType.GLOBAL) { + // check is conflict with partition balance + long maxBeSize = beToTabletsInParts.get(cloudReplica.getPartitionId()) + .get(cloudReplica.getIndexId()).get(srcBe).size(); + List destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) + .get(cloudReplica.getIndexId()).get(destBe); + long minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); + if (minBeSize >= maxBeSize) { + return true; + } + + // check is conflict with table balance + maxBeSize = beToTabletsInTables.get(cloudReplica.getTableId()).get(srcBe).size(); + destBeTablets = beToTabletsInTables.get(cloudReplica.getTableId()).get(destBe); + minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); + if (minBeSize >= maxBeSize) { + return true; + } + } + + if (balanceType == balanceType.TABLE) { + // check is conflict with partition balance + long maxBeSize = beToTabletsInParts.get(cloudReplica.getPartitionId()) + .get(cloudReplica.getIndexId()).get(srcBe).size(); + List destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) + .get(cloudReplica.getIndexId()).get(destBe); + long minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); + if (minBeSize >= maxBeSize) { + return true; + } + } + + return false; + } + + private void balanceImpl(List bes, String clusterId, Map> beToTablets, + BalanceType balanceType) { + if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) { + return; + } + + long totalTabletsNum = 0; + long beNum = 0; + for (Long be : bes) { + long tabletNum = beToTablets.get(be) == null ? 
0 : beToTablets.get(be).size(); + Backend backend = cloudSystemInfoService.getBackend(be); + if (backend != null && !backend.isDecommissioned()) { + beNum++; + } + totalTabletsNum += tabletNum; + } + if (beNum == 0) { + LOG.warn("zero be, but want balance, skip"); + return; + } + long avgNum = totalTabletsNum / beNum; + long transferNum = Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run), + Config.cloud_min_balance_tablet_num_per_run); + + for (int i = 0; i < transferNum; i++) { + TransferPairInfo pairInfo = new TransferPairInfo(); + if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) { + // no need balance; + break; + } + + if (balanceType == balanceType.PARTITION) { + indexBalanced = false; + } + + if (balanceType == balanceType.TABLE) { + tableBalanced = false; + } + + long srcBe = pairInfo.srcBe; + long destBe = pairInfo.destBe; + long minTabletsNum = pairInfo.minTabletsNum; + long maxTabletsNum = pairInfo.maxTabletsNum; + + int randomIndex = rand.nextInt(beToTablets.get(srcBe).size()); + Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex); + CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); + + if (Config.cloud_preheating_enabled) { + if (isConflict(srcBe, destBe, cloudReplica, balanceType, futurePartitionToTablets, + futureBeToTabletsInTable)) { + continue; + } + + try { + sendPreHeatingRpc(pickedTablet, srcBe, destBe); + } catch (Exception e) { + break; + } + + InfightTask task = new InfightTask(); + task.pickedTablet = pickedTablet; + task.srcBe = srcBe; + task.destBe = destBe; + task.balanceType = balanceType; + task.clusterId = clusterId; + task.beToTablets = beToTablets; + task.startTimestamp = System.currentTimeMillis() / 1000; + tabletToInfightTask.put(pickedTablet.getId(), task); + + LOG.info("pre cache {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}", + pickedTablet.getId(), srcBe, destBe, clusterId, + minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, cloudReplica.getPartitionId()); + updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, + futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + } else { + if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) { + continue; + } + + LOG.info("transfer {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}", + pickedTablet.getId(), srcBe, destBe, clusterId, + minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, cloudReplica.getPartitionId()); + + updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, beToTabletsGlobal, + beToTabletsInTable, partitionToTablets); + updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, + futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + updateClusterToBeMap(pickedTablet, destBe, clusterId); + } + } + } + + public void addTabletMigrationTask(Long srcBe, Long dstBe) { + tabletsMigrateTasks.offer(Pair.of(srcBe, dstBe)); + } + + /* Migrate tablet replicas from srcBe to dstBe + * replica location info will be updated in both master and follower FEs. 
+ */ + private void migrateTablets(Long srcBe, Long dstBe) { + // get tablets + List tablets = new ArrayList<>(); + if (!beToTabletsGlobal.containsKey(srcBe)) { + LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe); + // TODO(merge-cloud): wait add cloud upgrade mgr + // Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe); + return; + } + tablets = beToTabletsGlobal.get(srcBe); + if (tablets.isEmpty()) { + LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe); + // TODO(merge-cloud): wait add cloud upgrade mgr + // Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe); + return; + } + for (Tablet tablet : tablets) { + // get replica + CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0); + String clusterId = cloudSystemInfoService.getBackend(srcBe).getCloudClusterId(); + String clusterName = cloudSystemInfoService.getBackend(srcBe).getCloudClusterName(); + // update replica location info + cloudReplica.updateClusterToBe(clusterId, dstBe); + LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}", + tablet.getId(), srcBe, dstBe, clusterId, clusterName); + + // populate to followers + Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId()); + if (db == null) { + LOG.error("get null db from replica, tabletId={}, partitionId={}, beId={}", + cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getBackendId()); + continue; + } + OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId()); + if (table == null) { + continue; + } + + table.readLock(); + try { + if (db.getTableNullable(cloudReplica.getTableId()) == null) { + continue; + } + UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(), + cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(), + tablet.getId(), cloudReplica.getId(), clusterId, dstBe); + Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info); + } finally { + table.readUnlock(); + } + } + + // TODO(merge-cloud): wait add cloud upgrade mgr + /* + try { + Env.getCurrentEnv().getCloudUpgradeMgr().registerWaterShedTxnId(srcBe); + } catch (AnalysisException e) { + throw new RuntimeException(e); + } + */ + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index b473f80e126959..0d21facd570ad7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -41,10 +41,13 @@ import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.catalog.CloudReplica; +import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.proto.OlapCommon; import org.apache.doris.proto.OlapFile; @@ -55,6 +58,7 @@ import org.apache.doris.thrift.TTabletType; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; import doris.segment_v2.SegmentV2; @@ -74,8 +78,6 @@ public CloudInternalCatalog() { } // BEGIN CREATE TABLE - - // TODO(merge-cloud): merge code with InternalCatalog @Override protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long partitionId, String partitionName, Map indexIdToMeta, @@ -770,4 +772,74 @@ public void dropStage(Cloud.StagePB.StageType stageType, String userName, String throw new DdlException("internal error, try later"); } } + + public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNotFoundException { + Database db = getDbNullable(info.getDbId()); + if (db == null) { + LOG.warn("replay update cloud replica, unknown database {}", info.toString()); + return; + } + OlapTable olapTable = (OlapTable) db.getTableNullable(info.getTableId()); + if (olapTable == null) { + LOG.warn("replay update cloud replica, unknown table {}", info.toString()); + return; + } + + olapTable.writeLock(); + try { + unprotectUpdateCloudReplica(olapTable, info); + } catch (Exception e) { + LOG.warn("unexpected exception", e); + } finally { + olapTable.writeUnlock(); + } + } + + private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplicaInfo info) { + LOG.debug("replay update a cloud replica {}", info); + Partition partition = olapTable.getPartition(info.getPartitionId()); + MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId()); + + try { + if (info.getTabletId() != -1) { + Tablet tablet = materializedIndex.getTablet(info.getTabletId()); + Replica replica = tablet.getReplicaById(info.getReplicaId()); + Preconditions.checkNotNull(replica, info); + + String clusterId = info.getClusterId(); + String realClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(clusterId); + LOG.debug("cluster Id {}, real cluster Id {}", clusterId, realClusterId); + if (!Strings.isNullOrEmpty(realClusterId)) { + clusterId = realClusterId; + } + + ((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeId()); + + LOG.debug("update single cloud replica cluster {} replica {} be {}", info.getClusterId(), + replica.getId(), info.getBeId()); + } else { + List tabletIds = info.getTabletIds(); + for (int i = 0; i < tabletIds.size(); ++i) { + Tablet tablet = materializedIndex.getTablet(tabletIds.get(i)); + Replica replica = tablet.getReplicas().get(0); + Preconditions.checkNotNull(replica, info); + + String clusterId = info.getClusterId(); + String realClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(clusterId); + LOG.debug("cluster Id {}, real cluster Id {}", clusterId, realClusterId); + if (!Strings.isNullOrEmpty(realClusterId)) { + clusterId = realClusterId; + } + + LOG.debug("update cloud replica cluster {} replica {} be {}", info.getClusterId(), + replica.getId(), info.getBeIds().get(i)); + ((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeIds().get(i)); + } + } + } catch (Exception e) { + LOG.warn("unexpected exception", e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java new file mode 100644 index 00000000000000..1ff2912a3971df --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UpdateCloudReplicaInfo implements Writable {
+    @SerializedName(value = "dbId")
+    private long dbId;
+    @SerializedName(value = "tableId")
+    private long tableId;
+    @SerializedName(value = "partitionId")
+    private long partitionId;
+    @SerializedName(value = "indexId")
+    private long indexId;
+    @SerializedName(value = "tabletId")
+    private long tabletId;
+    @SerializedName(value = "replicaId")
+    private long replicaId;
+
+    @SerializedName(value = "clusterId")
+    private String clusterId;
+    @SerializedName(value = "beId")
+    private long beId;
+
+    @SerializedName(value = "tabletIds")
+    private List<Long> tabletIds = new ArrayList<Long>();
+
+    @SerializedName(value = "beIds")
+    private List<Long> beIds = new ArrayList<Long>();
+
+    public UpdateCloudReplicaInfo() {
+    }
+
+    public UpdateCloudReplicaInfo(long dbId, long tableId, long partitionId, long indexId,
+            long tabletId, long replicaId, String clusterId, long beId) {
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+        this.tabletId = tabletId;
+        this.replicaId = replicaId;
+        this.clusterId = clusterId;
+        this.beId = beId;
+
+        this.beIds = null;
+        this.tabletIds = null;
+    }
+
+    public UpdateCloudReplicaInfo(long dbId, long tableId, long partitionId, long indexId,
+            String clusterId, List<Long> beIds, List<Long> tabletIds) {
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+        this.clusterId = clusterId;
+        this.beIds = beIds;
+        this.tabletIds = tabletIds;
+
+        this.tabletId = -1;
+        this.replicaId = -1;
+        this.beId = -1;
+    }
+
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this, UpdateCloudReplicaInfo.class);
+        Text.writeString(out, json);
+    }
+
+    public static UpdateCloudReplicaInfo read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, UpdateCloudReplicaInfo.class);
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public long getPartitionId() {
+        return partitionId;
+    }
+
+    public long getIndexId() {
+        return indexId;
+    }
+
+    public long getTabletId() {
+        return tabletId;
+    }
+
+    public long getReplicaId() {
+        return replicaId;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public long getBeId() {
+        return beId;
+    }
+
+    public List<Long> getBeIds() {
+        return beIds;
+    }
+
+    public List<Long>
getTabletIds() { + return tabletIds; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("database id: ").append(dbId); + sb.append(" table id: ").append(tableId); + sb.append(" partition id: ").append(partitionId); + sb.append(" index id: ").append(indexId); + sb.append(" tablet id: ").append(tabletId); + sb.append(" replica id: ").append(replicaId); + sb.append(" cluster: ").append(clusterId); + sb.append(" backend id: ").append(beId); + + if (tabletId == -1) { + if (beIds != null && !beIds.isEmpty()) { + sb.append(" be id list: "); + for (long id : beIds) { + sb.append(" ").append(id); + } + + sb.append(" tablet id list: "); + for (long id : tabletIds) { + sb.append(" ").append(id); + } + } + } + + return sb.toString(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index f8669d49e0f479..bafd60e251805c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.catalog.Resource; +import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -949,7 +950,11 @@ public void readFields(DataInput in) throws IOException { break; } // FIXME: support cloud related operation types. - case OperationType.OP_UPDATE_CLOUD_REPLICA: + case OperationType.OP_UPDATE_CLOUD_REPLICA: { + data = UpdateCloudReplicaInfo.read(in); + isRead = true; + break; + } case OperationType.OP_MODIFY_TTL_SECONDS: case OperationType.OP_MODIFY_CLOUD_WARM_UP_JOB: { isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 94e789bfd02bfd..c103f998219720 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -38,6 +38,8 @@ import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.catalog.Resource; +import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; @@ -1200,7 +1202,11 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { // TODO: implement this while statistics finished related work. break; } - case OperationType.OP_UPDATE_CLOUD_REPLICA: + case OperationType.OP_UPDATE_CLOUD_REPLICA: { + UpdateCloudReplicaInfo info = (UpdateCloudReplicaInfo) journal.getData(); + ((CloudEnv) env).replayUpdateCloudReplica(info); + break; + } case OperationType.OP_MODIFY_TTL_SECONDS: case OperationType.OP_MODIFY_CLOUD_WARM_UP_JOB: { // TODO: support cloud replated operation type. 
@@ -1563,6 +1569,10 @@ public void logExportCreate(ExportJob job) { logEdit(OperationType.OP_EXPORT_CREATE, job); } + public void logUpdateCloudReplica(UpdateCloudReplicaInfo info) { + logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, info); + } + public void logExportUpdateState(long jobId, ExportJobState newState) { ExportJobStateTransfer transfer = new ExportJobStateTransfer(jobId, newState); logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer);
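
Reviewer note (not part of the patch): the skew check in getTransferPair() and the per-run transfer budget in balanceImpl() are driven by four of the new config knobs (cloud_rebalance_percent_threshold, cloud_rebalance_number_threshold, cloud_balance_tablet_percent_per_run, cloud_min_balance_tablet_num_per_run). The standalone sketch below restates that arithmetic with the patch's default values so the behavior is easier to reason about; the class and method names (RebalanceMathSketch, needsBalance, transferBudget) are illustrative only and do not exist in this change.

// Illustrative sketch only; mirrors the threshold arithmetic of
// getTransferPair()/balanceImpl() in CloudTabletRebalancer, not part of the patch.
public class RebalanceMathSketch {
    // Mirrors Config.cloud_rebalance_percent_threshold = 0.05
    static final double PERCENT_THRESHOLD = 0.05;
    // Mirrors Config.cloud_rebalance_number_threshold = 2
    static final long NUMBER_THRESHOLD = 2;
    // Mirrors Config.cloud_balance_tablet_percent_per_run = 0.05
    static final double PERCENT_PER_RUN = 0.05;
    // Mirrors Config.cloud_min_balance_tablet_num_per_run = 2
    static final long MIN_PER_RUN = 2;

    // A BE group is considered balanced when every BE is within +/-5% of the
    // average tablet count, OR the max/min gap is at most NUMBER_THRESHOLD tablets.
    static boolean needsBalance(long minTablets, long maxTablets, long avg) {
        boolean withinPercent = maxTablets < avg * (1 + PERCENT_THRESHOLD)
                && minTablets > avg * (1 - PERCENT_THRESHOLD);
        boolean withinCount = minTablets > maxTablets - NUMBER_THRESHOLD;
        return !(withinPercent || withinCount);
    }

    // Upper bound on tablets moved (or preheated) in one rebalancer round.
    static long transferBudget(long avg) {
        return Math.max(Math.round(avg * PERCENT_PER_RUN), MIN_PER_RUN);
    }

    public static void main(String[] args) {
        // Example: three BEs holding 120 / 100 / 80 tablets, average 100.
        long avg = 100;
        System.out.println(needsBalance(80, 120, avg)); // true: 20% skew exceeds 5%
        System.out.println(transferBudget(avg));        // 5 tablets at most this round
    }
}

With the defaults, a cluster whose BEs hold 120, 100 and 80 tablets is treated as skewed, and at most 5 tablets are preheated or remapped per round of the rebalancer daemon (which runs every cloud_tablet_rebalancer_interval_second = 20 seconds).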