Skip to content

Commit

Permalink
[Enhancement] Support recover partitions to a new table
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp committed Sep 12, 2024
1 parent 7a34ebb commit 0c82c17
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ unsupportedRecoverStatement
| RECOVER TABLE name=multipartIdentifier
id=INTEGER_VALUE? (AS alias=identifier)? #recoverTable
| RECOVER PARTITION name=identifier id=INTEGER_VALUE? (AS alias=identifier)?
FROM tableName=multipartIdentifier #recoverPartition
FROM tableName=multipartIdentifier (AS alias=identifier)? #recoverPartition
;

unsupportedAdminStatement
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -3309,9 +3309,9 @@ recover_stmt ::=
{:
RESULT = new RecoverTableStmt(dbTblName, tableId, alias);
:}
| KW_RECOVER KW_PARTITION ident:partitionName opt_id:partitionId opt_alias:alias KW_FROM table_name:dbTblName
| KW_RECOVER KW_PARTITION ident:partitionName opt_id:partitionId opt_alias:alias KW_FROM table_name:dbTblName opt_alias:newTableName
{:
RESULT = new RecoverPartitionStmt(dbTblName, partitionName, partitionId, alias);
RESULT = new RecoverPartitionStmt(dbTblName, partitionName, partitionId, alias, newTableName);
:}
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@ public class RecoverPartitionStmt extends DdlStmt implements NotFallbackInParser
private String partitionName;
private long partitionId = -1;
private String newPartitionName = "";
private String newTableName = "";

public RecoverPartitionStmt(TableName dbTblName, String partitionName, long partitionId, String newPartitionName) {

public RecoverPartitionStmt(TableName dbTblName, String partitionName, long partitionId, String newPartitionName,
String newTableName) {
this.dbTblName = dbTblName;
this.partitionName = partitionName;
this.partitionId = partitionId;
if (newPartitionName != null) {
this.newPartitionName = newPartitionName;
}
if (newTableName != null) {
this.newTableName = newTableName;
}
}

public String getDbName() {
Expand All @@ -63,6 +69,10 @@ public String getNewPartitionName() {
return newPartitionName;
}

public String getNewTableName() {
return newTableName;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
dbTblName.analyze(analyzer);
Expand Down Expand Up @@ -95,7 +105,12 @@ public String toSql() {
sb.append(getDbName()).append(".");
}
sb.append(getTableName());
if (!Strings.isNullOrEmpty(newTableName)) {
sb.append(" AS ");
sb.append(this.newTableName);
}
return sb.toString();

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -787,8 +789,58 @@ private synchronized boolean innerRecoverTable(Database db, Table table, String
return true;
}

public synchronized void recoverPartitionUsingNewTable(String newTableName,
RecyclePartitionInfo recoverPartitionInfo, PartitionItem recoverItem,
OlapTable origTable, String newPartitionName, Database db) throws DdlException {
try {
Optional<Table> newTable = db.getTable(newTableName);
if (!newTable.isPresent()) {
Env.getCurrentEnv().createTableUsingPartitionInfo(newTableName,
recoverPartitionInfo, origTable);
newTable = db.getTable(newTableName);
}
if (!newTable.isPresent()) {
// raise exeception
throw new DdlException("Unable to create " + newTableName);
}

OlapTable table = (OlapTable) newTable.get();
Partition recoverPartition = recoverPartitionInfo.getPartition();
// check if partition name exists
if (!Strings.isNullOrEmpty(newPartitionName)) {
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" + newPartitionName + "] is already used");
}
recoverPartition.setName(newPartitionName);
}
// recover partition
table.addPartition(recoverPartition);

PartitionInfo partitionInfo = table.getPartitionInfo();
// recover partition info
long partitionId = recoverPartition.getId();
partitionInfo.setItem(partitionId, false, recoverItem);
partitionInfo.setDataProperty(partitionId, recoverPartitionInfo.getDataProperty());
partitionInfo.setReplicaAllocation(partitionId, recoverPartitionInfo.getReplicaAlloc());
partitionInfo.setIsInMemory(partitionId, recoverPartitionInfo.isInMemory());
partitionInfo.setIsMutable(partitionId, recoverPartitionInfo.isMutable());

// remove from recycle bin
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);

// log
long dbId = db.getId();
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
} catch (UserException e) {
throw new DdlException(e.getMessage());
}
}

public synchronized void recoverPartition(long dbId, OlapTable table, String partitionName,
long partitionIdToRecover, String newPartitionName) throws DdlException {
long partitionIdToRecover, String newPartitionName, String newTableName, Database db) throws DdlException {
if (table.getType() == TableType.MATERIALIZED_VIEW) {
throw new DdlException("Can not recover partition in materialized view: " + table.getName());
}
Expand Down Expand Up @@ -833,49 +885,54 @@ public synchronized void recoverPartition(long dbId, OlapTable table, String par
} else if (partitionInfo.getType() == PartitionType.LIST) {
recoverItem = recoverPartitionInfo.getListPartitionItem();
}
// check if partition item is invalid
if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) {
throw new DdlException("Can not recover partition[" + partitionName + "]. Partition item conflict.");
}

// check if schema change
Partition recoverPartition = recoverPartitionInfo.getPartition();
Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
Set<Long> partitionIndex = recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
.map(i -> i.getId()).collect(Collectors.toSet());
if (!tableIndex.equals(partitionIndex)) {
throw new DdlException("table's index not equal with partition's index. table's index=" + tableIndex
+ ", partition's index=" + partitionIndex);
}
if (newTableName != null) {
// use new table to recover
recoverPartitionUsingNewTable(newTableName, recoverPartitionInfo, recoverItem, table, newPartitionName, db);
} else {
// check if partition item is invalid
if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) {
throw new DdlException("Can not recover partition[" + partitionName + "]. Partition item conflict.");
}
// check if schema change
Partition recoverPartition = recoverPartitionInfo.getPartition();
Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
Set<Long> partitionIndex = recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
.map(i -> i.getId()).collect(Collectors.toSet());
if (!tableIndex.equals(partitionIndex)) {
throw new DdlException("table's index not equal with partition's index. table's index=" + tableIndex
+ ", partition's index=" + partitionIndex);
}

// check if partition name exists
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
if (!Strings.isNullOrEmpty(newPartitionName)) {
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" + newPartitionName + "] is already used");
// check if partition name exists
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
if (!Strings.isNullOrEmpty(newPartitionName)) {
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" + newPartitionName + "] is already used");
}
recoverPartition.setName(newPartitionName);
}
recoverPartition.setName(newPartitionName);
}

// recover partition
table.addPartition(recoverPartition);
// recover partition
table.addPartition(recoverPartition);

// recover partition info
long partitionId = recoverPartition.getId();
partitionInfo.setItem(partitionId, false, recoverItem);
partitionInfo.setDataProperty(partitionId, recoverPartitionInfo.getDataProperty());
partitionInfo.setReplicaAllocation(partitionId, recoverPartitionInfo.getReplicaAlloc());
partitionInfo.setIsInMemory(partitionId, recoverPartitionInfo.isInMemory());
partitionInfo.setIsMutable(partitionId, recoverPartitionInfo.isMutable());
// recover partition info
long partitionId = recoverPartition.getId();
partitionInfo.setItem(partitionId, false, recoverItem);
partitionInfo.setDataProperty(partitionId, recoverPartitionInfo.getDataProperty());
partitionInfo.setReplicaAllocation(partitionId, recoverPartitionInfo.getReplicaAlloc());
partitionInfo.setIsInMemory(partitionId, recoverPartitionInfo.isInMemory());
partitionInfo.setIsMutable(partitionId, recoverPartitionInfo.isMutable());

// remove from recycle bin
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
// remove from recycle bin
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);

// log
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
}

// log
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
}

// The caller should keep table write lock
Expand Down
106 changes: 106 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.doris.binlog.BinlogGcer;
import org.apache.doris.binlog.BinlogManager;
import org.apache.doris.blockrule.SqlBlockRuleMgr;
import org.apache.doris.catalog.CatalogRecycleBin.RecyclePartitionInfo;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
Expand Down Expand Up @@ -125,6 +126,7 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.consistency.ConsistencyChecker;
Expand Down Expand Up @@ -6641,4 +6643,108 @@ private void replayJournalsAndExit() {

System.exit(0);
}

/**
* Get table ddl stmt from partition Info.
* Only olap table
*
*/
public static String getDdlStmtFromPartitionInfo(TableIf table,
RecyclePartitionInfo recovRecyclePartitionInfo) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE ");

sb.append(table.getType() != TableType.MATERIALIZED_VIEW ? "TABLE " : "MATERIALIZED VIEW ");
sb.append("`").append(table.getName()).append("`");
sb.append(" (\n");
int idx = 0;
OlapTable olapTable = (OlapTable) table;

// this getting always latest column set.
// how to get old one?
long oldIndex = recovRecyclePartitionInfo.getPartition().getBaseIndex().getId();
List<Column> columns = olapTable.getSchemaByIndexId(oldIndex);
for (Column column : columns) {
if (idx++ != 0) {
sb.append(",\n");
}
// There MUST BE 2 space in front of each column description line
// sqlalchemy requires this to parse SHOW CREATE TABLE stmt.
if (table.isManagedTable()) {
sb.append(" ").append(
column.toSql(((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS, true));
} else {
sb.append(" ").append(column.toSql());
}
}
if (table instanceof OlapTable) {
if (CollectionUtils.isNotEmpty(olapTable.getIndexes())) {
for (Index index : olapTable.getIndexes()) {
sb.append(",\n");
sb.append(" ").append(index.toSql());
}
}
}
sb.append("\n) ENGINE=");
sb.append(table.getType().name());

if (table instanceof OlapTable) {
// keys
String keySql = olapTable.getKeysType().toSql();
if (olapTable.isDuplicateWithoutKey()) {
// after #18621, use can create a DUP_KEYS olap table without key columns
// and get a ddl schema without key type and key columns
} else {
sb.append("\n").append(keySql).append("(");
List<String> keysColumnNames = Lists.newArrayList();
Map<Integer, String> clusterKeysColumnNamesToId = new TreeMap<>();
for (Column column : olapTable.getBaseSchema()) {
if (column.isKey()) {
keysColumnNames.add("`" + column.getName() + "`");
}
if (column.isClusterKey()) {
clusterKeysColumnNamesToId.put(column.getClusterKeyId(), column.getName());
}
}
sb.append(Joiner.on(", ").join(keysColumnNames)).append(")");
// show cluster keys
if (!clusterKeysColumnNamesToId.isEmpty()) {
sb.append("\n").append("CLUSTER BY (`");
sb.append(Joiner.on("`, `").join(clusterKeysColumnNamesToId.values())).append("`)");
}
}

// partition
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
List<Long> partitionId = null;
if ((partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST)) {
sb.append("\n").append(partitionInfo.toSql(olapTable, partitionId));
}

// distribution
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
sb.append("\n").append(distributionInfo.toSql());

// properties
sb.append("\nPROPERTIES (\n");
addOlapTablePropertyInfo(olapTable, sb, false, false, partitionId);
sb.append("\n)");
}
sb.append(";");
String createTableStmt = sb.toString();
return createTableStmt;
}

public void createTableUsingPartitionInfo(String newTableName,
RecyclePartitionInfo recovRecyclePartitionInfo, OlapTable origTable) throws UserException {

String createTableStmt = Env.getDdlStmtFromPartitionInfo(origTable, recovRecyclePartitionInfo);
LOG.info("create table stmt: {}", createTableStmt);
CreateTableStmt parsedCreateTableStmt = (CreateTableStmt) SqlParserUtils.parseAndAnalyzeStmt(
createTableStmt, ConnectContext.get());
parsedCreateTableStmt.setTableName(newTableName);
parsedCreateTableStmt.setIfNotExists(true);
createTable(parsedCreateTableStmt);
}
}
Loading

0 comments on commit 0c82c17

Please sign in to comment.