Skip to content

Commit

Permalink
[fix](alter table property) fix alter property if rpc failed, branch …
Browse files Browse the repository at this point in the history
…2.0 #22845 (#23116)
  • Loading branch information
csun5285 authored Aug 17, 2023
1 parent 550add7 commit 88abaae
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 109 deletions.
22 changes: 4 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,27 +211,13 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable,
} else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkCompactionPolicy(alterClauses)
&& currentAlterOps.getCompactionPolicy(alterClauses) != olapTable.getCompactionPolicy()) {
olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses));
} else if (currentAlterOps.checkCompactionPolicy(alterClauses)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)
&& currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses)
!= olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps
.getTimeSeriesCompactionGoalSizeMbytes(alterClauses));
} else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)
&& currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses)
!= olapTable.getTimeSeriesCompactionFileCountThreshold()) {
olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps
.getTimeSeriesCompactionFileCountThreshold(alterClauses));
} else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
&& currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
!= olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps
.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses));
} else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
if (!Config.enable_feature_binlog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2218,7 +2218,8 @@ public void updatePartitionsProperties(Database db, String tableName, List<Strin

for (String partitionName : partitionNames) {
try {
updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId, isInMemory, null);
updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId,
isInMemory, null, null, null);
} catch (Exception e) {
String errMsg = "Failed to update partition[" + partitionName + "]'s 'in_memory' property. "
+ "The reason is [" + e.getMessage() + "]";
Expand All @@ -2232,7 +2233,8 @@ public void updatePartitionsProperties(Database db, String tableName, List<Strin
* This operation may return partial successfully, with an exception to inform user to retry
*/
public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
int isInMemory, BinlogConfig binlogConfig) throws UserException {
int isInMemory, BinlogConfig binlogConfig, String compactionPolicy,
Map<String, Long> timeSeriesCompactionConfig) throws UserException {
// be id -> <tablet id,schemaHash>
Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
Expand Down Expand Up @@ -2264,7 +2266,7 @@ public void updatePartitionProperties(Database db, String tableName, String part
for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
countDownLatch.addMark(kv.getKey(), kv.getValue());
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
storagePolicyId, binlogConfig, countDownLatch);
storagePolicyId, binlogConfig, countDownLatch, compactionPolicy, timeSeriesCompactionConfig);
batchTask.addTask(task);
}
if (!FeConstants.runningUnitTest) {
Expand Down Expand Up @@ -2307,86 +2309,6 @@ public void updatePartitionProperties(Database db, String tableName, String part
}
}

/**
* Update one specified partition's properties by partition name of table
* This operation may return partial successfully, with an exception to inform user to retry
*/
public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
int isInMemory, BinlogConfig binlogConfig, String compactionPolicy,
Map<String, Long> timeSeriesCompactionConfig) throws UserException {
// be id -> <tablet id,schemaHash>
Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
olapTable.readLock();
try {
Partition partition = olapTable.getPartition(partitionName);
if (partition == null) {
throw new DdlException(
"Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]");
}

for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
Set<Pair<Long, Integer>> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent(
replica.getBackendId(), k -> Sets.newHashSet());
tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash));
}
}
}
} finally {
olapTable.readUnlock();
}

int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum);
AgentBatchTask batchTask = new AgentBatchTask();
for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
countDownLatch.addMark(kv.getKey(), kv.getValue());
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
storagePolicyId, binlogConfig, countDownLatch, compactionPolicy, timeSeriesCompactionConfig);
batchTask.addTask(task);
}
if (!FeConstants.runningUnitTest) {
// send all tasks and wait them finished
AgentTaskQueue.addBatchTask(batchTask);
AgentTaskExecutor.submit(batchTask);
LOG.info("send update tablet meta task for table {}, partitions {}, number: {}", tableName, partitionName,
batchTask.getTaskNum());

// estimate timeout
long timeout = DbUtil.getCreateReplicasTimeoutMs(totalTaskNum);
boolean ok = false;
try {
ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
}

if (!ok || !countDownLatch.getStatus().ok()) {
String errMsg = "Failed to update partition[" + partitionName + "]. tablet meta.";
// clear tasks
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO);

if (!countDownLatch.getStatus().ok()) {
errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
} else {
List<Map.Entry<Long, Set<Pair<Long, Integer>>>> unfinishedMarks = countDownLatch.getLeftMarks();
// only show at most 3 results
List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList = unfinishedMarks.subList(0,
Math.min(unfinishedMarks.size(), 3));
if (!subList.isEmpty()) {
errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
}
}
errMsg += ". This operation maybe partial successfully, You should retry until success.";
LOG.warn(errMsg);
throw new DdlException(errMsg);
}
}
}

@Override
public void cancel(CancelStmt stmt) throws DdlException {
CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt) stmt;
Expand Down Expand Up @@ -2988,7 +2910,8 @@ public boolean updateBinlogConfig(Database db, OlapTable olapTable, List<AlterCl


for (Partition partition : partitions) {
updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1, newBinlogConfig);
updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1,
newBinlogConfig, null, null);
}

olapTable.writeLockOrDdlException();
Expand Down
6 changes: 5 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4504,7 +4504,11 @@ public void modifyTableProperties(Database db, OlapTable table, Map<String, Stri
}
tableProperty.buildInMemory()
.buildStoragePolicy()
.buildIsBeingSynced();
.buildIsBeingSynced()
.buildCompactionPolicy()
.buildTimeSeriesCompactionGoalSizeMbytes()
.buildTimeSeriesCompactionFileCountThreshold()
.buildTimeSeriesCompactionTimeThresholdSeconds();

// need to update partition info meta
for (Partition partition : table.getPartitions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ public UpdateTabletMetaInfoTask(long backendId,
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch,
String compactionPolicy,
Map<String, Long> timeSeriesCompactionConfig) {
this(backendId, tableIdWithSchemaHash);
this.storagePolicyId = storagePolicyId;
this.inMemory = inMemory;
this.binlogConfig = binlogConfig;
this.latch = latch;
this(backendId, tableIdWithSchemaHash, inMemory, storagePolicyId, binlogConfig, latch);
this.compactionPolicy = compactionPolicy;
this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ suite("test_table_level_compaction_policy") {
logger.info("${showResult3}")
assertTrue(showResult3.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"'))

sql """
sql """
alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000")
"""
sql """sync"""
Expand All @@ -74,6 +74,33 @@ suite("test_table_level_compaction_policy") {
logger.info("${showResult4}")
assertTrue(showResult4.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"'))

sql """
alter table ${tableName} set ("time_series_compaction_goal_size_mbytes" = "1024")
"""
sql """sync"""

def showResult6 = sql """show create table ${tableName}"""
logger.info("${showResult6}")
assertTrue(showResult6.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "1024"'))

sql """
alter table ${tableName} set ("time_series_compaction_file_count_threshold" = "6000")
"""
sql """sync"""

def showResult7 = sql """show create table ${tableName}"""
logger.info("${showResult7}")
assertTrue(showResult7.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"'))

sql """
alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000")
"""
sql """sync"""

def showResult8 = sql """show create table ${tableName}"""
logger.info("${showResult8}")
assertTrue(showResult8.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"'))

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """sync"""

Expand Down

0 comments on commit 88abaae

Please sign in to comment.