diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java index 1c31d74d98630bd..ebad937cf27beb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.cloud.datasource.CloudInternalCatalog; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.MetaNotFoundException; @@ -90,7 +91,12 @@ public CloudRollupJobV2(String rawSql, long jobId, long dbId, long tableId, Stri baseSchemaHash, rollupSchemaHash, rollupKeysType, rollupShortKeyColumnCount, origStmt); ConnectContext context = ConnectContext.get(); if (context != null) { - String clusterName = context.getCloudCluster(); + String clusterName = ""; + try { + clusterName = context.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cluster name", e); + } LOG.debug("rollup job add cloud cluster, context not null, cluster: {}", clusterName); if (!Strings.isNullOrEmpty(clusterName)) { setCloudClusterName(clusterName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java index a8bcc546de33e63..37f93e8aa3a7619 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.cloud.datasource.CloudInternalCatalog; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; @@ -79,7 +80,12 @@ public CloudSchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId super(rawSql, jobId, dbId, tableId, tableName, timeoutMs); ConnectContext context = ConnectContext.get(); if (context != null) { - String clusterName = context.getCloudCluster(); + String clusterName = ""; + try { + clusterName = context.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cluster name", e); + } LOG.debug("rollup job add cloud cluster, context not null, cluster: {}", clusterName); if (!Strings.isNullOrEmpty(clusterName)) { setCloudClusterName(clusterName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java index 4cdd96a5f21596b..7fa592356cc8b61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java @@ -326,13 +326,14 @@ protected void runWaitingTxnJob() throws AlterCancelException { long originTabletId = originTablet.getId(); List originReplicas = originTablet.getReplicas(); for (Replica originReplica : originReplicas) { - if (originReplica.getBackendId() < 0) { - LOG.warn("replica:{}, backendId: {}", originReplica, originReplica.getBackendId()); + if (originReplica.getBackendIdWithoutException() < 0) { + LOG.warn("replica:{}, backendId: {}", originReplica, + 
originReplica.getBackendIdWithoutException()); throw new AlterCancelException("originReplica:" + originReplica.getId() + " backendId < 0"); } AlterInvertedIndexTask alterInvertedIndexTask = new AlterInvertedIndexTask( - originReplica.getBackendId(), db.getId(), olapTable.getId(), + originReplica.getBackendIdWithoutException(), db.getId(), olapTable.getId(), partitionId, originIndexId, originTabletId, originSchemaHash, olapTable.getIndexes(), alterInvertedIndexes, originSchemaColumns, diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 33e6aa58de0bf4f..288734709604925 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -420,7 +420,7 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri int healthyReplicaNum = 0; for (Replica baseReplica : baseReplicas) { long mvReplicaId = idGeneratorBuffer.getNextId(); - long backendId = baseReplica.getBackendId(); + long backendId = baseReplica.getBackendIdWithoutException(); if (baseReplica.getState() == ReplicaState.CLONE || baseReplica.getState() == ReplicaState.DECOMMISSION || baseReplica.getState() == ReplicaState.COMPACTION_TOO_SLOW diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index e38c91d296fed4f..83cac3e75be45f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -239,7 +239,7 @@ protected void createRollupReplica() throws AlterCancelException { long rollupTabletId = rollupTablet.getId(); List rollupReplicas = rollupTablet.getReplicas(); for (Replica rollupReplica : rollupReplicas) { - long backendId = rollupReplica.getBackendId(); + long backendId = rollupReplica.getBackendIdWithoutException(); long rollupReplicaId = rollupReplica.getId(); Preconditions.checkNotNull(tabletIdMap.get(rollupTabletId)); // baseTabletId countDownLatch.addMark(backendId, rollupTabletId); @@ -474,8 +474,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { for (Replica rollupReplica : rollupReplicas) { - AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId, - partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, + AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendIdWithoutException(), + dbId, tableId, partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), objectPool, whereClause, expiration, vaultId); @@ -611,7 +611,7 @@ private void onFinished(OlapTable tbl) { List failedBackends = failedTabletBackends.get(tablet.getId()); for (Replica replica : tablet.getReplicas()) { replica.setState(ReplicaState.NORMAL); - if (failedBackends != null && failedBackends.contains(replica.getBackendId())) { + if (failedBackends != null && failedBackends.contains(replica.getBackendIdWithoutException())) { replica.setBad(true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 7d581272648c47c..901c482424f848c 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2239,8 +2239,9 @@ private void sendClearAlterTask(Database db, OlapTable olapTable) { int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - ClearAlterTask alterTask = new ClearAlterTask(replica.getBackendId(), db.getId(), - olapTable.getId(), partition.getId(), index.getId(), tablet.getId(), schemaHash); + ClearAlterTask alterTask = new ClearAlterTask(replica.getBackendIdWithoutException(), + db.getId(), olapTable.getId(), partition.getId(), + index.getId(), tablet.getId(), schemaHash); batchTask.addTask(alterTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 111bfbce1f7fab5..494fbcf77343462 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -272,7 +272,7 @@ protected void createShadowIndexReplica() throws AlterCancelException { long shadowTabletId = shadowTablet.getId(); List shadowReplicas = shadowTablet.getReplicas(); for (Replica shadowReplica : shadowReplicas) { - long backendId = shadowReplica.getBackendId(); + long backendId = shadowReplica.getBackendIdWithoutException(); long shadowReplicaId = shadowReplica.getId(); countDownLatch.addMark(backendId, shadowTabletId); CreateReplicaTask createReplicaTask = new CreateReplicaTask( @@ -496,7 +496,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId); List shadowReplicas = shadowTablet.getReplicas(); for (Replica shadowReplica : shadowReplicas) { - AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, + AlterReplicaTask rollupTask + = new AlterReplicaTask(shadowReplica.getBackendIdWithoutException(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool, @@ -667,7 +668,7 @@ private void onFinished(OlapTable tbl) { List failedBackends = failedTabletBackends.get(tablet.getId()); for (Replica replica : tablet.getReplicas()) { replica.setState(ReplicaState.NORMAL); - if (failedBackends != null && failedBackends.contains(replica.getBackendId())) { + if (failedBackends != null && failedBackends.contains(replica.getBackendIdWithoutException())) { replica.setBad(true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index d94513efa9d226b..07540225a3589d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -203,14 +203,14 @@ private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) { taskProgress.remove(task.getTabletId()); taskErrMsg.remove(task.getTabletId()); - SnapshotTask newTask = new SnapshotTask(null, replica.getBackendId(), task.getTabletId(), + SnapshotTask newTask = new SnapshotTask(null, replica.getBackendIdWithoutException(), task.getTabletId(), task.getJobId(), task.getDbId(), tbl.getId(), 
task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getVersion(), task.getSchemaHash(), timeoutMs, false /* not restore task */); AgentBatchTask batchTask = new AgentBatchTask(); batchTask.addTask(newTask); - unfinishedTaskIds.put(tablet.getId(), replica.getBackendId()); + unfinishedTaskIds.put(tablet.getId(), replica.getBackendIdWithoutException()); //send task AgentTaskQueue.addTask(newTask); @@ -609,13 +609,13 @@ private Status prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable + ". visible version: " + visibleVersion); return status; } - SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), tablet.getId(), + SnapshotTask task = new SnapshotTask(null, replica.getBackendIdWithoutException(), tablet.getId(), jobId, dbId, olapTable.getId(), partition.getId(), index.getId(), tablet.getId(), visibleVersion, schemaHash, timeoutMs, false /* not restore task */); batchTask.addTask(task); - unfinishedTaskIds.put(tablet.getId(), replica.getBackendId()); + unfinishedTaskIds.put(tablet.getId(), replica.getBackendIdWithoutException()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 587a27c9e141a43..79d1ffa31418289 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -991,12 +991,13 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) { Tablet tablet = index.getTablet(idChain.getTabletId()); Replica replica = tablet.getReplicaById(idChain.getReplicaId()); long signature = env.getNextId(); - SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), signature, jobId, db.getId(), + SnapshotTask task = new SnapshotTask(null, replica.getBackendIdWithoutException(), + signature, jobId, db.getId(), tbl.getId(), part.getId(), index.getId(), tablet.getId(), part.getVisibleVersion(), tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, true /* is restore task*/); batchTask.addTask(task); unfinishedSignatureToId.put(signature, tablet.getId()); - bePathsMap.put(replica.getBackendId(), replica.getPathHash()); + bePathsMap.put(replica.getBackendIdWithoutException(), replica.getPathHash()); } finally { tbl.readUnlock(); } @@ -1103,7 +1104,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); - CreateReplicaTask task = new CreateReplicaTask(restoreReplica.getBackendId(), dbId, + CreateReplicaTask task = new CreateReplicaTask(restoreReplica.getBackendIdWithoutException(), dbId, localTbl.getId(), restorePart.getId(), restoredIdx.getId(), restoreTablet.getId(), restoreReplica.getId(), indexMeta.getShortKeyColumnCount(), indexMeta.getSchemaHash(), restoreReplica.getVersion(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java index c3e14e4955b4a03..bcee32c148daa5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java @@ -158,7 +158,7 @@ private void sendTableGcInfoToBe(Map beBinlogGcTaskMap, Olap for (Tablet tablet : tablets) { List replicas = tablet.getReplicas(); for (Replica replica : replicas) { - long beId = 
replica.getBackendId(); + long beId = replica.getBackendIdWithoutException(); long signature = -1; BinlogGcTask binlogGcTask = null; if (beBinlogGcTaskMap.containsKey(beId)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index bcebc639add767b..c8fad52d52ebd61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -6276,7 +6276,7 @@ private static void getTableMeta(OlapTable olapTable, TGetMetaDBMeta dbMeta) { for (Replica replica : tablet.getReplicas()) { TGetMetaReplicaMeta replicaMeta = new TGetMetaReplicaMeta(); replicaMeta.setId(replica.getId()); - replicaMeta.setBackendId(replica.getBackendId()); + replicaMeta.setBackendId(replica.getBackendIdWithoutException()); replicaMeta.setVersion(replica.getVersion()); tabletMeta.addToReplicas(replicaMeta); } @@ -6350,8 +6350,8 @@ public void compactTable(AdminCompactTableStmt stmt) throws DdlException { for (MaterializedIndex idx : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { - CompactionTask compactionTask = new CompactionTask(replica.getBackendId(), db.getId(), - olapTable.getId(), partition.getId(), idx.getId(), tablet.getId(), + CompactionTask compactionTask = new CompactionTask(replica.getBackendIdWithoutException(), + db.getId(), olapTable.getId(), partition.getId(), idx.getId(), tablet.getId(), olapTable.getSchemaHashByIndexId(idx.getId()), type); batchTask.addTask(compactionTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index a6c27cc16cad2c6..fd4a530f4369e1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -88,7 +88,7 @@ private static List> getTabletStatus(String dbName, String tblName, List row = Lists.newArrayList(); ReplicaStatus status = ReplicaStatus.OK; - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); if (be == null || !be.isAlive() || replica.isBad()) { status = ReplicaStatus.DEAD; } else if (replica.getVersion() < visibleVersion @@ -107,7 +107,7 @@ private static List> getTabletStatus(String dbName, String tblName, row.add(String.valueOf(tabletId)); row.add(String.valueOf(replica.getId())); - row.add(String.valueOf(replica.getBackendId())); + row.add(String.valueOf(replica.getBackendIdWithoutException())); row.add(String.valueOf(replica.getVersion())); row.add(String.valueOf(replica.getLastFailedVersion())); row.add(String.valueOf(replica.getLastSuccessVersion())); @@ -215,12 +215,13 @@ private static List> getTabletDistribution( for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - if (!countMap.containsKey(replica.getBackendId())) { + if (!countMap.containsKey(replica.getBackendIdWithoutException())) { continue; } - countMap.put(replica.getBackendId(), countMap.get(replica.getBackendId()) + 1); - sizeMap.put(replica.getBackendId(), - sizeMap.get(replica.getBackendId()) + replica.getDataSize()); + countMap.put(replica.getBackendIdWithoutException(), + countMap.get(replica.getBackendIdWithoutException()) + 1); + 
sizeMap.put(replica.getBackendIdWithoutException(), + sizeMap.get(replica.getBackendIdWithoutException()) + replica.getDataSize()); totalReplicaNum++; totalReplicaSize += replica.getDataSize(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 0fcbef007437aa9..2a894f0a0f60628 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.system.Backend; @@ -239,7 +240,16 @@ public long getId() { return this.id; } - public long getBackendId() { + public long getBackendIdWithoutException() { + try { + return getBackendId(); + } catch (UserException e) { + LOG.warn("getBackendIdWithoutException: ", e); + return -1; + } + } + + public long getBackendId() throws UserException { return this.backendId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index e09af6a116c0d11..40c3fad67edb80b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -215,7 +215,7 @@ protected boolean isLatestReplicaAndDeleteOld(Replica newReplica) { Iterator iterator = replicas.iterator(); while (iterator.hasNext()) { Replica replica = iterator.next(); - if (replica.getBackendId() == newReplica.getBackendId()) { + if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) { hasBackend = true; if (replica.getVersion() <= version) { iterator.remove(); @@ -247,7 +247,7 @@ public List getReplicas() { public Set getBackendIds() { Set beIds = Sets.newHashSet(); for (Replica replica : replicas) { - beIds.add(replica.getBackendId()); + beIds.add(replica.getBackendIdWithoutException()); } return beIds; } @@ -263,7 +263,7 @@ public List getNormalReplicaBackendIds() { @FunctionalInterface interface BackendIdGetter { - long get(Replica rep, String be); + long get(Replica rep, String be) throws UserException; } private Multimap getNormalReplicaBackendPathMapImpl(String beEndpoint, BackendIdGetter idGetter) @@ -371,7 +371,7 @@ public Replica getReplicaById(long replicaId) { public Replica getReplicaByBackendId(long backendId) { for (Replica replica : replicas) { - if (replica.getBackendId() == backendId) { + if (replica.getBackendIdWithoutException() == backendId) { return replica; } } @@ -381,7 +381,7 @@ public Replica getReplicaByBackendId(long backendId) { public boolean deleteReplica(Replica replica) { if (replicas.contains(replica)) { replicas.remove(replica); - Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendId()); + Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException()); return true; } return false; @@ -391,7 +391,7 @@ public boolean deleteReplicaByBackendId(long backendId) { Iterator iterator = replicas.iterator(); while (iterator.hasNext()) { Replica replica = iterator.next(); - if (replica.getBackendId() == backendId) { + if (replica.getBackendIdWithoutException() == backendId) { iterator.remove(); Env.getCurrentInvertedIndex().deleteReplica(id, backendId); return true; @@ -541,7 +541,7 @@ public TabletHealth getHealth(SystemInfoService 
systemInfoService, Set hosts = Sets.newHashSet(); ArrayList versions = new ArrayList<>(); for (Replica replica : replicas) { - Backend backend = systemInfoService.getBackend(replica.getBackendId()); + Backend backend = systemInfoService.getBackend(replica.getBackendIdWithoutException()); if (!isReplicaAndBackendAlive(replica, backend, hosts)) { continue; } @@ -636,7 +636,8 @@ public TabletHealth getHealth(SystemInfoService systemInfoService, // 3. replica is under relocating if (stable < replicationNum) { - Set replicaBeIds = replicas.stream().map(Replica::getBackendId).collect(Collectors.toSet()); + Set replicaBeIds = replicas.stream().map(Replica::getBackendIdWithoutException) + .collect(Collectors.toSet()); List availableBeIds = aliveBeIds.stream().filter(systemInfoService::checkBackendScheduleAvailable) .collect(Collectors.toList()); if (replicaBeIds.containsAll(availableBeIds) @@ -756,7 +757,7 @@ public TabletHealth getColocateHealth(long visibleVersion, int aliveAndVersionComplete = 0; Set hosts = Sets.newHashSet(); for (Replica replica : replicas) { - Backend backend = systemInfoService.getBackend(replica.getBackendId()); + Backend backend = systemInfoService.getBackend(replica.getBackendIdWithoutException()); if (!isReplicaAndBackendAlive(replica, backend, hosts)) { continue; } @@ -805,7 +806,7 @@ public TabletHealth getColocateHealth(long visibleVersion, // 2. check version completeness for (Replica replica : replicas) { - if (!backendsSet.contains(replica.getBackendId())) { + if (!backendsSet.contains(replica.getBackendIdWithoutException())) { // We don't care about replicas that are not in backendsSet. // eg: replicaBackendIds=(1,2,3,4); backendsSet=(1,2,3), // then replica 4 should be skipped here and then goto ```COLOCATE_REDUNDANT``` in step 3 @@ -861,7 +862,7 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P boolean allBeAliveOrDecommissioned = true; for (Replica replica : replicas) { - Backend backend = infoService.getBackend(replica.getBackendId()); + Backend backend = infoService.getBackend(replica.getBackendIdWithoutException()); if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) { allBeAliveOrDecommissioned = false; break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 08d947677aec913..466ec71986eaf8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -628,15 +628,15 @@ public void addReplica(long tabletId, Replica replica) { try { Preconditions.checkState(tabletMetaMap.containsKey(tabletId), "tablet " + tabletId + " not exists, replica " + replica.getId() - + ", backend " + replica.getBackendId()); + + ", backend " + replica.getBackendIdWithoutException()); // cloud mode, create table not need backendId, represent with -1. - long backendId = Config.isCloudMode() ? -1 : replica.getBackendId(); + long backendId = Config.isCloudMode() ? 
-1 : replica.getBackendIdWithoutException(); replicaMetaTable.put(tabletId, backendId, replica); replicaToTabletMap.put(replica.getId(), tabletId); backingReplicaMetaTable.put(backendId, tabletId, replica); if (LOG.isDebugEnabled()) { LOG.debug("add replica {} of tablet {} in backend {}", - replica.getId(), tabletId, replica.getBackendId()); + replica.getId(), tabletId, replica.getBackendIdWithoutException()); } } finally { writeUnlock(stamp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 0da7428e4225212..dfc8af760a0148a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -332,7 +332,7 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { Set hosts = Sets.newHashSet(); List replicaHighBEs = Lists.newArrayList(); for (BackendLoadStatistic beStat : highBEs) { - if (replicas.stream().anyMatch(replica -> beStat.getBeId() == replica.getBackendId())) { + if (replicas.stream().anyMatch(replica -> beStat.getBeId() == replica.getBackendIdWithoutException())) { replicaHighBEs.add(beStat); } Backend be = infoService.getBackend(beStat.getBeId()); @@ -350,7 +350,7 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { // select a replica as source boolean setSource = false; for (Replica replica : replicas) { - PathSlot slot = backendsWorkingSlots.get(replica.getBackendId()); + PathSlot slot = backendsWorkingSlots.get(replica.getBackendIdWithoutException()); if (slot == null) { continue; } @@ -368,7 +368,8 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { // Select a low load backend as destination. List candidates = Lists.newArrayList(); for (BackendLoadStatistic beStat : lowBEs) { - if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) { + if (beStat.isAvailable() && replicas.stream() + .noneMatch(r -> r.getBackendIdWithoutException() == beStat.getBeId())) { // check if on same host. 
Backend lowBackend = infoService.getBackend(beStat.getBeId()); if (lowBackend == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 96eef52d5978706..2fa5e6a8827581a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -325,10 +325,10 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { } // check src slot - PathSlot slot = backendsWorkingSlots.get(replica.getBackendId()); + PathSlot slot = backendsWorkingSlots.get(replica.getBackendIdWithoutException()); if (slot == null) { if (LOG.isDebugEnabled()) { - LOG.debug("BE does not have slot: {}", replica.getBackendId()); + LOG.debug("BE does not have slot: {}", replica.getBackendIdWithoutException()); } throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot"); } @@ -339,7 +339,7 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { // after take src slot, we can set src replica now tabletCtx.setSrc(replica); - BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendId()); + BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendIdWithoutException()); if (!beStat.isAvailable()) { throw new SchedException(Status.UNRECOVERABLE, "the backend is not available"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 7095ad8dc54315c..163137825e4f387 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -250,7 +250,7 @@ private void checkMovesCompleted(List moves, List toDeleteKeys if (LOG.isDebugEnabled()) { LOG.debug("Move {} is completed. 
The cur dist: {}", move, invertedIndex.getReplicasByTabletId(move.tabletId).stream() - .map(Replica::getBackendId).collect(Collectors.toList())); + .map(Replica::getBackendIdWithoutException).collect(Collectors.toList())); } counterBalanceMoveSucceeded.incrementAndGet(); } @@ -261,7 +261,7 @@ private void checkMovesCompleted(List moves, List toDeleteKeys private boolean checkMoveCompleted(TabletMove move) { Long tabletId = move.tabletId; List bes = invertedIndex.getReplicasByTabletId(tabletId).stream() - .map(Replica::getBackendId).collect(Collectors.toList()); + .map(Replica::getBackendIdWithoutException).collect(Collectors.toList()); return !bes.contains(move.fromBe) && bes.contains(move.toBe); } @@ -283,8 +283,9 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx) // Check src replica's validation Replica srcReplica = tabletCtx.getTablet().getReplicaByBackendId(move.fromBe); Preconditions.checkNotNull(srcReplica); - TabletScheduler.PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId()); - Preconditions.checkNotNull(slot, "unable to get fromBe " + srcReplica.getBackendId() + " slot"); + TabletScheduler.PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException()); + Preconditions.checkNotNull(slot, "unable to get fromBe " + + srcReplica.getBackendIdWithoutException() + " slot"); if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) { tabletCtx.setSrc(srcReplica); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index cce296c615a19eb..faa4579aea07b7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -464,7 +464,7 @@ public long getCopyTimeMs() { public long getSrcBackendId() { if (srcReplica != null) { - return srcReplica.getBackendId(); + return srcReplica.getBackendIdWithoutException(); } else { return -1; } @@ -485,7 +485,7 @@ public void setTempSrc(Replica srcReplica) { public long getTempSrcBackendId() { if (tempSrcReplica != null) { - return tempSrcReplica.getBackendId(); + return tempSrcReplica.getBackendIdWithoutException(); } return -1; } @@ -536,27 +536,27 @@ public boolean filterDestBE(long beId) { } String host = backend.getHost(); for (Replica replica : tablet.getReplicas()) { - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); if (be == null) { // BE has been dropped, skip it if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not exist, skip. tablet: {}", - replica.getBackendId(), tabletId); + replica.getBackendIdWithoutException(), tabletId); } continue; } if (!Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && host.equals(be.getHost())) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} is on same host {}, skip. tablet: {}", - replica.getBackendId(), host, tabletId); + replica.getBackendIdWithoutException(), host, tabletId); } return true; } - if (replica.getBackendId() == beId) { + if (replica.getBackendIdWithoutException() == beId) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} is same as dest backend {}, skip. 
tablet: {}", - replica.getBackendId(), beId, tabletId); + replica.getBackendIdWithoutException(), beId, tabletId); } return true; } @@ -612,10 +612,10 @@ public void chooseSrcReplica(Map backendsWorkingSlots, long exce */ List candidates = Lists.newArrayList(); for (Replica replica : tablet.getReplicas()) { - if (exceptBeId != -1 && replica.getBackendId() == exceptBeId) { + if (exceptBeId != -1 && replica.getBackendIdWithoutException() == exceptBeId) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} is same as except backend {}, skip. tablet: {}", - replica.getBackendId(), exceptBeId, tabletId); + replica.getBackendIdWithoutException(), exceptBeId, tabletId); } continue; } @@ -628,12 +628,12 @@ public void chooseSrcReplica(Map backendsWorkingSlots, long exce continue; } - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); if (be == null || !be.isAlive()) { // backend which is in decommission can still be the source backend if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not exist or is not alive, skip. tablet: {}", - replica.getBackendId(), tabletId); + replica.getBackendIdWithoutException(), tabletId); } continue; } @@ -665,11 +665,11 @@ public void chooseSrcReplica(Map backendsWorkingSlots, long exce // sort replica by version count asc, so that we prefer to choose replicas with fewer versions Collections.sort(candidates, VERSION_COUNTER_COMPARATOR); for (Replica srcReplica : candidates) { - PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId()); + PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException()); if (slot == null) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not have working slot, skip. tablet: {}", - srcReplica.getBackendId(), tabletId); + srcReplica.getBackendIdWithoutException(), tabletId); } continue; } @@ -678,7 +678,7 @@ public void chooseSrcReplica(Map backendsWorkingSlots, long exce if (srcPathHash == -1) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not have available slot, skip. tablet: {}", - srcReplica.getBackendId(), tabletId); + srcReplica.getBackendIdWithoutException(), tabletId); } continue; } @@ -697,7 +697,7 @@ public void chooseSrcReplica(Map backendsWorkingSlots, long exce public void chooseSrcReplicaForVersionIncomplete(Map backendsWorkingSlots) throws SchedException { chooseSrcReplica(backendsWorkingSlots, destBackendId); - Preconditions.checkState(srcReplica.getBackendId() != destBackendId, + Preconditions.checkState(srcReplica.getBackendIdWithoutException() != destBackendId, "wrong be id: " + destBackendId); } @@ -726,10 +726,10 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo } if (!replica.isScheduleAvailable()) { - if (Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replica.getBackendId())) { + if (Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replica.getBackendIdWithoutException())) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not exist or is not scheduler available, skip. 
tablet: {}", - replica.getBackendId(), tabletId); + replica.getBackendIdWithoutException(), tabletId); } } else { if (LOG.isDebugEnabled()) { @@ -762,7 +762,7 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo continue; } - if (colocateBackendsSet != null && colocateBackendsSet.contains(replica.getBackendId())) { + if (colocateBackendsSet != null && colocateBackendsSet.contains(replica.getBackendIdWithoutException())) { colocateCand.add(replica); } else { notColocateCand.add(replica); @@ -799,7 +799,7 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo Replica chosenReplica = null; for (Replica replica : candidates) { - PathSlot slot = backendsWorkingSlots.get(replica.getBackendId()); + PathSlot slot = backendsWorkingSlots.get(replica.getBackendIdWithoutException()); if (slot == null || !slot.hasAvailableSlot(replica.getPathHash())) { if (!replica.needFurtherRepair()) { throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, @@ -831,7 +831,7 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo // check if the dest replica has available slot // it should not happen cause it just check hasAvailableSlot yet. - PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendId()); + PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendIdWithoutException()); if (slot == null) { throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, "backend of dest replica is missing"); @@ -863,9 +863,9 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo setDecommissionTime(-1); LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete," + " and change state from DECOMMISSION to NORMAL", - chosenReplica.getId(), chosenReplica.getBackendId(), tabletId); + chosenReplica.getId(), chosenReplica.getBackendIdWithoutException(), tabletId); } - setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash()); + setDest(chosenReplica.getBackendIdWithoutException(), chosenReplica.getPathHash()); } private boolean checkFurtherRepairFinish(Replica replica, long version) { @@ -905,7 +905,7 @@ public void releaseResource(TabletScheduler tabletScheduler) { public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTablet) { if (srcReplica != null) { Preconditions.checkState(srcPathHash != -1); - PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(srcReplica.getBackendId()); + PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(srcReplica.getBackendIdWithoutException()); if (slot != null) { if (type == Type.REPAIR) { slot.freeSlot(srcPathHash); @@ -979,7 +979,7 @@ private void reset(boolean reserveTablet) { } public void deleteReplica(Replica replica) { - tablet.deleteReplicaByBackendId(replica.getBackendId()); + tablet.deleteReplicaByBackendId(replica.getBackendIdWithoutException()); } public StorageMediaMigrationTask createStorageMediaMigrationTask() throws SchedException { @@ -987,7 +987,7 @@ public StorageMediaMigrationTask createStorageMediaMigrationTask() throws SchedE getSchemaHash(), getStorageMedium()); if (destPath == null || destPath.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, - "backend " + srcReplica.getBackendId() + ", dest path is empty"); + "backend " + srcReplica.getBackendIdWithoutException() + ", dest path is empty"); } storageMediaMigrationTask.setDataDir(destPath); this.taskTimeoutMs = getApproximateTimeoutMs(); @@ -997,10 +997,10 @@ public StorageMediaMigrationTask createStorageMediaMigrationTask() throws 
SchedE // database lock should be held. public CloneTask createCloneReplicaAndTask() throws SchedException { - Backend srcBe = infoService.getBackend(srcReplica.getBackendId()); + Backend srcBe = infoService.getBackend(srcReplica.getBackendIdWithoutException()); if (srcBe == null) { throw new SchedException(Status.SCHEDULE_FAILED, - "src backend " + srcReplica.getBackendId() + " does not exist"); + "src backend " + srcReplica.getBackendIdWithoutException() + " does not exist"); } Backend destBe = infoService.getBackend(destBackendId); @@ -1281,7 +1281,7 @@ public List getBrief() { // show the real priority value, higher this value, higher sched priority. Add 10 hour to make it // to be a positive value. result.add(String.valueOf((System.currentTimeMillis() - getCompareValue()) / 1000 + 10 * 3600L)); - result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId())); + result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendIdWithoutException())); result.add(String.valueOf(srcPathHash)); result.add(String.valueOf(destBackendId)); result.add(String.valueOf(destPathHash)); @@ -1372,7 +1372,7 @@ public String toString() { sb.append(", priority: ").append(tabletHealth.priority.name()); sb.append(", tablet size: ").append(tabletSize); if (srcReplica != null) { - sb.append(", from backend: ").append(srcReplica.getBackendId()); + sb.append(", from backend: ").append(srcReplica.getBackendIdWithoutException()); sb.append(", src path hash: ").append(srcPathHash); } if (destPathHash != -1) { @@ -1421,7 +1421,7 @@ public void resetReplicaState() { replica.setPostWatermarkTxnId(-1); if (LOG.isDebugEnabled()) { LOG.debug("reset replica {} on backend {} of tablet {} state from DECOMMISSION to NORMAL", - replica.getId(), replica.getBackendId(), tabletId); + replica.getId(), replica.getBackendIdWithoutException(), tabletId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 7c58d6acc5321cd..5946378058bc551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -46,6 +46,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.persist.ReplicaPersistInfo; @@ -648,7 +649,8 @@ private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws Sched public void updateDestPathHash(TabletSchedCtx tabletCtx) { // find dest replica Optional destReplica = tabletCtx.getReplicas() - .stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny(); + .stream().filter(replica -> replica.getBackendIdWithoutException() + == tabletCtx.getDestBackendId()).findAny(); if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) { destReplica.get().setPathHash(tabletCtx.getDestPathHash()); } @@ -753,7 +755,14 @@ private Tag chooseProperTag(TabletSchedCtx tabletCtx, boolean forMissingReplica) Map allocMap = tabletCtx.getReplicaAlloc().getAllocMap(); Map currentAllocMap = Maps.newHashMap(); for (Replica replica : replicas) { - Backend be = infoService.getBackend(replica.getBackendId()); + long beId; + try { + beId = replica.getBackendId(); + } catch (UserException e) { + LOG.warn("replica 
is not found", e); + beId = -1; + } + Backend be = infoService.getBackend(beId); if (replica.isScheduleAvailable() && replica.isAlive() && !replica.tooSlow() && be.isMixNode()) { Short num = currentAllocMap.getOrDefault(be.getLocationTag(), (short) 0); @@ -877,7 +886,7 @@ private void handleRedundantReplica(TabletSchedCtx tabletCtx, boolean force) thr private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean force) throws SchedException { for (Replica replica : tabletCtx.getReplicas()) { - long beId = replica.getBackendId(); + long beId = replica.getBackendIdWithoutException(); if (infoService.getBackend(beId) == null) { deleteReplicaInternal(tabletCtx, replica, "backend dropped", force); return true; @@ -908,7 +917,7 @@ private boolean deleteTooSlowReplica(TabletSchedCtx tabletCtx, boolean force) th private boolean deleteBackendUnavailable(TabletSchedCtx tabletCtx, boolean force) throws SchedException { for (Replica replica : tabletCtx.getReplicas()) { - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); if (be == null) { // this case should be handled in deleteBackendDropped() continue; @@ -957,7 +966,7 @@ private boolean deleteReplicaOnSameHost(TabletSchedCtx tabletCtx, boolean force) // host -> (replicas on same host) Map> hostToReplicas = Maps.newHashMap(); for (Replica replica : tabletCtx.getReplicas()) { - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); if (be == null) { // this case should be handled in deleteBackendDropped() return false; @@ -992,7 +1001,7 @@ private boolean deleteReplicaNotInValidTag(TabletSchedCtx tabletCtx, boolean for List replicas = tablet.getReplicas(); Map allocMap = tabletCtx.getReplicaAlloc().getAllocMap(); for (Replica replica : replicas) { - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); if (be.isMixNode() && !allocMap.containsKey(be.getLocationTag())) { deleteReplicaInternal(tabletCtx, replica, "not in valid tag", force); return true; @@ -1027,7 +1036,8 @@ private boolean deleteReplicaOnUrgentHighDisk(TabletSchedCtx tabletCtx, boolean Replica chosenReplica = null; double maxUsages = -1; for (Replica replica : tabletCtx.getReplicas()) { - BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId()); + BackendLoadStatistic beStatistic = statistic + .getBackendLoadStatistic(replica.getBackendIdWithoutException()); if (beStatistic == null) { continue; } @@ -1061,7 +1071,8 @@ private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, List 0 && replica.getBackendId() == debugHighBeId) { + if (debugHighBeId > 0 && replica.getBackendIdWithoutException() == debugHighBeId) { chosenReplica = replica; break; } @@ -1105,7 +1116,8 @@ private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, List= tabletCtx.getReplicas().size() / 2 + 1) { chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW); LOG.info("set replica id :{} tablet id: {}, backend id: {} to COMPACTION_TOO_SLOW", - chosenReplica.getId(), tabletCtx.getTablet().getId(), chosenReplica.getBackendId()); + chosenReplica.getId(), tabletCtx.getTablet().getId(), chosenReplica.getBackendIdWithoutException()); throw new SchedException(Status.FINISHED, "set replica to COMPACTION_TOO_SLOW"); } throw new SchedException(Status.FINISHED, "No replica set to COMPACTION_TOO_SLOW"); @@ 
-1180,7 +1192,7 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx, // Remain it as VERY_HIGH may block other task. tabletCtx.setPriority(Priority.NORMAL); LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}", - replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason); + replica.getId(), replica.getBackendIdWithoutException(), tabletCtx.getTabletId(), reason); } try { long preWatermarkTxnId = replica.getPreWatermarkTxnId(); @@ -1228,7 +1240,7 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx, // NOTICE: only delete the replica from meta may not work. sometimes we can depend on tablet report // deleting these replicas, but in FORCE_REDUNDANT case, replica may be added to meta again in report // process. - sendDeleteReplicaTask(replica.getBackendId(), tabletCtx.getTabletId(), replica.getId(), + sendDeleteReplicaTask(replica.getBackendIdWithoutException(), tabletCtx.getTabletId(), replica.getId(), tabletCtx.getSchemaHash()); } @@ -1238,12 +1250,12 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx, tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(), - replica.getBackendId()); + replica.getBackendIdWithoutException()); Env.getCurrentEnv().getEditLog().logDeleteReplica(info); LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}", - tabletCtx.getTabletId(), replica.getBackendId(), reason, force); + tabletCtx.getTabletId(), replica.getBackendIdWithoutException(), reason, force); } private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java index 8e46547ae6c121e..879b452459ae551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.qe.ConnectContext; @@ -255,7 +256,11 @@ public AutoCloseConnectContext(ConnectContext connectContext) { this.previousContext = ConnectContext.get(); this.connectContext = connectContext; connectContext.setThreadLocalInfo(); - connectContext.getCloudCluster(); + try { + connectContext.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cloud cluster", e); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index be0c510559eda25..04e69cf422d4934 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -20,7 +20,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.Replica.ReplicaContext; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -89,17 +90,26 @@ 
private boolean isColocated() { return Env.getCurrentColocateIndex().isColocateTable(tableId); } - private long getColocatedBeId(String cluster) { + private long getColocatedBeId(String cluster) throws ClusterException { List bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getBackendsByClusterId(cluster); + String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByClusterId(cluster); + if (bes.isEmpty()) { + throw new ClusterException( + String.format("There are no Backend nodes in the current cluster %s", clusterName), + ClusterException.FailedTypeEnum.CURRENT_CLUSTER_NO_BE); + } List availableBes = new ArrayList<>(); for (Backend be : bes) { if (be.isAlive()) { availableBes.add(be); } } - if (availableBes == null || availableBes.size() == 0) { + + if (availableBes.isEmpty()) { LOG.warn("failed to get available be, clusterId: {}", cluster); - return -1; + throw new ClusterException( + String.format("All the Backend nodes in the current cluster %s are in an abnormal state", clusterName), + ClusterException.FailedTypeEnum.CLUSTERS_NO_ALIVE_BE); } // Tablets with the same idx will be hashed to the same BE, which @@ -112,11 +122,11 @@ private long getColocatedBeId(String cluster) { public long getBackendId(String beEndpoint) { String cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterIdByBeAddr(beEndpoint); - return getBackendIdImpl(cluster); + return getBackendId(cluster); } @Override - public long getBackendId() { + public long getBackendId() throws ClusterException { String cluster = null; // Not in a connect session ConnectContext context = ConnectContext.get(); @@ -127,7 +137,8 @@ public long getBackendId() { ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster); } catch (Exception e) { LOG.warn("get cluster by session context exception"); - return -1; + throw new ClusterException(String.format("default cluster %s check auth failed", cluster), + ClusterException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_CLUSTER); } if (LOG.isDebugEnabled()) { LOG.debug("get cluster by session context cluster: {}", cluster); @@ -137,28 +148,40 @@ public long getBackendId() { if (LOG.isDebugEnabled()) { LOG.debug("get cluster by context {}", cluster); } + String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudStatusByName(cluster); + if (!Strings.isNullOrEmpty(clusterStatus) + && Cloud.ClusterStatus.valueOf(clusterStatus) + == Cloud.ClusterStatus.MANUAL_SHUTDOWN) { + LOG.warn("auto start cluster {} in manual shutdown status", cluster); + throw new ClusterException( + String.format("The current cluster %s has been manually shutdown", cluster), + ClusterException.FailedTypeEnum.CURRENT_CLUSTER_BEEN_MANUAL_SHUTDOWN); + } } } else { if (LOG.isDebugEnabled()) { LOG.debug("connect context is null in getBackendId"); } - return -1; + throw new ClusterException("connect context not set", + ClusterException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET); } return getBackendIdImpl(cluster); } - private long getBackendIdImpl(String cluster) { - // check default cluster valid. 
+ private long getBackendIdImpl(String cluster) throws ClusterException { if (Strings.isNullOrEmpty(cluster)) { LOG.warn("failed to get available be, clusterName: {}", cluster); - return -1; + throw new ClusterException("cluster name is empty", + ClusterException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_CLUSTER); } boolean exist = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getCloudClusterNames().contains(cluster); if (!exist) { // can't use this default cluster, plz change another LOG.warn("cluster: {} is not existed", cluster); - return -1; + throw new ClusterException(String.format("The current cluster %s is not registered in the system", cluster), + ClusterException.FailedTypeEnum.CURRENT_CLUSTER_NOT_EXIST); } // if cluster is SUSPENDED, wait @@ -229,10 +252,16 @@ private long getBackendIdImpl(String cluster) { return hashReplicaToBe(clusterId, false); } - public long hashReplicaToBe(String clusterId, boolean isBackGround) { + public long hashReplicaToBe(String clusterId, boolean isBackGround) throws ClusterException { // TODO(luwei) list should be sorted List clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterId(clusterId); + String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByClusterId(clusterId); + if (clusterBes.isEmpty()) { + throw new ClusterException( + String.format("There are no Backend nodes in the current cluster %s", clusterName), + ClusterException.FailedTypeEnum.CURRENT_CLUSTER_NO_BE); + } // use alive be to exec sql List availableBes = new ArrayList<>(); for (Backend be : clusterBes) { @@ -244,11 +273,13 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) { availableBes.add(be); } } - if (availableBes == null || availableBes.size() == 0) { + if (availableBes.isEmpty()) { if (!isBackGround) { LOG.warn("failed to get available be, clusterId: {}", clusterId); } - return -1; + throw new ClusterException( + String.format("All the Backend nodes in the current cluster %s are in an abnormal state", clusterName), + ClusterException.FailedTypeEnum.CLUSTERS_NO_ALIVE_BE); } if (LOG.isDebugEnabled()) { LOG.debug("availableBes={}", availableBes); @@ -278,10 +309,16 @@ pickedBeId, getId(), partitionId, availableBes.size(), idx, index, return pickedBeId; } - public List hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) { + public List hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) throws ClusterException { // TODO(luwei) list should be sorted List clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterId(clusterId); + String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByClusterId(clusterId); + if (clusterBes.isEmpty()) { + throw new ClusterException( + String.format("There are no Backend nodes in the current cluster %s", clusterName), + ClusterException.FailedTypeEnum.CURRENT_CLUSTER_NO_BE); + } // use alive be to exec sql List availableBes = new ArrayList<>(); for (Backend be : clusterBes) { @@ -293,17 +330,19 @@ public List hashReplicaToBes(String clusterId, boolean isBackGround, int r availableBes.add(be); } } - if (availableBes == null || availableBes.size() == 0) { + if (availableBes.isEmpty()) { if (!isBackGround) { LOG.warn("failed to get available be, clusterId: {}", clusterId); } - return new ArrayList(); + throw new ClusterException( + String.format("All the Backend nodes in the current cluster %s are in an abnormal state", clusterName), + 
ClusterException.FailedTypeEnum.CLUSTERS_NO_ALIVE_BE); } if (LOG.isDebugEnabled()) { LOG.debug("availableBes={}", availableBes); } - int realReplicaNum = replicaNum > availableBes.size() ? availableBes.size() : replicaNum; + int realReplicaNum = Math.min(replicaNum, availableBes.size()); List bes = new ArrayList(); for (int i = 0; i < realReplicaNum; ++i) { long index = -1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 73ddbe4c4551aeb..b92dbd38091f641 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.ClientPool; @@ -470,7 +471,13 @@ private void completeRouteInfo() { Map> clusterToBackends = ((CloudReplica) replica).getClusterToBackends(); if (!clusterToBackends.containsKey(cluster)) { - long beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true); + long beId; + try { + beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true); + } catch (ClusterException e) { + LOG.warn("failed to hash replica to be {}", cluster, e); + beId = -1; + } if (beId <= 0) { assignedErrNum++; continue; @@ -949,8 +956,15 @@ private void migrateTablets(Long srcBe, Long dstBe) { // populate to followers Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId()); if (db == null) { + long beId; + try { + beId = cloudReplica.getBackendId(); + } catch (ClusterException e) { + LOG.warn("get backend failed cloudReplica {}", cloudReplica, e); + beId = -1; + } LOG.error("get null db from replica, tabletId={}, partitionId={}, beId={}", - cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getBackendId()); + cloudReplica.getTableId(), cloudReplica.getPartitionId(), beId); continue; } OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index 0c8e20827167d4b..4182063616db64f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; @@ -88,7 +89,13 @@ public CloudBrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, Origin private void setCloudClusterId() throws MetaNotFoundException { ConnectContext context = ConnectContext.get(); if (context != null) { - String clusterName = context.getCloudCluster(); + String clusterName = ""; + try { + clusterName = context.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cluster name", e); + throw new 
MetaNotFoundException("failed to get cluster name " + e); + } if (Strings.isNullOrEmpty(clusterName)) { LOG.warn("cluster name is empty"); throw new MetaNotFoundException("cluster name is empty"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/ClusterException.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/ClusterException.java new file mode 100644 index 000000000000000..de9a12f6825363a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/ClusterException.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.qe; + +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.UserException; + +/** + * Thrown for cloud cluster errors + */ +public class ClusterException extends UserException { + public enum FailedTypeEnum { + NOT_CLOUD_MODE, + CONNECT_CONTEXT_NOT_SET, + CONNECT_CONTEXT_NOT_SET_CLUSTER, + CURRENT_USER_NO_AUTH_TO_USE_ANY_CLUSTER, + CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_CLUSTER, + CURRENT_CLUSTER_NO_BE, + CLUSTERS_NO_ALIVE_BE, + CURRENT_CLUSTER_NOT_EXIST, + CURRENT_CLUSTER_BEEN_MANUAL_SHUTDOWN, + SYSTEM_NOT_HAVE_CLUSTER, + } + + private InternalErrorCode errorCode; + private ErrorCode mysqlErrorCode; + private final FailedTypeEnum failedType; + public String msg; + + public ClusterException(String msg, FailedTypeEnum failedType) { + super(msg); + this.msg = msg; + this.mysqlErrorCode = ErrorCode.ERR_CLOUD_CLUSTER_ERROR; + this.errorCode = InternalErrorCode.INTERNAL_ERR; + this.failedType = failedType; + } + + public String toString() { + return msg + ", ClusterException: " + failedType; + } + + @Override + public String getMessage() { + String message = errorCode + ", detailMessage = " + super.getMessage() + ", ClusterException: " + failedType; + return deleteUselessMsg(message); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 48728efb003aba6..bbab22a52e1cfa3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -23,6 +23,7 @@ import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.ClusterPB; import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -360,8 +361,15 @@ public boolean containClusterName(String clusterName) { @Override public int getMinPipelineExecutorSize() { + 
String clusterName = ""; + try { + clusterName = ConnectContext.get().getCloudCluster(false); + } catch (ClusterException e) { + LOG.warn("failed to get cluster name", e); + return 1; + } if (ConnectContext.get() != null - && Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster(false))) { + && Strings.isNullOrEmpty(clusterName)) { return 1; } @@ -375,16 +383,21 @@ public ImmutableMap getBackendsByCurrentCluster() throws Analysis throw new AnalysisException("connect context is null"); } - String cluster = ctx.getCurrentCloudCluster(); - if (Strings.isNullOrEmpty(cluster)) { - throw new AnalysisException("cluster name is empty"); - } - - List backends = getBackendsByClusterName(cluster); Map idToBackend = Maps.newHashMap(); - for (Backend be : backends) { - idToBackend.put(be.getId(), be); + try { + String cluster = ctx.getCurrentCloudCluster(); + if (Strings.isNullOrEmpty(cluster)) { + throw new AnalysisException("cluster name is empty"); + } + + List backends = getBackendsByClusterName(cluster); + for (Backend be : backends) { + idToBackend.put(be.getId(), be); + } + } catch (ClusterException e) { + throw new AnalysisException(e.getMessage()); } + return ImmutableMap.copyOf(idToBackend); } @@ -694,7 +707,13 @@ public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) { public void waitForAutoStartCurrentCluster() throws DdlException { ConnectContext context = ConnectContext.get(); if (context != null) { - String cloudCluster = context.getCloudCluster(); + String cloudCluster = ""; + try { + cloudCluster = context.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cloud cluster", e); + return; + } if (!Strings.isNullOrEmpty(cloudCluster)) { waitForAutoStart(cloudCluster); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/UserException.java b/fe/fe-core/src/main/java/org/apache/doris/common/UserException.java index 5a2dfb8257673a0..c70a436d212fc17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/UserException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/UserException.java @@ -76,10 +76,17 @@ public void setMysqlErrorCode(ErrorCode mysqlErrorCode) { @Override public String getMessage() { - return errorCode + ", detailMessage = " + super.getMessage(); + return deleteUselessMsg(errorCode + ", detailMessage = " + super.getMessage()); } public String getDetailMessage() { - return super.getMessage(); + return deleteUselessMsg(super.getMessage()); + } + + protected String deleteUselessMsg(String msg) { + if (msg.contains("detailMessage = errCode = 2, ")) { + return msg.replace("detailMessage = errCode = 2, ", ""); + } + return msg; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index 8f38b34a2b1761f..dabd3c28cc2e2a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -80,7 +80,7 @@ public ProcResult fetchResult() throws AnalysisException { } for (Replica replica : replicas) { - Backend be = backendMap.get(replica.getBackendId()); + Backend be = backendMap.get(replica.getBackendIdWithoutException()); String host = (be == null ? Backend.DUMMY_IP : be.getHost()); int port = (be == null ? 
0 : be.getHttpPort()); String hostPort = NetUtils.getHostPortInAccessibleFormat(host, port); @@ -106,7 +106,7 @@ public ProcResult fetchResult() throws AnalysisException { queryHits = QueryStatsUtil.getMergedReplicaStats(replica.getId()); } result.addRow(Arrays.asList(String.valueOf(replica.getId()), - String.valueOf(replica.getBackendId()), + String.valueOf(replica.getBackendIdWithoutException()), String.valueOf(replica.getVersion()), String.valueOf(replica.getLastSuccessVersion()), String.valueOf(replica.getLastFailedVersion()), diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index 12c1adf71d53e30..a7591af19cdfa91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -129,7 +129,7 @@ public List> fetchComparableResult(long version, long backendId } else { for (Replica replica : tablet.getReplicas()) { if ((version > -1 && replica.getVersion() != version) - || (backendId > -1 && replica.getBackendId() != backendId) + || (backendId > -1 && replica.getBackendIdWithoutException() != backendId) || (state != null && replica.getState() != state)) { continue; } @@ -137,7 +137,7 @@ public List> fetchComparableResult(long version, long backendId // tabletId -- replicaId -- backendId -- version -- dataSize -- rowCount -- state tabletInfo.add(tabletId); tabletInfo.add(replica.getId()); - tabletInfo.add(replica.getBackendId()); + tabletInfo.add(replica.getBackendIdWithoutException()); tabletInfo.add(replica.getSchemaHash()); tabletInfo.add(replica.getVersion()); tabletInfo.add(replica.getLastSuccessVersion()); @@ -155,7 +155,7 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L)); tabletInfo.add(replica.getPathHash()); tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), "")); - Backend be = backendMap.get(replica.getBackendId()); + Backend be = backendMap.get(replica.getBackendIdWithoutException()); String host = (be == null ? Backend.DUMMY_IP : be.getHost()); int port = (be == null ? 
0 : be.getHttpPort()); String hostPort = NetUtils.getHostPortInAccessibleFormat(host, port); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java index 0e96dc8c5930d19..4a0b9d1ff5950d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java @@ -77,7 +77,7 @@ public List fetch() { // only need alive replica if (replica.isAlive()) { Set tabletIds = beIdToTabletId.computeIfAbsent( - replica.getBackendId(), k -> Sets.newHashSet()); + replica.getBackendIdWithoutException(), k -> Sets.newHashSet()); tabletIds.add(tablet.getId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java b/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java index dd9279a607e6890..0bd2a129f5b92a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java @@ -185,7 +185,7 @@ public boolean sendTasks() { maxDataSize = replica.getDataSize(); } - CheckConsistencyTask task = new CheckConsistencyTask(null, replica.getBackendId(), + CheckConsistencyTask task = new CheckConsistencyTask(null, replica.getBackendIdWithoutException(), tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), @@ -197,7 +197,7 @@ public boolean sendTasks() { batchTask.addTask(task); // init checksum as '-1' - checksumMap.put(replica.getBackendId(), -1L); + checksumMap.put(replica.getBackendIdWithoutException(), -1L); ++sentTaskReplicaNum; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 38cde9892011db4..83c692d0f54abf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1052,7 +1052,7 @@ public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) long tabletId = tablet.getId(); List replicas = tablet.getReplicas(); for (Replica replica : replicas) { - long backendId = replica.getBackendId(); + long backendId = replica.getBackendIdWithoutException(); long replicaId = replica.getId(); DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, replicaId, schemaHash, true); @@ -2085,7 +2085,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); for (Replica replica : tablet.getReplicas()) { - long backendId = replica.getBackendId(); + long backendId = replica.getBackendIdWithoutException(); long replicaId = replica.getId(); countDownLatch.addMark(backendId, tabletId); CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tbl.getId(), partitionId, indexId, diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index c2d50460ea49548..4076da346451021 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import 
org.apache.doris.catalog.Table; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -362,7 +363,12 @@ private String getCloudClusterName(HttpServletRequest request) { return cloudClusterName; } - cloudClusterName = ConnectContext.get().getCloudCluster(); + try { + cloudClusterName = ConnectContext.get().getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("get cloud cluster name failed", e); + return ""; + } if (!Strings.isNullOrEmpty(cloudClusterName)) { return cloudClusterName; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 4ccbfa44d8c113e..358bd4318125c8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -433,7 +433,7 @@ protected List generateTabletCommitInfos() { LOG.warn("could not find tablet id for replica {}, the tablet maybe dropped", replica); return; } - tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendId())); + tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendIdWithoutException())); })); return tabletCommitInfos; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 1009c4257b85f54..a6a7cc65e3bba37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -199,9 +199,14 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, throw new LoadException(e.getMessage()); } } else { - // Master FE will select BE by itself. - return Env.getCurrentSystemInfo() + try { + // Master FE will select BE by itself. 
+ return Env.getCurrentSystemInfo() .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud)); + } catch (Exception e) { + LOG.warn("get backend failed, tableId: {}, exception", tableId, e); + throw new LoadException(e.getMessage()); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 0323f3ea62ee15b..183ce31a4b0fd79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -42,13 +42,9 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.catalog.TabletInvertedIndex; -import org.apache.doris.catalog.TabletMeta; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; @@ -63,7 +59,6 @@ import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.UserException; import org.apache.doris.common.util.ExprUtil; -import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.LoadJob.JobState; @@ -85,7 +80,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -1293,91 +1287,6 @@ public long getLatestJobIdByLabel(long dbId, String labelValue) { return jobId; } - public List> getLoadJobUnfinishedInfo(long jobId) { - LinkedList> infos = new LinkedList>(); - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - - LoadJob loadJob = getLoadJob(jobId); - if (loadJob == null - || (loadJob.getState() != JobState.LOADING && loadJob.getState() != JobState.QUORUM_FINISHED)) { - return infos; - } - - long dbId = loadJob.getDbId(); - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - return infos; - } - - - readLock(); - try { - Map tabletMap = loadJob.getIdToTabletLoadInfo(); - for (long tabletId : tabletMap.keySet()) { - TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta == null) { - // tablet may be dropped during loading - continue; - } - - long tableId = tabletMeta.getTableId(); - - OlapTable table = (OlapTable) db.getTableNullable(tableId); - if (table == null) { - continue; - } - table.readLock(); - try { - long partitionId = tabletMeta.getPartitionId(); - Partition partition = table.getPartition(partitionId); - if (partition == null) { - continue; - } - - long indexId = tabletMeta.getIndexId(); - MaterializedIndex index = partition.getIndex(indexId); - if (index == null) { - continue; - } - - Tablet tablet = index.getTablet(tabletId); - if (tablet == null) { - continue; - } - - PartitionLoadInfo partitionLoadInfo = loadJob.getPartitionLoadInfo(tableId, partitionId); - long version = partitionLoadInfo.getVersion(); - - for (Replica replica : tablet.getReplicas()) { - if (replica.checkVersionCatchUp(version, false)) { - continue; - } - - List info = Lists.newArrayList(); - info.add(replica.getBackendId()); - info.add(tabletId); - info.add(replica.getId()); - 
info.add(replica.getVersion()); - info.add(partitionId); - info.add(version); - - infos.add(info); - } - } finally { - table.readUnlock(); - } - } - } finally { - readUnlock(); - } - - // sort by version, backendId - ListComparator> comparator = new ListComparator>(3, 0); - Collections.sort(infos, comparator); - - return infos; - } - public static class JobInfo { public String dbName; public Set tblNames = Sets.newHashSet(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index 7b156aeff8fe932..c6b735d95de6eaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -433,7 +433,13 @@ public HttpPut generateRequestForMySqlLoad( // cloud cluster if (Config.isCloudMode()) { - String clusterName = ConnectContext.get().getCloudCluster(); + String clusterName = ""; + try { + clusterName = ConnectContext.get().getCloudCluster(); + } catch (Exception e) { + LOG.warn("failed to get cloud cluster: " + e.getMessage()); + throw new LoadException("failed to get cloud cluster: " + e.getMessage()); + } if (Strings.isNullOrEmpty(clusterName)) { throw new LoadException("cloud cluster is empty"); } @@ -447,7 +453,14 @@ public HttpPut generateRequestForMySqlLoad( private String selectBackendForMySqlLoad(String database, String table) throws LoadException { Backend backend = null; if (Config.isCloudMode()) { - backend = StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster(), false); + String clusterName = ""; + try { + clusterName = ConnectContext.get().getCloudCluster(); + } catch (Exception e) { + LOG.warn("failed to get cloud cluster: " + e.getMessage()); + throw new LoadException("failed to get cloud cluster: " + e); + } + backend = StreamLoadHandler.selectBackend(clusterName, false); } else { BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index a25cd99985892e1..c3cb874c61ef0f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -346,7 +347,11 @@ public RoutineLoadJob(Long id, String name, sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; this.qualifiedUser = ConnectContext.get().getQualifiedUser(); - this.cloudCluster = ConnectContext.get().getCloudCluster(); + try { + this.cloudCluster = ConnectContext.get().getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cloud cluster", e); + } } else { sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index bda849fddf081ce..e346b9bfe6a060e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -1296,7 +1296,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI boolean canAddForceRedundant = status == TabletStatus.FORCE_REDUNDANT && infoService.checkBackendScheduleAvailable(backendId) && tablet.getReplicas().stream().anyMatch( - r -> !infoService.checkBackendScheduleAvailable(r.getBackendId())); + r -> !infoService.checkBackendScheduleAvailable(r.getBackendIdWithoutException())); if (isColocateBackend || canAddForceRedundant @@ -1385,7 +1385,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI // replica is enough. check if this tablet is already in meta // (status changed between 'tabletReport()' and 'addReplica()') for (Replica replica : tablet.getReplicas()) { - if (replica.getBackendId() == backendId) { + if (replica.getBackendIdWithoutException() == backendId) { // tablet is already in meta. return true return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index a5ee0bcd6b1d396..2003815e8d7c142 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -301,7 +301,7 @@ private List getWorkersByReplicas(Tablet tablet) { List replicas = tablet.getReplicas(); List workers = Lists.newArrayListWithCapacity(replicas.size()); for (Replica replica : replicas) { - DistributedPlanWorker worker = workerManager.getWorker(replica.getBackendId()); + DistributedPlanWorker worker = workerManager.getWorker(replica.getBackendIdWithoutException()); if (worker.available()) { workers.add(worker); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index ddd6c0f719e7788..2fd73a3b1b05f43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -56,6 +56,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Tablet; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -869,7 +870,7 @@ private void addScanRangeLocations(Partition partition, replicaOptional.ifPresent( r -> { Backend backend = Env.getCurrentSystemInfo() - .getBackend(r.getBackendId()); + .getBackend(r.getBackendIdWithoutException()); if (backend != null && backend.isAlive()) { replicas.clear(); replicas.add(r); @@ -881,22 +882,30 @@ private void addScanRangeLocations(Partition partition, boolean tabletIsNull = true; boolean collectedStat = false; + boolean clusterException = false; List errs = Lists.newArrayList(); int replicaInTablet = 0; long oneReplicaBytes = 0; for (Replica replica : replicas) { - Backend backend = 
Env.getCurrentSystemInfo().getBackend(replica.getBackendId()); + Backend backend = null; + long backendId = -1; + try { + backendId = replica.getBackendId(); + backend = Env.getCurrentSystemInfo().getBackend(backendId); + } catch (ClusterException e) { + LOG.warn("failed to get backend {} for replica {}", backendId, replica.getId(), e); + errs.add(e.toString()); + clusterException = true; + continue; + } if (backend == null || !backend.isAlive()) { if (LOG.isDebugEnabled()) { - LOG.debug("backend {} not exists or is not alive for replica {}", replica.getBackendId(), + LOG.debug("backend {} not exists or is not alive for replica {}", backendId, replica.getId()); } - String err = "replica " + replica.getId() + "'s backend " + replica.getBackendId() + String err = "replica " + replica.getId() + "'s backend " + backendId + " does not exist or not alive"; - if (Config.isCloudMode()) { - errs.add(ConnectContext.cloudNoBackendsReason()); - } errs.add(err); continue; } @@ -940,6 +949,9 @@ private void addScanRangeLocations(Partition partition, break; } } + if (clusterException) { + throw new UserException("tablet " + tabletId + " err: " + Joiner.on(", ").join(errs)); + } if (tabletIsNull) { throw new UserException("tablet " + tabletId + " has no queryable replicas. err: " + Joiner.on(", ").join(errs)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 621ba63a20d6768..1918a93f6cd6035 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -44,8 +44,8 @@ import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -641,15 +641,15 @@ public List createLocation(long dbId, OlapTable table) // we should ensure the replica backend is alive // otherwise, there will be a 'unknown node id, id=xxx' error for stream load for (Tablet tablet : index.getTablets()) { - Multimap bePathsMap = tablet.getNormalReplicaBackendPathMap(); - if (bePathsMap.keySet().size() < loadRequiredReplicaNum) { - String errMsg = "tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size() - + " < load required replica num " + loadRequiredReplicaNum - + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]" - + ", detail: " + tablet.getDetailsStatusForQuery(visibleVersion); - if (Config.isCloudMode()) { - errMsg += ConnectContext.cloudNoBackendsReason(); - } else { + String errMsg = ""; + Multimap bePathsMap = HashMultimap.create(); + try { + bePathsMap = tablet.getNormalReplicaBackendPathMap(); + if (bePathsMap.keySet().size() < loadRequiredReplicaNum) { + errMsg = "tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size() + + " < load required replica num " + loadRequiredReplicaNum + + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]" + + ", detail: " + tablet.getDetailsStatusForQuery(visibleVersion); long now = System.currentTimeMillis(); long lastLoadFailedTime = tablet.getLastLoadFailedTime(); tablet.setLastLoadFailedTime(now); @@ -657,8 +657,12 @@ public List createLocation(long dbId, OlapTable 
table) Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet( tablet, dbId, table, partition, index, 0); } + throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg); } - throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg); + } catch (ClusterException e) { + LOG.warn("failed to get replica backend path for tablet " + tablet.getId(), e); + errMsg += e.toString(); + throw new UserException(InternalErrorCode.INTERNAL_ERR, errMsg); } debugWriteRandomChooseSink(tablet, visibleVersion, bePathsMap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 7d14586bbeaf3a6..e8f947099d13c9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.StmtType; import org.apache.doris.analysis.ValueList; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugUtil; @@ -174,11 +175,15 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme long endTime = System.currentTimeMillis(); long elapseMs = endTime - ctx.getStartTime(); CatalogIf catalog = ctx.getCurrentCatalog(); - - String cluster = Config.isCloudMode() ? ctx.getCloudCluster(false) : ""; + String cloudCluster = ""; + try { + cloudCluster = ctx.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("Failed to get cloud cluster", e); + } + String cluster = Config.isCloudMode() ? cloudCluster : ""; AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); - auditEventBuilder.reset(); auditEventBuilder .setTimestamp(ctx.getStartTime()) .setClientIp(ctx.getClientIP()) @@ -206,19 +211,25 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme if (ctx.getState().isQuery()) { MetricRepo.COUNTER_QUERY_ALL.increase(1L); MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); - MetricRepo.increaseClusterQueryAll(ctx.getCloudCluster(false)); + try { + cloudCluster = ctx.getCloudCluster(false); + } catch (ClusterException e) { + LOG.warn("Failed to get cloud cluster", e); + return; + } + MetricRepo.increaseClusterQueryAll(cloudCluster); if (ctx.getState().getStateType() == MysqlStateType.ERR && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { // err query MetricRepo.COUNTER_QUERY_ERR.increase(1L); MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); - MetricRepo.increaseClusterQueryErr(ctx.getCloudCluster(false)); + MetricRepo.increaseClusterQueryErr(cloudCluster); } else if (ctx.getState().getStateType() == MysqlStateType.OK || ctx.getState().getStateType() == MysqlStateType.EOF) { // ok query MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); - MetricRepo.updateClusterQueryLatency(ctx.getCloudCluster(false), elapseMs); + MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index b5f8c8d9be5ec7c..8deff0d9fd823cc 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -35,6 +35,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; @@ -1121,7 +1122,7 @@ public String getQueryIdentifier() { // maybe user set cluster by SQL hint of session variable: cloud_cluster // so first check it and then get from connect context. - public String getCurrentCloudCluster() { + public String getCurrentCloudCluster() throws ClusterException { String cluster = getSessionVariable().getCloudCluster(); if (Strings.isNullOrEmpty(cluster)) { cluster = getCloudCluster(); @@ -1133,7 +1134,7 @@ public void setCloudCluster(String cluster) { this.cloudCluster = cluster; } - public String getCloudCluster() { + public String getCloudCluster() throws ClusterException { return getCloudCluster(true); } @@ -1162,7 +1163,7 @@ public String toString() { } } - public static String cloudNoBackendsReason() { + public static String cloudNoBackendsReason() throws ClusterException { StringBuilder sb = new StringBuilder(); if (ConnectContext.get() != null) { String clusterName = ConnectContext.get().getCloudCluster(); @@ -1248,10 +1249,11 @@ public CloudClusterResult getCloudClusterByPolicy() { * * @param updateErr whether set the connect state to error if the returned cluster is null or empty * @return non-empty cluster name if a cluster has been chosen otherwise null or empty string + * @throws ClusterException, outer get reason by exception */ - public String getCloudCluster(boolean updateErr) { + public String getCloudCluster(boolean updateErr) throws ClusterException { if (!Config.isCloudMode()) { - return null; + throw new ClusterException("not cloud mode", ClusterException.FailedTypeEnum.NOT_CLOUD_MODE); } String cluster = null; @@ -1282,6 +1284,8 @@ public String getCloudCluster(boolean updateErr) { getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR, "Cant get a Valid cluster for you to use, plz connect admin"); } + throw new ClusterException("the user is not granted permission to the cluster", + ClusterException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_ANY_CLUSTER); } else { this.cloudCluster = cluster; LOG.info("finally set context cluster name {} for user {} with chose way '{}'", @@ -1302,33 +1306,50 @@ public String getDefaultCloudCluster() { return null; } - public String getAuthorizedCloudCluster() { + public String getAuthorizedCloudCluster() throws ClusterException { List cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames(); // get all available cluster of the user + boolean hasAuthCluster = false; + AtomicBoolean selectedClusterHasAliveBe = new AtomicBoolean(false); + String selectedCluster = null; for (String cloudClusterName : cloudClusterNames) { if (!Env.getCurrentEnv().getAuth().checkCloudPriv(getCurrentUserIdentity(), cloudClusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { continue; } + hasAuthCluster = true; // find a cluster has more than one alive be + selectedCluster = cloudClusterName; List bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(cloudClusterName); - AtomicBoolean hasAliveBe = new AtomicBoolean(false); + 
bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> { if (LOG.isDebugEnabled()) { LOG.debug("get a clusterName {}, it's has more than one alive be {}", cloudClusterName, backend); } - hasAliveBe.set(true); + selectedClusterHasAliveBe.set(true); }); - if (hasAliveBe.get()) { + if (selectedClusterHasAliveBe.get()) { if (LOG.isDebugEnabled()) { LOG.debug("set context cluster name {}", cloudClusterName); } return cloudClusterName; } } + if (!hasAuthCluster) { + throw new ClusterException("the user is not granted permission to the cluster", + ClusterException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_ANY_CLUSTER); + } - return null; + if (!selectedClusterHasAliveBe.get()) { + throw new ClusterException( + String.format("All the Backend nodes in the current cluster %s are in an abnormal state", + selectedCluster), + ClusterException.FailedTypeEnum.CLUSTERS_NO_ALIVE_BE); + } + + throw new ClusterException("There are no clusters registered in the current system", + ClusterException.FailedTypeEnum.SYSTEM_NOT_HAVE_CLUSTER); } public StatsErrorEstimator getStatsErrorEstimator() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 8d2a1931b5a5fda..bd10475c1875238 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -237,6 +238,8 @@ protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) throws executeQuery(mysqlCommand, originStmt); } catch (ConnectionException exception) { throw exception; + } catch (UserException exception) { + LOG.warn("execute query exception", exception); } catch (Exception ignored) { // saved use handleQueryException } @@ -245,7 +248,13 @@ protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) throws public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Exception { if (MetricRepo.isInit) { MetricRepo.COUNTER_REQUEST_ALL.increase(1L); - MetricRepo.increaseClusterRequestAll(ctx.getCloudCluster(false)); + if (Config.isCloudMode()) { + try { + MetricRepo.increaseClusterRequestAll(ctx.getCloudCluster(false)); + } catch (ClusterException e) { + LOG.warn("metrics get cluster exception", e); + } + } } String convertedStmt = convertOriginStmt(originStmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java index 0fa3435f18ecf5e..31e1273f253fdd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.ErrorCode; import org.apache.doris.thrift.FrontendService; @@ -151,7 +152,12 @@ private TMasterOpRequest buildStmtForwardParams() { params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - String cluster = 
ctx.getCloudCluster(false); + String cluster = ""; + try { + cluster = ctx.getCloudCluster(false); + } catch (ClusterException e) { + LOG.warn("failed to get cloud cluster", e); + } if (!Strings.isNullOrEmpty(cluster)) { params.setCloudCluster(cluster); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index eaab01df55635af..dcf3e6deae261e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -211,7 +211,13 @@ private TMasterOpRequest buildStmtForwardParams() throws AnalysisException { params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - String cluster = ctx.getCloudCluster(false); + String cluster = ""; + try { + cluster = ctx.getCloudCluster(false); + } catch (Exception e) { + LOG.warn("failed to get cloud cluster", e); + throw new AnalysisException("failed to get cloud cluster", e); + } if (!Strings.isNullOrEmpty(cluster)) { params.setCloudCluster(cluster); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index a3d1ca313aef94c..c9bc19735b73313 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -157,6 +157,7 @@ import org.apache.doris.cloud.datasource.CloudInternalCatalog; import org.apache.doris.cloud.load.CloudLoadManager; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.cluster.ClusterNamespace; @@ -807,7 +808,13 @@ private void handleShowCluster() throws AnalysisException { PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { continue; } - row.add(clusterName.equals(ctx.getCloudCluster()) ? "TRUE" : "FALSE"); + String clusterNameFromCtx = ""; + try { + clusterNameFromCtx = ctx.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cluster name", e); + } + row.add(clusterName.equals(clusterNameFromCtx) ? 
"TRUE" : "FALSE"); List users = Env.getCurrentEnv().getAuth().getCloudClusterUsers(clusterName); // non-root do not display root information if (!Auth.ROOT_USER.equals(ctx.getQualifiedUser())) { @@ -2022,7 +2029,7 @@ private void handleShowTablet() throws AnalysisException { List replicas = tablet.getReplicas(); for (Replica replica : replicas) { - Replica tmp = invertedIndex.getReplica(tabletId, replica.getBackendId()); + Replica tmp = invertedIndex.getReplica(tabletId, replica.getBackendIdWithoutException()); if (tmp == null) { isSync = false; break; @@ -3079,7 +3086,7 @@ private void handleCopyTablet() throws AnalysisException { if (replica == null) { throw new AnalysisException("Replica not found on backend: " + backendId); } - backendId = replica.getBackendId(); + backendId = replica.getBackendIdWithoutException(); Backend be = Env.getCurrentSystemInfo().getBackend(backendId); if (be == null || !be.isAlive()) { throw new AnalysisException("Unavailable backend: " + backendId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index e0ae5763abbc431..710b06942befa5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -98,6 +98,7 @@ import org.apache.doris.cloud.analysis.UseCloudClusterStmt; import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.proto.Cloud.ClusterStatus; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuditLog; @@ -1210,11 +1211,18 @@ public void updateProfile(boolean isFinished) { } private boolean hasCloudClusterPriv() { - if (ConnectContext.get() == null || Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster())) { + String clusterName = ""; + try { + clusterName = ConnectContext.get().getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to get cloud cluster", e); + return false; + } + if (ConnectContext.get() == null || Strings.isNullOrEmpty(clusterName)) { return false; } return Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), - ConnectContext.get().getCloudCluster(), PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER); + clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER); } // Analyze one statement to structure in memory. 
@@ -1273,7 +1281,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException, Interrupt prepareStmt.analyze(analyzer); // Need analyze inner statement parsedStmt = ((PrepareStmt) prepareStmt).getInnerStmt(); - if (((PrepareStmt) prepareStmt).getPreparedType() == PrepareStmt.PreparedType.STATEMENT) { + if (((PrepareStmt) prepareStmt).getPreparedType() == PreparedType.STATEMENT) { // Skip analyze, do it lazy return; } @@ -1391,7 +1399,7 @@ && hasCloudClusterPriv()) { } } if (preparedStmtReanalyzed - && ((PrepareStmt) preparedStmtCtx.stmt).getPreparedType() == PrepareStmt.PreparedType.FULL_PREPARED) { + && ((PrepareStmt) preparedStmtCtx.stmt).getPreparedType() == PreparedType.FULL_PREPARED) { ((PrepareStmt) prepareStmt).asignValues(execStmt.getArgs()); if (LOG.isDebugEnabled()) { LOG.debug("update planner and analyzer after prepared statement reanalyzed"); @@ -1925,14 +1933,14 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, (NereidsPlanner) planner); profile.addExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + new QueryInfo(context, originStmt.originStmt, coord)); coordBase = coord; } else { coord = EnvFactory.getInstance().createCoordinator( context, analyzer, planner, context.getStatsErrorEstimator()); profile.addExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + new QueryInfo(context, originStmt.originStmt, coord)); coordBase = coord; } @@ -2087,7 +2095,7 @@ private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception { .setResultFileSink(ByteString.copyFrom(new TSerializer().serialize(sink))).build(); Future future = BackendServiceProxy.getInstance() .outfileWriteSuccessAsync(address, request); - InternalService.POutfileWriteSuccessResult result = future.get(); + POutfileWriteSuccessResult result = future.get(); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); String errMsg; if (code != TStatusCode.OK) { @@ -3419,7 +3427,7 @@ public List executeInternalQuery() { profile.addExecutionProfile(coord.getExecutionProfile()); try { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + new QueryInfo(context, originStmt.originStmt, coord)); } catch (UserException e) { throw new RuntimeException("Failed to execute internal SQL. 
" + Util.getRootCauseMessage(e), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 88925770640365f..138615634c37e18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2668,7 +2668,7 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos LOG.warn("replica {} not normal", replica.getId()); continue; } - Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(replica.getBackendId()); + Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException()); if (backend != null) { TReplicaInfo replicaInfo = new TReplicaInfo(); replicaInfo.setHost(backend.getHost()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0f5c81b1cf06929..f2e1ecc0ff994e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -47,6 +47,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.VariantType; +import org.apache.doris.cloud.qe.ClusterException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -150,7 +151,12 @@ public static List execStatisticQuery(String sql, boolean enableFileC boolean useFileCacheForStat = (enableFileCache && Config.allow_analyze_statistics_info_polluting_file_cache); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false, useFileCacheForStat)) { if (Config.isCloudMode()) { - r.connectContext.getCloudCluster(); + try { + r.connectContext.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to connect to cloud cluster", e); + return Collections.emptyList(); + } } StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); r.connectContext.setExecutor(stmtExecutor); @@ -232,7 +238,12 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boo connectContext.setStartTime(); if (Config.isCloudMode()) { AutoCloseConnectContext ctx = new AutoCloseConnectContext(connectContext); - ctx.connectContext.getCloudCluster(); + try { + ctx.connectContext.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to connect to cloud cluster", e); + return ctx; + } sessionVariable.disableFileCache = !useFileCacheForStat; return ctx; } else { @@ -487,7 +498,12 @@ public static boolean statsTblAvailable() { return false; } try (AutoCloseConnectContext r = buildConnectContext()) { - r.connectContext.getCloudCluster(); + try { + r.connectContext.getCloudCluster(); + } catch (ClusterException e) { + LOG.warn("failed to connect to cloud cluster", e); + return false; + } for (OlapTable table : statsTbls) { for (Partition partition : table.getPartitions()) { if (partition.getBaseIndex().getTablets().stream() diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java index c2a091d11c680f5..46d0519a4560bd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
@@ -92,7 +92,7 @@ public static List<List<String>> diagnoseTablet(long tabletId) {
        List<Replica> replicas = tablet.getReplicas();
        JSONObject jobj = new JSONObject();
        for (Replica replica : replicas) {
-            jobj.put(replica.getId(), replica.getBackendId());
+            jobj.put(replica.getId(), replica.getBackendIdWithoutException());
        }
        results.add(Lists.newArrayList("Replicas(ReplicaId -> BackendId)", jobj.toJSONString(), ""));
        // replica
@@ -113,49 +113,54 @@ public static List<List<String>> diagnoseTablet(long tabletId) {
        for (Replica replica : replicas) {
            // backend
            do {
-                Backend be = infoService.getBackend(replica.getBackendId());
+                Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
                if (be == null) {
-                    backendErr.append("Backend " + replica.getBackendId() + " does not exist. ");
+                    backendErr.append("Backend "
+                            + replica.getBackendIdWithoutException() + " does not exist. ");
                    break;
                }
                if (!be.isAlive()) {
-                    backendErr.append("Backend " + replica.getBackendId() + " is not alive. ");
+                    backendErr.append("Backend " + replica.getBackendIdWithoutException() + " is not alive. ");
                    break;
                }
                if (be.isDecommissioned()) {
-                    backendErr.append("Backend " + replica.getBackendId() + " is decommission. ");
+                    backendErr.append("Backend " + replica.getBackendIdWithoutException() + " is decommission. ");
                    break;
                }
                if (!be.isLoadAvailable()) {
-                    backendErr.append("Backend " + replica.getBackendId() + " is not load available. ");
+                    backendErr.append("Backend " + replica.getBackendIdWithoutException() + " is not load available. ");
                    break;
                }
                if (!be.isQueryAvailable()) {
-                    backendErr.append("Backend " + replica.getBackendId() + " is not query available. ");
+                    backendErr.append("Backend "
+                            + replica.getBackendIdWithoutException() + " is not query available. ");
                    break;
                }
                if (be.diskExceedLimit()) {
-                    backendErr.append("Backend " + replica.getBackendId() + " has no space left. ");
+                    backendErr.append("Backend " + replica.getBackendIdWithoutException() + " has no space left. ");
                    break;
                }
            } while (false);
            // version
            if (replica.getVersion() != visibleVersion) {
-                versionErr.append("Replica on backend " + replica.getBackendId() + "'s version ("
+                versionErr.append("Replica on backend " + replica.getBackendIdWithoutException() + "'s version ("
                        + replica.getVersion() + ") does not equal"
                        + " to partition visible version (" + visibleVersion + ")");
            } else if (replica.getLastFailedVersion() != -1) {
-                versionErr.append("Replica on backend " + replica.getBackendId() + "'s last failed version is "
+                versionErr.append("Replica on backend "
+                        + replica.getBackendIdWithoutException() + "'s last failed version is "
                        + replica.getLastFailedVersion());
            }
            // status
            if (!replica.isAlive() || replica.isUserDrop()) {
-                statusErr.append("Replica on backend " + replica.getBackendId() + "'s state is " + replica.getState()
+                statusErr.append("Replica on backend " + replica.getBackendIdWithoutException()
+                        + "'s state is " + replica.getState()
                        + ", and is bad: " + (replica.isBad() ? "Yes" : "No")
                        + ", and is going to drop: " + (replica.isUserDrop() ? "Yes" : "No"));
            }
            if (replica.tooBigVersionCount()) {
-                compactionErr.append("Replica on backend " + replica.getBackendId() + "'s version count is too high: "
+                compactionErr.append("Replica on backend " + replica.getBackendIdWithoutException()
+                        + "'s version count is too high: "
                        + replica.getVisibleVersionCount());
            }
        }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 299a74315558374..de5639a3d4c4c15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1383,7 +1383,8 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
                tabletWriteFailedReplicas.clear();
                tabletVersionFailedReplicas.clear();
                for (Replica replica : tablet.getReplicas()) {
-                    List<PublishVersionTask> publishVersionTasks = publishTasks.get(replica.getBackendId());
+                    List<PublishVersionTask> publishVersionTasks
+                            = publishTasks.get(replica.getBackendIdWithoutException());
                    Preconditions.checkState(publishVersionTasks == null || publishVersionTasks.size() == 1,
                            "publish tasks: " + publishVersionTasks);
                    PublishVersionTask publishVersionTask = null;
@@ -2260,10 +2261,10 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
                            replica.getId(), newVersion, lastFailedVersion, lastSuccessVersion);
                }
                replica.updateVersionWithFailed(newVersion, lastFailedVersion, lastSuccessVersion);
-                Set<Long> partitionIds = backendPartitions.get(replica.getBackendId());
+                Set<Long> partitionIds = backendPartitions.get(replica.getBackendIdWithoutException());
                if (partitionIds == null) {
                    partitionIds = Sets.newHashSet();
-                    backendPartitions.put(replica.getBackendId(), partitionIds);
+                    backendPartitions.put(replica.getBackendIdWithoutException(), partitionIds);
                }
                partitionIds.add(partitionId);
            }
@@ -2704,7 +2705,8 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
                List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
                List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
                for (Replica replica : tablet.getReplicas()) {
-                    List<PublishVersionTask> publishVersionTasks = publishTasks.get(replica.getBackendId());
+                    List<PublishVersionTask> publishVersionTasks
+                            = publishTasks.get(replica.getBackendIdWithoutException());
                    List<PublishVersionTask> replicaTasks = new ArrayList<>();
                    for (Long subTransactionId : subTxnIds) {
                        PublishVersionTask publishVersionTask = null;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 93b7d95dbfc6455..590b6563e1106cd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -155,7 +155,7 @@ public static boolean compareCatalog(Env masterEnv, Env slaveEnv) {
                List<Replica> allReplicas = masterTablet.getReplicas();
                for (Replica masterReplica : allReplicas) {
                    Replica slaveReplica = slaveTablet.getReplicaById(masterReplica.getId());
-                    if (slaveReplica.getBackendId() != masterReplica.getBackendId()
+                    if (slaveReplica.getBackendIdWithoutException() != masterReplica.getBackendIdWithoutException()
                            || slaveReplica.getVersion() != masterReplica.getVersion()
                            || slaveReplica.getLastFailedVersion() != masterReplica.getLastFailedVersion()
                            || slaveReplica.getLastSuccessVersion() != masterReplica.getLastSuccessVersion()) {
@@ -409,7 +409,7 @@ public static long getReplicaPathHash(long tabletId, long backendId) {
        }
        Tablet tablet = materializedIndex.getTablet(tabletId);
        for (Replica replica : tablet.getReplicas()) {
-            if (replica.getBackendId() == backendId) {
+            if (replica.getBackendIdWithoutException() == backendId) {
                return replica.getPathHash();
            }
        }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
index 63f61cc8747b870..101ec619e765a6d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
@@ -60,7 +60,7 @@ public void setUp() {
    @Test
    public void getMethodTest() {
        Assert.assertEquals(replicaId, replica.getId());
-        Assert.assertEquals(backendId, replica.getBackendId());
+        Assert.assertEquals(backendId, replica.getBackendIdWithoutException());
        Assert.assertEquals(version, replica.getVersion());
        Assert.assertEquals(dataSize, replica.getDataSize());
        Assert.assertEquals(rowCount, replica.getRowCount());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index 189406fa80f0afa..413851800e394a4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -98,9 +98,9 @@ public void getMethodTest() {
        Assert.assertEquals(replica3, tablet.getReplicaById(replica3.getId()));
        Assert.assertEquals(3, tablet.getReplicas().size());
-        Assert.assertEquals(replica1, tablet.getReplicaByBackendId(replica1.getBackendId()));
-        Assert.assertEquals(replica2, tablet.getReplicaByBackendId(replica2.getBackendId()));
-        Assert.assertEquals(replica3, tablet.getReplicaByBackendId(replica3.getBackendId()));
+        Assert.assertEquals(replica1, tablet.getReplicaByBackendId(replica1.getBackendIdWithoutException()));
+        Assert.assertEquals(replica2, tablet.getReplicaByBackendId(replica2.getBackendIdWithoutException()));
+        Assert.assertEquals(replica3, tablet.getReplicaByBackendId(replica3.getBackendIdWithoutException()));
        long newTabletId = 20000;
@@ -111,11 +111,11 @@ public void getMethodTest() {
    @Test
    public void deleteReplicaTest() {
        // delete replica1
-        Assert.assertTrue(tablet.deleteReplicaByBackendId(replica1.getBackendId()));
+        Assert.assertTrue(tablet.deleteReplicaByBackendId(replica1.getBackendIdWithoutException()));
        Assert.assertNull(tablet.getReplicaById(replica1.getId()));
        // err: re-delete replica1
-        Assert.assertFalse(tablet.deleteReplicaByBackendId(replica1.getBackendId()));
+        Assert.assertFalse(tablet.deleteReplicaByBackendId(replica1.getBackendIdWithoutException()));
        Assert.assertFalse(tablet.deleteReplica(replica1));
        Assert.assertNull(tablet.getReplicaById(replica1.getId()));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
index 860a36f8f637c55..f071224b8ebf7c9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
@@ -109,7 +109,7 @@ public void testDiskReblanceWhenSchedulerIdle() throws Exception {
        tablets.forEach(tablet -> {
            Lists.newArrayList(tablet.getReplicas()).forEach(
                    replica -> {
-                        if (replica.getBackendId() == backends.get(1).getId()) {
+                        if (replica.getBackendIdWithoutException() == backends.get(1).getId()) {
                            replica.setDataSize(totalCapacity / 4);
                            replica.setRowCount(1);
                            tablet.deleteReplica(replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
index d9ea37f3eac4494..b76c49081d89d1a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
@@ -121,7 +121,7 @@ public void testCreateTablets() {
        int beNum = 4;
        for (Tablet tablet : index.getTablets()) {
            for (Replica replica : tablet.getReplicas()) {
-                Assert.assertEquals((i++ % beNum) + 1, replica.getBackendId());
+                Assert.assertEquals((i++ % beNum) + 1, replica.getBackendIdWithoutException());
            }
        }
    }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index ddec61e6d4340d1..fb4c21e42841931 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -262,6 +262,12 @@ class Suite implements GroovyInterceptable {
        return context.connect(user, password, url, actionSupplier)
    }

+    public <T> T connectInDocker(String user = context.config.jdbcUser, String password = context.config.jdbcPassword,
+                                 Closure<T> actionSupplier) {
+        def connInfo = context.threadLocalConn.get()
+        return context.connect(user, password, connInfo.conn.getMetaData().getURL(), actionSupplier)
+    }
+
    public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1, Closure actionSupplier) {
        def connInfo = context.threadLocalConn.get()
        Awaitility.await().atMost(atMostSeconds, SECONDS).pollInterval(intervalSecond, SECONDS).until(
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_no_cluster_hits.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_no_cluster_hits.groovy
new file mode 100644
index 000000000000000..aea4cb0fdf0db5e
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_no_cluster_hits.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite('test_no_cluster_hits', 'multi_cluster') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+    ]
+    options.setFeNum(3)
+    options.setBeNum(3)
+    options.cloudMode = true
+    // options.connectToFollower = true
+    options.enableDebugPoints()
+
+    def user = "test_no_cluster_hits_user"
+    def table = "test_no_cluster_table"
+
+    docker(options) {
+        sql """create user $user"""
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL
+        )
+        DUPLICATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num"="1"
+        );
+        """
+        // no cluster auth
+        sql """GRANT SELECT_PRIV ON *.*.* TO ${user}"""
+        try {
+            connectInDocker(user = user, password = '') {
+                // errCode = 2, detailMessage = the user is not granted permission to the cluster, ClusterException: CURRENT_USER_NO_AUTH_TO_USE_ANY_CLUSTER
+                sql """select * from information_schema.columns"""
+            }
+        } catch (Exception e) {
+            logger.info("exception: {}", e.getMessage())
+            assertTrue(e.getMessage().contains("ClusterException: CURRENT_USER_NO_AUTH_TO_USE_ANY_CLUSTER"))
+            assertTrue(e.getMessage().contains("the user is not granted permission to the cluster"))
+        }
+        def result = sql_return_maparray """show clusters"""
+        logger.info("show cluster1 : {}", result)
+        def currentCluster = result.stream().filter(cluster -> cluster.is_current == "TRUE").findFirst().orElse(null)
+        sql """GRANT USAGE_PRIV ON CLUSTER ${currentCluster.cluster} TO $user"""
+        connectInDocker(user = user, password = '') {
+            try {
+                sql """select * from information_schema.columns"""
+            } catch (Exception e) {
+                logger.info("exception: {}", e.getMessage())
+                assertTrue(false, "should not reach here, the query is expected to succeed after granting cluster usage privilege")
+            }
+        }
+
+        sql """
+        insert into $table values (1, 1)
+        """
+
+        // cluster all be abnormal
+        cluster.stopBackends(1, 2, 3)
+        try {
+            // errCode = 2, detailMessage = All the Backend nodes in the current cluster compute_cluster are in an abnormal state, ClusterException: CLUSTERS_NO_ALIVE_BE
+            sql """
+            insert into $table values (2, 2)
+            """
+        } catch (Exception e) {
+            logger.info("exception: {}", e.getMessage())
+            assertTrue(e.getMessage().contains("ClusterException: CLUSTERS_NO_ALIVE_BE"))
+            assertTrue(e.getMessage().contains("are in an abnormal state"))
+        }
+
+        try {
+            // errCode = 2, detailMessage = tablet 10901 err: All the Backend nodes in the current cluster compute_cluster are in an abnormal state, ClusterException: CLUSTERS_NO_ALIVE_BE
+            sql """
+            select * from $table
+            """
+        } catch (Exception e) {
+            logger.info("exception: {}", e.getMessage())
+            assertTrue(e.getMessage().contains("ClusterException: CLUSTERS_NO_ALIVE_BE"))
+            assertTrue(e.getMessage().contains("are in an abnormal state"))
+        }
+
+        cluster.startBackends(1, 2, 3)
+        result = sql """insert into $table values (3, 3)"""
+        result = sql """select * from $table"""
+        log.info("result = {}", result)
+        assertEquals(2, result.size())
+
+        // no cluster
+        def tag = getCloudBeTagByName(currentCluster.cluster)
+        logger.info("cluster1 = {}, tag = {}", currentCluster, tag)
+
+        def jsonSlurper = new JsonSlurper()
+        def jsonObject = jsonSlurper.parseText(tag)
+        def cloudClusterId = jsonObject.cloud_cluster_id
+
+        def ms = cluster.getAllMetaservices().get(0)
+        logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
+        drop_cluster(currentCluster.cluster, cloudClusterId, ms)
+
+        dockerAwaitUntil(5) {
+            result = sql_return_maparray """show clusters"""
+            logger.info("show cluster2 : {}", result)
+            result.size() == 0
+        }
+
+        try {
+            // errCode = 2, detailMessage = tablet 10901 err: The current cluster compute_cluster is not registered in the system, ClusterException: CURRENT_CLUSTER_NOT_EXIST
+            sql """
+            select * from $table
+            """
+        } catch (Exception e) {
+            logger.info("exception: {}", e.getMessage())
+            assertTrue(e.getMessage().contains("ClusterException: CURRENT_CLUSTER_NOT_EXIST"))
+            assertTrue(e.getMessage().contains("The current cluster compute_cluster is not registered in the system"))
+        }
+    }
+}
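
The call-site changes in this patch follow one pattern: paths that must dispatch work to a live backend keep the throwing accessor and surface the failure, while logging, diagnostic, and comparison paths switch to a non-throwing variant that falls back to a sentinel id. A minimal, self-contained Java sketch of that split is below; the stub class, exception type, and the -1 sentinel are illustrative assumptions, not the actual Doris Replica API.

// Illustrative sketch only: these types are stand-ins, not the Doris classes.
public class BackendIdAccessorSketch {

    static class NoAliveBackendException extends Exception {
        NoAliveBackendException(String msg) {
            super(msg);
        }
    }

    static class ReplicaStub {
        private final long backendId; // < 0 means "not mapped to a live backend"

        ReplicaStub(long backendId) {
            this.backendId = backendId;
        }

        // Checked accessor: callers that must route work to a backend handle the failure.
        long getBackendId() throws NoAliveBackendException {
            if (backendId < 0) {
                throw new NoAliveBackendException("replica is not bound to an alive backend");
            }
            return backendId;
        }

        // Non-throwing accessor: callers that only log or compare ids get a sentinel instead.
        long getBackendIdWithoutException() {
            return backendId < 0 ? -1 : backendId;
        }
    }

    // A dispatch-style caller: failure must propagate to the caller.
    static void sendTask(ReplicaStub replica) throws NoAliveBackendException {
        long backendId = replica.getBackendId();
        System.out.println("send task to backend " + backendId);
    }

    // A diagnostic-style caller: never fails, just reports what it sees.
    static String describe(ReplicaStub replica) {
        return "replica on backend " + replica.getBackendIdWithoutException();
    }

    public static void main(String[] args) {
        ReplicaStub healthy = new ReplicaStub(10001);
        ReplicaStub orphan = new ReplicaStub(-1);

        System.out.println(describe(healthy)); // replica on backend 10001
        System.out.println(describe(orphan));  // replica on backend -1

        try {
            sendTask(orphan);
        } catch (NoAliveBackendException e) {
            System.out.println("dispatch failed: " + e.getMessage());
        }
    }
}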