Skip to content

Commit

Permalink
batch report editlog
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Oct 10, 2024
1 parent 1e8ea55 commit 3846125
Show file tree
Hide file tree
Showing 12 changed files with 774 additions and 615 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.BatchModifyReplicasInfo;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.task.AlterReplicaTask;
Expand Down Expand Up @@ -247,12 +248,12 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce
}

if (versionChanged) {
ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(task.getDbId(), task.getTableId(),
ReplicaPersistInfo info = ReplicaPersistInfo.createForUpdate(task.getDbId(), task.getTableId(),
task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getBackendId(),
replica.getId(), replica.getVersion(), -1,
replica.getDataSize(), replica.getRemoteDataSize(), replica.getRowCount(),
replica.getLastFailedVersion(), replica.getLastSuccessVersion());
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
replica.getLastFailedVersion(), replica.getLastSuccessVersion(), replica.isBad());
Env.getCurrentEnv().getEditLog().logBatchModifyReplica(new BatchModifyReplicasInfo(info));
}

LOG.info("after handle alter task tablet: {}, replica: {}", task.getSignature(), replica);
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BatchModifyReplicasInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.persist.DropPartitionInfo;
Expand Down Expand Up @@ -4128,6 +4129,10 @@ public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundExce
getInternalCatalog().replayDeleteReplica(info);
}

/**
 * Forwards a batched replica-modification record to the internal catalog,
 * which performs the actual replay.
 *
 * @param info the batch record handed on unchanged to
 *             {@code getInternalCatalog().replayBatchModifyReplicasInfo(info)}
 * @throws MetaNotFoundException propagated from the internal catalog's replay
 */
public void replayBatchModifyReplicasInfo(BatchModifyReplicasInfo info) throws MetaNotFoundException {
getInternalCatalog().replayBatchModifyReplicasInfo(info);
}

public void replayAddFrontend(Frontend fe) {
tryLock(true);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ private void writeUnlock(long stamp) {
public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
Map<Long, Long> backendPartitionsVersion,
final HashMap<Long, TStorageMedium> storageMediumMap,
ListMultimap<Long, Long> tabletSyncMap,
ListMultimap<Long, Long> tabletDeleteFromMeta,
Map<Long, ListMultimap<Long, Long>> tabletSyncMap,
Map<Long, ListMultimap<Long, Long>> tabletDeleteFromMeta,
Set<Long> tabletFoundInMeta,
ListMultimap<TStorageMedium, Long> tabletMigrationMap,
Map<Long, Long> partitionVersionSyncMap,
Expand All @@ -145,6 +145,7 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
long feTabletNum = 0;
long stamp = readLock();
long start = System.currentTimeMillis();
long tabletSyncNum = 0;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size());
Expand Down Expand Up @@ -194,7 +195,13 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
if (needSync(replica, backendTabletInfo)) {
// need sync
synchronized (tabletSyncMap) {
tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
ListMultimap<Long, Long> dbTablets = tabletSyncMap.get(tabletMeta.getDbId());
if (dbTablets == null) {
dbTablets = LinkedListMultimap.create();
tabletSyncMap.put(tabletMeta.getDbId(), dbTablets);
}
dbTablets.put(tabletMeta.getTableId(), tabletId);
tabletSyncNum++;
}
}

Expand Down Expand Up @@ -281,7 +288,13 @@ && isLocal(tabletMeta.getStorageMedium())) {
LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta);
}
synchronized (tabletDeleteFromMeta) {
tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId);
ListMultimap<Long, Long> dbTablets = tabletDeleteFromMeta.get(tabletMeta.getDbId());
if (dbTablets == null) {
dbTablets = LinkedListMultimap.create();
tabletDeleteFromMeta.put(tabletMeta.getDbId(), dbTablets);
}
dbTablets.put(tabletMeta.getTableId(), tabletId);
tabletDeleteFromMetaNum++;
}
}
});
Expand All @@ -306,8 +319,8 @@ && isLocal(tabletMeta.getStorageMedium())) {
+ " metaDel: {}. foundInMeta: {}. migration: {}. backend partition num: {}, backend need "
+ "update: {}. found invalid transactions {}. found republish "
+ "transactions {}. tabletToUpdate: {}. need recovery: {}. cost: {} ms",
backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(),
tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(),
backendId, feTabletNum, backendTablets.size(), tabletSyncNum,
tabletDeleteFromMetaNum, tabletFoundInMeta.size(), tabletMigrationMap.size(),
backendPartitionsVersion.size(), partitionVersionSyncMap.size(),
transactionsToClear.size(), transactionsToPublish.size(), tabletToUpdate.size(),
tabletRecoveryMap.size(), (end - start));
Expand Down
35 changes: 23 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.BatchModifyReplicasInfo;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -1225,25 +1226,35 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
replica.setFurtherRepairWatermarkTxnTd(-1);
}

ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tblId, partitionId, indexId,
tabletId, destBackendId, replica.getId(),
reportedTablet.getVersion(),
reportedTablet.getSchemaHash(),
reportedTablet.getDataSize(),
reportedTablet.getRemoteDataSize(),
reportedTablet.getRowCount(),
replica.getLastFailedVersion(),
replica.getLastSuccessVersion());

ReplicaPersistInfo info = null;
if (replica.getState() == ReplicaState.CLONE) {
replica.setState(ReplicaState.NORMAL);
Env.getCurrentEnv().getEditLog().logAddReplica(info);
info = ReplicaPersistInfo.createForAdd(dbId, tblId, partitionId, indexId,
tabletId, destBackendId, replica.getId(),
reportedTablet.getVersion(),
reportedTablet.getSchemaHash(),
reportedTablet.getDataSize(),
reportedTablet.getRemoteDataSize(),
reportedTablet.getRowCount(),
replica.getLastFailedVersion(),
replica.getLastSuccessVersion());
} else {
// if in VERSION_INCOMPLETE, replica is not newly created, thus the state is not CLONE
// so we keep its state unchanged, and log update replica
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
info = ReplicaPersistInfo.createForUpdate(dbId, tblId, partitionId, indexId,
tabletId, destBackendId, replica.getId(),
reportedTablet.getVersion(),
reportedTablet.getSchemaHash(),
reportedTablet.getDataSize(),
reportedTablet.getRemoteDataSize(),
reportedTablet.getRowCount(),
replica.getLastFailedVersion(),
replica.getLastSuccessVersion(),
replica.isBad());
}

Env.getCurrentEnv().getEditLog().logBatchModifyReplica(new BatchModifyReplicasInfo(info));

state = State.FINISHED;
LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {}, is catchup {}",
this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.BatchModifyReplicasInfo;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -1270,7 +1271,7 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx,
tabletCtx.getTabletId(),
replica.getBackendIdWithoutException());

Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
Env.getCurrentEnv().getEditLog().logBatchModifyReplica(new BatchModifyReplicasInfo(info));

LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}",
tabletCtx.getTabletId(), replica.getBackendIdWithoutException(), reason, force);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.BatchModifyReplicasInfo;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DropDbInfo;
Expand Down Expand Up @@ -1139,12 +1140,15 @@ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info
Tablet tablet = materializedIndex.getTablet(info.getTabletId());
Replica replica = tablet.getReplicaById(info.getReplicaId());
Preconditions.checkNotNull(replica, info);
replica.setBad(info.isBad());
if (info.isBad()) {
return;
}
replica.updateVersionWithFailed(info.getVersion(), info.getLastFailedVersion(),
info.getLastSuccessVersion());
replica.setDataSize(info.getDataSize());
replica.setRemoteDataSize(info.getRemoteDataSize());
replica.setRowCount(info.getRowCount());
replica.setBad(false);
}

public void replayAddReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
Expand Down Expand Up @@ -1187,6 +1191,25 @@ public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundExce
}
}

/**
 * Replays each replica operation carried by a batch record, in the order
 * returned by {@code info.getReplicas()}.
 *
 * <p>Every entry is dispatched on its operation type to the matching
 * single-replica replay method (add, delete or update). Processing stops at
 * the first entry whose operation type is not one of those three.
 *
 * @param info the batch record whose entries are replayed one by one
 * @throws MetaNotFoundException if an entry has an unhandled operation type,
 *         or if one of the delegated replay methods throws it
 */
public void replayBatchModifyReplicasInfo(BatchModifyReplicasInfo info) throws MetaNotFoundException {
    for (ReplicaPersistInfo entry : info.getReplicas()) {
        switch (entry.getOpType()) {
            case UPDATE:
                replayUpdateReplica(entry);
                continue;
            case DELETE:
                replayDeleteReplica(entry);
                continue;
            case ADD:
                replayAddReplica(entry);
                continue;
            default:
                // Fall out of the switch to the shared failure path below.
                break;
        }
        throw new MetaNotFoundException(
                "not implement BatchModifyReplicasInfo's replay function with operation type "
                        + entry.getOpType());
    }
}

/**
* Following is the step to create an olap table:
* 1. create columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BatchModifyReplicasInfo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.BinlogGcInfo;
Expand Down Expand Up @@ -352,6 +353,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_BATCH_MODIFY_REPLICA: {
data = BatchModifyReplicasInfo.read(in);
isRead = true;
break;
}
case OperationType.OP_ADD_BACKEND:
case OperationType.OP_DROP_BACKEND:
case OperationType.OP_MODIFY_BACKEND:
Expand Down
Loading

0 comments on commit 3846125

Please sign in to comment.