Skip to content

Commit

Permalink
[fix](create-table)The CREATE TABLE IF NOT EXISTS AS SELECT statement…
Browse files Browse the repository at this point in the history
… should refrain from performing any INSERT operations if the table already exists (#35210)
  • Loading branch information
CalvinKirs authored and dataroaring committed May 26, 2024
1 parent b581649 commit 67809f1
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -54,6 +55,13 @@ public class CreateTableAsSelectStmt extends DdlStmt {
@Getter
private final InsertStmt insertStmt;

/**
* If the table has already exists, set this flag to true.
*/
@Setter
@Getter
private boolean tableHasExists = false;

protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
List<String> columnNames, QueryStmt queryStmt) {
this.createTableStmt = createTableStmt;
Expand Down
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3167,11 +3167,13 @@ public void replayRenameDatabase(String dbName, String newDbName) {
* 9. create tablet in BE
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
* @return if CreateTableStmt.isIfNotExists is true, return true if table already exists
* otherwise return false
*/
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(stmt.getCatalogName(),
catalog -> new DdlException(("Unknown catalog " + catalog)));
catalogIf.createTable(stmt);
return catalogIf.createTable(stmt);
}

public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ default CatalogLog constructEditLog() {

void dropDb(DropDbStmt stmt) throws DdlException;

void createTable(CreateTableStmt stmt) throws UserException;
/**
* @return if org.apache.doris.analysis.CreateTableStmt.ifNotExists is true, return true if table exists,
* return false otherwise
*/
boolean createTable(CreateTableStmt stmt) throws UserException;

void dropTable(DropTableStmt stmt) throws DdlException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -726,14 +726,14 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
}

@Override
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("createTable not implemented");
return;
return false;
}
try {
metadataOps.createTable(stmt);
return metadataOps.createTable(stmt);
} catch (Exception e) {
LOG.warn("Failed to create a table.", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundExce
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
*/
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String engineName = stmt.getEngineName();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
Expand All @@ -1169,37 +1169,40 @@ public void createTable(CreateTableStmt stmt) throws UserException {
if (db.getTable(tableName).isPresent()) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}

if (engineName.equals("olap")) {
createOlapTable(db, stmt);
return;
} else if (engineName.equals("odbc")) {
createOdbcTable(db, stmt);
return;
} else if (engineName.equals("mysql")) {
createMysqlTable(db, stmt);
return;
} else if (engineName.equals("broker")) {
createBrokerTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
createEsTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("hive")) {
return createOlapTable(db, stmt);
}
if (engineName.equals("odbc")) {
return createOdbcTable(db, stmt);
}
if (engineName.equals("mysql")) {
return createMysqlTable(db, stmt);
}
if (engineName.equals("broker")) {
return createBrokerTable(db, stmt);
}
if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
return createEsTable(db, stmt);
}
if (engineName.equalsIgnoreCase("hive")) {
// should use hive catalog to create external hive table
throw new UserException("Cannot create hive table in internal catalog, should switch to hive catalog.");
} else if (engineName.equalsIgnoreCase("jdbc")) {
createJdbcTable(db, stmt);
return;
}
if (engineName.equalsIgnoreCase("jdbc")) {
return createJdbcTable(db, stmt);

} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
}

Preconditions.checkState(false);
return false;
}

public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
Expand Down Expand Up @@ -1357,7 +1360,8 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio
}
Analyzer dummyRootAnalyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
createTableStmt.analyze(dummyRootAnalyzer);
createTable(createTableStmt);
boolean tableHasExists = createTable(createTableStmt);
stmt.setTableHasExists(tableHasExists);
} catch (UserException e) {
throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
}
Expand Down Expand Up @@ -2103,12 +2107,12 @@ private Type getChildTypeByName(String name, CreateTableStmt stmt)
}

// Create olap table and related base index synchronously.
private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
String tableName = stmt.getTableName();
if (LOG.isDebugEnabled()) {
LOG.debug("begin create olap table: {}", tableName);
}

boolean tableHasExist = false;
BinlogConfig dbBinlogConfig;
db.readLock();
try {
Expand Down Expand Up @@ -2908,36 +2912,33 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep
throw t;
}
}
return tableHasExist;
}

private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();

List<Column> columns = stmt.getColumns();

long tableId = Env.getCurrentEnv().getNextId();
MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
mysqlTable.setComment(stmt.getComment());
if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();

long tableId = Env.getCurrentEnv().getNextId();
OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
odbcTable.setComment(stmt.getComment());
if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException {
private boolean createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException {
String tableName = stmt.getTableName();

// validate props to get column from es.
Expand Down Expand Up @@ -2970,14 +2971,11 @@ private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlExcepti
esTable.setId(tableId);
esTable.setComment(stmt.getComment());
esTable.syncTableMetaData();
if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table{} with id {}", tableName, tableId);
return esTable;
Pair<Boolean, Boolean> result = db.createTableWithLock(esTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();

List<Column> columns = stmt.getColumns();
Expand All @@ -2986,11 +2984,8 @@ private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlExce
BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
brokerTable.setComment(stmt.getComment());
brokerTable.setBrokerProperties(stmt.getExtProperties());

if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
Expand All @@ -3017,7 +3012,7 @@ private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlExcept
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}

private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();

Expand All @@ -3026,10 +3021,20 @@ private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlExcept
JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, stmt.getProperties());
jdbcTable.setComment(stmt.getComment());
// check table if exists
if (!db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()).first) {
Pair<Boolean, Boolean> result = db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}

private boolean checkCreateTableResult(String tableName, long tableId, Pair<Boolean, Boolean> result)
throws DdlException {
if (Boolean.FALSE.equals(result.first)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
if (Boolean.TRUE.equals(result.second)) {
return true;
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return false;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
}

@Override
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
String tblName = stmt.getTableName();
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
Expand All @@ -157,7 +157,7 @@ public void createTable(CreateTableStmt stmt) throws UserException {
if (tableExist(dbName, tblName)) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tblName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tblName);
}
Expand Down Expand Up @@ -225,6 +225,7 @@ public void createTable(CreateTableStmt stmt) throws UserException {
} catch (Exception e) {
throw new UserException(e.getMessage(), e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
}

@Override
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
Expand All @@ -129,7 +129,7 @@ public void createTable(CreateTableStmt stmt) throws UserException {
if (tableExist(dbName, tableName)) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
Expand All @@ -147,6 +147,7 @@ public void createTable(CreateTableStmt stmt) throws UserException {
PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
db.setUnInitialized(true);
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ public interface ExternalMetadataOps {
/**
*
* @param stmt
* @return if set isExists is true, return true if table exists, otherwise return false
* @throws UserException
*/
void createTable(CreateTableStmt stmt) throws UserException;
boolean createTable(CreateTableStmt stmt) throws UserException;

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
ctx.queryId(), createTableInfo.getTableName());
}
try {
Env.getCurrentEnv().createTable(createTableStmt);
if (Env.getCurrentEnv().createTable(createTableStmt)) {
return;
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2970,6 +2970,9 @@ private void handleCtasStmt() {
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
return;
}
if (ctasStmt.isTableHasExists()) {
return;
}
// after success create table insert data
try {
parsedStmt = ctasStmt.getInsertStmt();
Expand Down
Loading

0 comments on commit 67809f1

Please sign in to comment.