Skip to content

Commit

Permalink
[fix](create-table)The CREATE TABLE IF NOT EXISTS AS SELECT statement…
Browse files Browse the repository at this point in the history
… should refrain from performing any INSERT operations if the table already exists #35210 (#35419)
  • Loading branch information
CalvinKirs authored May 27, 2024
1 parent 4179db1 commit 465b952
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -54,6 +55,13 @@ public class CreateTableAsSelectStmt extends DdlStmt {
@Getter
private final InsertStmt insertStmt;

/**
* If the table has already exists, set this flag to true.
*/
@Setter
@Getter
private boolean tableHasExists = false;

protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
List<String> columnNames, QueryStmt queryStmt) {
this.createTableStmt = createTableStmt;
Expand Down
38 changes: 19 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -2845,8 +2845,8 @@ public void replayRenameDatabase(String dbName, String newDbName) {
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
*/
public void createTable(CreateTableStmt stmt) throws UserException {
getInternalCatalog().createTable(stmt);
public boolean createTable(CreateTableStmt stmt) throws UserException {
return getInternalCatalog().createTable(stmt);
}

public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
Expand Down Expand Up @@ -2891,15 +2891,15 @@ public void replayGcBinlog(BinlogGcInfo binlogGcInfo) {
}

public static void getDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition,
hidePassword, false, specificVersion, false, false);
}

public static void getSyncedDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition,
hidePassword, false, specificVersion, false, true);
}
Expand All @@ -2910,10 +2910,10 @@ public static void getSyncedDdlStmt(TableIf table, List<String> createTableStmt,
* @param getDdlForLike Get schema for 'create table like' or not. when true, without hidden columns.
*/
public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, List<String> createTableStmt,
List<String> addPartitionStmt, List<String> createRollupStmt,
boolean separatePartition,
boolean hidePassword, boolean getDdlForLike, long specificVersion,
boolean getBriefDdl, boolean getDdlForSync) {
List<String> addPartitionStmt, List<String> createRollupStmt,
boolean separatePartition,
boolean hidePassword, boolean getDdlForLike, long specificVersion,
boolean getBriefDdl, boolean getDdlForSync) {
StringBuilder sb = new StringBuilder();

// 1. create table
Expand Down Expand Up @@ -3488,12 +3488,12 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
}

public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
Long recycleTime) {
Long recycleTime) {
return getInternalCatalog().unprotectDropTable(db, table, isForceDrop, isReplay, recycleTime);
}

public void replayDropTable(Database db, long tableId, boolean isForceDrop,
Long recycleTime) throws MetaNotFoundException {
Long recycleTime) throws MetaNotFoundException {
getInternalCatalog().replayDropTable(db, tableId, isForceDrop, recycleTime);
}

Expand Down Expand Up @@ -3933,7 +3933,7 @@ public void setHaProtocol(HAProtocol protocol) {
}

public static short calcShortKeyColumnCount(List<Column> columns, Map<String, String> properties,
boolean isKeysRequired) throws DdlException {
boolean isKeysRequired) throws DdlException {
List<Column> indexColumns = new ArrayList<Column>();
boolean hasValueColumn = false;
for (Column column : columns) {
Expand Down Expand Up @@ -4158,7 +4158,7 @@ public void replayRenameTable(TableInfo tableInfo) throws MetaNotFoundException

// the invoker should keep table's write lock
public void modifyTableColocate(Database db, OlapTable table, String assignedGroup, boolean isReplay,
GroupId assignedGroupId)
GroupId assignedGroupId)
throws DdlException {

String oldGroup = table.getColocateGroup();
Expand Down Expand Up @@ -4382,8 +4382,8 @@ public void replayRenamePartition(TableInfo tableInfo) throws MetaNotFoundExcept
}

private void renameColumn(Database db, OlapTable table, String colName,
String newColName, Map<Long, Integer> indexIdToSchemaVersion,
boolean isReplay) throws DdlException {
String newColName, Map<Long, Integer> indexIdToSchemaVersion,
boolean isReplay) throws DdlException {
table.checkNormalStateForAlter();
if (colName.equalsIgnoreCase(newColName)) {
throw new DdlException("Same column name");
Expand Down Expand Up @@ -4631,7 +4631,7 @@ public void modifyTableReplicaAllocation(Database db, OlapTable table, Map<Strin
*/
// The caller need to hold the table write lock
public void modifyTableDefaultReplicaAllocation(Database db, OlapTable table,
Map<String, String> properties) throws UserException {
Map<String, String> properties) throws UserException {
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
table.checkChangeReplicaAllocation();
table.setReplicaAllocation(properties);
Expand Down Expand Up @@ -4730,7 +4730,7 @@ public void replayModifyTableProperty(short opCode, ModifyTablePropertyOperation
}

public void modifyDefaultDistributionBucketNum(Database db, OlapTable olapTable,
ModifyDistributionClause modifyDistributionClause)
ModifyDistributionClause modifyDistributionClause)
throws DdlException {
olapTable.writeLockOrDdlException();
try {
Expand Down Expand Up @@ -5361,7 +5361,7 @@ public void replaySetReplicaStatus(SetReplicaStatusOperationLog log) throws Meta
}

private void setReplicaStatusInternal(long tabletId, long backendId, ReplicaStatus status, long userDropTime,
boolean isReplay)
boolean isReplay)
throws MetaNotFoundException {
try {
TabletMeta meta = tabletInvertedIndex.getTabletMeta(tabletId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundExce
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
*/
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String engineName = stmt.getEngineName();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
Expand All @@ -1101,40 +1101,33 @@ public void createTable(CreateTableStmt stmt) throws UserException {
if (db.getTable(tableName).isPresent()) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}

if (engineName.equals("olap")) {
createOlapTable(db, stmt);
return;
return createOlapTable(db, stmt);
} else if (engineName.equals("odbc")) {
createOdbcTable(db, stmt);
return;
return createOdbcTable(db, stmt);
} else if (engineName.equals("mysql")) {
createMysqlTable(db, stmt);
return;
return createMysqlTable(db, stmt);
} else if (engineName.equals("broker")) {
createBrokerTable(db, stmt);
return;
return createBrokerTable(db, stmt);
} else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
createEsTable(db, stmt);
return;
return createEsTable(db, stmt);
} else if (engineName.equalsIgnoreCase("hive")) {
createHiveTable(db, stmt);
return;
return createHiveTable(db, stmt);
} else if (engineName.equalsIgnoreCase("iceberg")) {
IcebergCatalogMgr.createIcebergTable(db, stmt);
return;
return IcebergCatalogMgr.createIcebergTable(db, stmt);
} else if (engineName.equalsIgnoreCase("jdbc")) {
createJdbcTable(db, stmt);
return;
return createJdbcTable(db, stmt);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
}
Preconditions.checkState(false);
return false;
}

public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
Expand Down Expand Up @@ -1292,7 +1285,8 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio
}
Analyzer dummyRootAnalyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
createTableStmt.analyze(dummyRootAnalyzer);
createTable(createTableStmt);
boolean tableHasExists = createTable(createTableStmt);
stmt.setTableHasExists(tableHasExists);
} catch (UserException e) {
throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
}
Expand Down Expand Up @@ -1982,10 +1976,10 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
}

// Create olap table and related base index synchronously.
private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
String tableName = stmt.getTableName();
LOG.debug("begin create olap table: {}", tableName);

boolean tableHasExists = false;
BinlogConfig dbBinlogConfig;
db.readLock();
try {
Expand Down Expand Up @@ -2616,6 +2610,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
tableHasExists = true;
} else {
// we have added these index to memory, only need to persist here
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
Expand Down Expand Up @@ -2654,36 +2649,33 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep
}
LOG.info("Create related {} mv job.", jobs.size());
}
return tableHasExists;
}

private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();

List<Column> columns = stmt.getColumns();

long tableId = Env.getCurrentEnv().getNextId();
MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
mysqlTable.setComment(stmt.getComment());
if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();

long tableId = Env.getCurrentEnv().getNextId();
OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
odbcTable.setComment(stmt.getComment());
if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException {
private boolean createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException {
String tableName = stmt.getTableName();

// validate props to get column from es.
Expand Down Expand Up @@ -2716,14 +2708,11 @@ private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlExcepti
esTable.setId(tableId);
esTable.setComment(stmt.getComment());
esTable.syncTableMetaData();
if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table{} with id {}", tableName, tableId);
return esTable;
Pair<Boolean, Boolean> result = db.createTableWithLock(esTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();

List<Column> columns = stmt.getColumns();
Expand All @@ -2732,14 +2721,11 @@ private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlExce
BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
brokerTable.setComment(stmt.getComment());
brokerTable.setBrokerProperties(stmt.getExtProperties());

if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
long tableId = Env.getCurrentEnv().getNextId();
Expand All @@ -2757,13 +2743,11 @@ private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlExcept
throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
}
// check hive table if exists in doris database
if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();

Expand All @@ -2772,10 +2756,20 @@ private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlExcept
JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, stmt.getProperties());
jdbcTable.setComment(stmt.getComment());
// check table if exists
if (!db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()).first) {
Pair<Boolean, Boolean> result = db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private boolean checkCreateTableResult(String tableName, long tableId, Pair<Boolean, Boolean> result)
throws DdlException {
if (Boolean.FALSE.equals(result.first)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
if (Boolean.TRUE.equals(result.second)) {
return true;
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return false;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.SystemIdGenerator;
import org.apache.doris.external.iceberg.util.IcebergUtils;

Expand Down Expand Up @@ -186,7 +187,7 @@ public static IcebergTable getTableFromIceberg(long tableId, String tableName, I
* @param stmt
* @throws DdlException
*/
public static void createIcebergTable(Database db, CreateTableStmt stmt) throws DdlException {
public static boolean createIcebergTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
Map<String, String> properties = stmt.getProperties();

Expand All @@ -212,9 +213,14 @@ public static void createIcebergTable(Database db, CreateTableStmt stmt) throws
}

// check iceberg table if exists in doris database
if (!db.createTableWithLock(table, false, stmt.isSetIfNotExists()).first) {
Pair<Boolean, Boolean> result = db.createTableWithLock(table, false, stmt.isSetIfNotExists());
if (Boolean.FALSE.equals(result.first)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, table.getId());
if (Boolean.TRUE.equals(result.second)) {
return true;
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2426,6 +2426,9 @@ private void handleCtasStmt() {
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
return;
}
if (ctasStmt.isTableHasExists()) {
return;
}
// after success create table insert data
try {
parsedStmt = ctasStmt.getInsertStmt();
Expand Down
Loading

0 comments on commit 465b952

Please sign in to comment.