[improvement](binlog) filter dropped indexes #41246 (#41299)
cherry pick from #41246
w41ter authored Sep 25, 2024
1 parent f8acd81 commit 2d1dac6
Showing 6 changed files with 90 additions and 4 deletions.
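For orientation: the net effect of this cherry-pick is that when a schema-change job finishes, the binlog layer remembers the job's origin index ids as "dropped indexes" (alongside the existing dropped partitions and tables) and exposes them through getMeta, so a downstream consumer can skip binlogs that target indexes which no longer exist. A minimal, self-contained sketch of the bookkeeping idea — the class and method names here are illustrative, not the actual Doris API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DroppedIndexTrackerDemo {
    // Mirrors the idea of DBBinlog's droppedIndexes: (indexId, commitSeq) pairs
    // appended in commit-sequence order.
    private final List<long[]> droppedIndexes = new ArrayList<>();

    // Called when a finished SCHEMA_CHANGE binlog is recovered or added.
    void onSchemaChangeFinished(List<Long> originIndexIds, long commitSeq) {
        for (Long indexId : originIndexIds) {
            if (indexId != null && indexId > 0) {
                droppedIndexes.add(new long[] {indexId, commitSeq});
            }
        }
    }

    // What getMeta ultimately reports for the database.
    List<Long> getDroppedIndexes() {
        List<Long> ids = new ArrayList<>();
        for (long[] entry : droppedIndexes) {
            ids.add(entry[0]);
        }
        return ids;
    }

    public static void main(String[] args) {
        DroppedIndexTrackerDemo tracker = new DroppedIndexTrackerDemo();
        tracker.onSchemaChangeFinished(Arrays.asList(10002L, 10005L), 123L);
        System.out.println(tracker.getDroppedIndexes()); // [10002, 10005]
    }
}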
@@ -916,6 +916,10 @@ protected void getInfo(List<List<Comparable>> infos) {
}
}

public Map<Long, Long> getIndexIdMap() {
return indexIdMap;
}

public List<List<String>> getUnfinishedTasks(int limit) {
List<List<String>> taskInfos = Lists.newArrayList();
if (jobState == JobState.RUNNING) {
@@ -18,10 +18,15 @@
package org.apache.doris.binlog;

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AlterJobRecord {
@SerializedName(value = "type")
private AlterJobV2.JobType type;
@@ -37,6 +42,8 @@ public class AlterJobRecord {
private AlterJobV2.JobState jobState;
@SerializedName(value = "rawSql")
private String rawSql;
@SerializedName(value = "iim")
private Map<Long, Long> indexIdMap;

public AlterJobRecord(AlterJobV2 job) {
this.type = job.getType();
@@ -46,9 +53,31 @@ public AlterJobRecord(AlterJobV2 job) {
this.jobId = job.getJobId();
this.jobState = job.getJobState();
this.rawSql = job.getRawSql();
if (type == AlterJobV2.JobType.SCHEMA_CHANGE && job instanceof SchemaChangeJobV2) {
this.indexIdMap = ((SchemaChangeJobV2) job).getIndexIdMap();
}
}

public boolean isJobFinished() {
return jobState == AlterJobV2.JobState.FINISHED;
}

public boolean isSchemaChangeJob() {
return type == AlterJobV2.JobType.SCHEMA_CHANGE;
}

public List<Long> getOriginIndexIdList() {
if (indexIdMap == null) {
return new ArrayList<>();
}
return new ArrayList<>(indexIdMap.values());
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static AlterJobRecord fromJson(String json) {
return GsonUtils.GSON.fromJson(json, AlterJobRecord.class);
}
}
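A rough illustration of how the record above round-trips as JSON. The real code goes through Doris's GsonUtils and carries more fields; this sketch keeps only the new indexIdMap field (serialized under the short name "iim") and uses a plain Gson instance:

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.util.HashMap;
import java.util.Map;

public class AlterJobRecordJsonDemo {
    // Trimmed-down stand-in for AlterJobRecord: only the new field is kept.
    static class AlterJobRecordSketch {
        @SerializedName(value = "iim")
        Map<Long, Long> indexIdMap;
    }

    public static void main(String[] args) {
        Gson gson = new Gson();
        AlterJobRecordSketch record = new AlterJobRecordSketch();
        record.indexIdMap = new HashMap<>();
        // The map comes from SchemaChangeJobV2.getIndexIdMap(); its values are the
        // origin index ids, which is what getOriginIndexIdList() returns.
        record.indexIdMap.put(10051L, 10002L);

        String json = gson.toJson(record);               // {"iim":{"10051":10002}}
        AlterJobRecordSketch parsed = gson.fromJson(json, AlterJobRecordSketch.class);
        System.out.println(parsed.indexIdMap.values());  // origin index ids: [10002]
    }
}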
@@ -225,7 +225,7 @@ public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob);
String data = alterJobRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJob);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJobRecord);
}

public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
@@ -383,6 +383,20 @@ public List<Long> getDroppedTables(long dbId) {
}
}

// get the dropped indexes of the db.
public List<Long> getDroppedIndexes(long dbId) {
lock.readLock().lock();
try {
DBBinlog dbBinlog = dbBinlogMap.get(dbId);
if (dbBinlog == null) {
return Lists.newArrayList();
}
return dbBinlog.getDroppedIndexes();
} finally {
lock.readLock().unlock();
}
}

public List<BinlogTombstone> gc() {
LOG.info("begin gc binlog");

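The new BinlogManager.getDroppedIndexes follows the same shape as getDroppedPartitions and getDroppedTables: take the read lock, look up the per-database DBBinlog, and fall back to an empty list when the database has no binlog yet. A stripped-down sketch of that lock-and-delegate idiom, with an illustrative class name and a plain map standing in for dbBinlogMap:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DroppedIndexRegistry {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Stand-in for BinlogManager.dbBinlogMap: db id -> dropped index ids.
    private final Map<Long, List<Long>> droppedIndexesByDb = new HashMap<>();

    public List<Long> getDroppedIndexes(long dbId) {
        lock.readLock().lock();
        try {
            List<Long> indexes = droppedIndexesByDb.get(dbId);
            // Unknown db: report "no dropped indexes" rather than null.
            return indexes == null ? new ArrayList<>() : new ArrayList<>(indexes);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        DroppedIndexRegistry registry = new DroppedIndexRegistry();
        System.out.println(registry.getDroppedIndexes(1L)); // [] for an unknown db
    }
}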
43 changes: 40 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -64,6 +64,8 @@ public class DBBinlog {
private List<Pair<Long, Long>> droppedPartitions;
// The commit seq of the dropped tables
private List<Pair<Long, Long>> droppedTables;
// The commit seq of the dropped indexes
private List<Pair<Long, Long>> droppedIndexes;

private List<TBinlog> tableDummyBinlogs;

@@ -82,6 +84,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
timestamps = Lists.newArrayList();
droppedPartitions = Lists.newArrayList();
droppedTables = Lists.newArrayList();
droppedIndexes = Lists.newArrayList();

TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
@@ -129,6 +132,15 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
if (record != null && record.getTableId() > 0) {
droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.ALTER_JOB) {
AlterJobRecord record = AlterJobRecord.fromJson(binlog.data);
if (record != null && record.isSchemaChangeJob() && record.isJobFinished()) {
for (Long indexId : record.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
}
}
}
}

if (tableIds == null) {
@@ -193,6 +205,15 @@ public void addBinlog(TBinlog binlog, Object raw) {
if (tableId > 0) {
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
}
}
}
}

switch (binlog.getType()) {
@@ -263,6 +284,18 @@ public List<Long> getDroppedTables() {
}
}

// Get the dropped indexes of the db.
public List<Long> getDroppedIndexes() {
lock.readLock().lock();
try {
return droppedIndexes.stream()
.map(v -> v.first)
.collect(Collectors.toList());
} finally {
lock.readLock().unlock();
}
}

public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
@@ -380,7 +413,7 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) {
}
}

gcDroppedPartitionAndTables(largestExpiredCommitSeq);
gcDroppedResources(largestExpiredCommitSeq);
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}
@@ -418,7 +451,7 @@ private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
timeIter.remove();
}

gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq());
gcDroppedResources(lastExpiredBinlog.getCommitSeq());
}

return lastExpiredBinlog;
@@ -528,7 +561,7 @@ public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
}
}

private void gcDroppedPartitionAndTables(long commitSeq) {
private void gcDroppedResources(long commitSeq) {
Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
@@ -537,6 +570,10 @@ private void gcDroppedPartitionAndTables(long commitSeq) {
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
iter = droppedIndexes.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
}

    // not thread safe, do this without lock
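Because droppedPartitions, droppedTables, and droppedIndexes are appended as binlogs arrive, each list stays ordered by commit sequence, so gcDroppedResources only needs to trim the expired prefix: walk the iterator until the first entry whose commit seq is no longer below the threshold and drop everything before it. A self-contained sketch of that prefix-trim loop, using a plain long[]{id, commitSeq} pair instead of Doris's Pair type:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class GcPrefixTrimDemo {
    // Remove every leading entry whose commit seq (pair[1]) is below the threshold,
    // mirroring the loops in gcDroppedResources.
    static void gc(List<long[]> entries, long largestExpiredCommitSeq) {
        Iterator<long[]> iter = entries.iterator();
        while (iter.hasNext() && iter.next()[1] < largestExpiredCommitSeq) {
            iter.remove();
        }
    }

    public static void main(String[] args) {
        List<long[]> droppedIndexes = new ArrayList<>();
        droppedIndexes.add(new long[] {10002, 100});
        droppedIndexes.add(new long[] {10007, 250});
        droppedIndexes.add(new long[] {10011, 400});

        gc(droppedIndexes, 300);
        // Entries at commit seq 100 and 250 are trimmed; the one at 400 survives.
        System.out.println(droppedIndexes.size()); // 1
    }
}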
1 change: 1 addition & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5628,6 +5628,7 @@ public static TGetMetaResult getMeta(Database db, List<Table> tables) throws Met
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId()));
dbMeta.setDroppedIndexes(binlogManager.getDroppedIndexes(db.getId()));
}

result.setDbMeta(dbMeta);
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
@@ -1230,6 +1230,7 @@ struct TGetMetaDBMeta {
3: optional list<TGetMetaTableMeta> tables
4: optional list<i64> dropped_partitions
5: optional list<i64> dropped_tables
6: optional list<i64> dropped_indexes
}

struct TGetMetaResult {
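On the consumer side (for example a CCR syncer calling getMeta), the new dropped_indexes list can be used the same way as dropped_partitions and dropped_tables: collect the ids into a set and skip binlogs that only touch them. A simplified, library-free sketch of that filtering step — the entry type and its fields are hypothetical, not the syncer's real API (uses a Java record for brevity):

import java.util.List;
import java.util.Set;

public class DroppedIndexFilterDemo {
    // Hypothetical stand-in for a binlog entry that targets a single index.
    record BinlogEntry(long commitSeq, long indexId) {}

    // Keep only entries whose index still exists.
    static List<BinlogEntry> filterDropped(List<BinlogEntry> entries, Set<Long> droppedIndexes) {
        return entries.stream()
                .filter(e -> !droppedIndexes.contains(e.indexId()))
                .toList();
    }

    public static void main(String[] args) {
        // Ids as they would arrive in TGetMetaDBMeta.dropped_indexes.
        Set<Long> droppedIndexes = Set.of(10002L);
        List<BinlogEntry> entries = List.of(
                new BinlogEntry(100, 10002L),   // targets a dropped index -> filtered out
                new BinlogEntry(101, 10011L));  // still live -> kept
        System.out.println(filterDropped(entries, droppedIndexes)); // [BinlogEntry[commitSeq=101, indexId=10011]]
    }
}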
