Skip to content

Commit

Permalink
[improve](binlog) Filter the truncated partitions (#41611)
Browse files Browse the repository at this point in the history
The suite case in selectdb/ccr-syncer#190
  • Loading branch information
w41ter committed Oct 9, 2024
1 parent ade86c0 commit d3e7a5f
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
TruncateTableRecord record = new TruncateTableRecord(info);
String data = record.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
}

// get binlog by dbId, return first binlog.version > version
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE) {
TruncateTableRecord record = TruncateTableRecord.fromJson(binlog.data);
if (record != null) {
for (long partitionId : record.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}
}

if (tableIds == null) {
Expand Down Expand Up @@ -214,6 +221,11 @@ public void addBinlog(TBinlog binlog, Object raw) {
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}

switch (binlog.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

import com.google.gson.annotations.SerializedName;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class TruncateTableRecord {
@SerializedName(value = "dbId")
private long dbId;
Expand All @@ -35,6 +40,8 @@ public class TruncateTableRecord {
private boolean isEntireTable = false;
@SerializedName(value = "rawSql")
private String rawSql = "";
@SerializedName(value = "op")
private Map<Long, String> oldPartitions = new HashMap<>();

public TruncateTableRecord(TruncateTableInfo info) {
this.dbId = info.getDbId();
Expand All @@ -43,9 +50,18 @@ public TruncateTableRecord(TruncateTableInfo info) {
this.table = info.getTable();
this.isEntireTable = info.isEntireTable();
this.rawSql = info.getRawSql();
this.oldPartitions = info.getOldPartitions();
}

public Collection<Long> getOldPartitionIds() {
return oldPartitions == null ? new ArrayList<>() : oldPartitions.keySet();
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static TruncateTableRecord fromJson(String json) {
return GsonUtils.GSON.fromJson(json, TruncateTableRecord.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3460,8 +3460,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
// write edit log
TruncateTableInfo info =
new TruncateTableInfo(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(),
newPartitions,
truncateEntireTable, truncateTableStmt.toSqlWithoutTable());
newPartitions, truncateEntireTable, truncateTableStmt.toSqlWithoutTable(), oldPartitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (DdlException e) {
failedCleanCallback.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TruncateTableInfo implements Writable {
@SerializedName(value = "dbId")
Expand All @@ -45,20 +47,25 @@ public class TruncateTableInfo implements Writable {
private boolean isEntireTable = false;
@SerializedName(value = "rawSql")
private String rawSql = "";
@SerializedName(value = "op")
private Map<Long, String> oldPartitions = new HashMap<>();

public TruncateTableInfo() {

}

public TruncateTableInfo(long dbId, String db, long tblId, String table, List<Partition> partitions,
boolean isEntireTable, String rawSql) {
boolean isEntireTable, String rawSql, List<Partition> oldPartitions) {
this.dbId = dbId;
this.db = db;
this.tblId = tblId;
this.table = table;
this.partitions = partitions;
this.isEntireTable = isEntireTable;
this.rawSql = rawSql;
for (Partition partition : oldPartitions) {
this.oldPartitions.put(partition.getId(), partition.getName());
}
}

public long getDbId() {
Expand All @@ -81,6 +88,10 @@ public List<Partition> getPartitions() {
return partitions;
}

public Map<Long, String> getOldPartitions() {
return oldPartitions == null ? new HashMap<>() : oldPartitions;
}

public boolean isEntireTable() {
return isEntireTable;
}
Expand Down

0 comments on commit d3e7a5f

Please sign in to comment.