Skip to content

Commit

Permalink
[Enhancement](txn) Block new insert into if schema change happens dur…
Browse files Browse the repository at this point in the history
…ing transaction (#39483)
  • Loading branch information
Yukang-Lian authored Aug 28, 2024
1 parent d91a6c9 commit f6d3280
Show file tree
Hide file tree
Showing 30 changed files with 473 additions and 12 deletions.
1 change: 1 addition & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); })
Status res;
signal::tablet_id = _base_tablet->get_table_id();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
Expand Down Expand Up @@ -111,6 +112,16 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
sink = ((PhysicalOlapTableSink<?>) plan.get());
Table targetTable = sink.getTargetTable();
if (ctx.getTxnEntry().isFirstTxnInsert()) {
ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable) targetTable).getBaseSchemaVersion());
ctx.getTxnEntry().setFirstTxnInsert(false);
} else {
if (((OlapTable) targetTable).getBaseSchemaVersion() != ctx.getTxnEntry().getTxnSchemaVersion()) {
throw new AnalysisException("There are schema changes in one transaction, "
+ "you can commit this transaction with formal data or rollback "
+ "this whole transaction.");
}
}
// should set columns of sink since we maybe generate some invisible columns
List<Column> fullSchema = sink.getTargetTable().getFullSchema();
List<Column> targetSchema = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ public PhysicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpre
logicalProperties, physicalProperties, statistics, child);
}

public List<NamedExpression> getOutputExprs() {
return outputExprs;
}

@Override
public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,7 @@ private void handleTransactionStmt() throws Exception {
.setTxnConf(new TTxnParams().setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("")
.setTbl("").setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
: context.getSessionVariable().getInsertMaxFilterRatio()));
context.getTxnEntry().setFirstTxnInsert(true);
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
.append(TransactionStatus.PREPARE.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class TransactionEntry {
private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
private long rowsInTransaction = 0;
private Types.PUniqueId pLoadId;
private boolean isFirstTxnInsert = false;
private volatile int txnSchemaVersion = -1;

// for insert into select for multi tables
private boolean isTransactionBegan = false;
Expand Down Expand Up @@ -181,6 +183,22 @@ public void setpLoadId(Types.PUniqueId pLoadId) {
this.pLoadId = pLoadId;
}

public boolean isFirstTxnInsert() {
return isFirstTxnInsert;
}

public void setFirstTxnInsert(boolean firstTxnInsert) {
isFirstTxnInsert = firstTxnInsert;
}

public int getTxnSchemaVersion() {
return txnSchemaVersion;
}

public void setTxnSchemaVersion(int txnSchemaVersion) {
this.txnSchemaVersion = txnSchemaVersion;
}

// Used for insert into select, return the sub_txn_id for this insert
public long beginTransaction(TableIf table, SubTransactionType subTransactionType) throws Exception {
if (isInsertValuesTxnBegan()) {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_desc1 --
c1 int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE

-- !select_desc2 --
c1 int Yes true \N
new_col int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE

-- !select1 --
1 \N 2 3

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_desc1 --
c1 int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE

-- !select_desc2 --
c1 int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE
new_col int Yes false \N NONE

-- !select1 --
1 2 3 \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_desc1 --
c1 int Yes true \N
c2 bigint Yes false \N NONE
c3 int Yes false \N NONE

-- !select_desc2 --
c1 int Yes true \N
c3 int Yes false \N NONE
c2 bigint Yes false \N NONE

-- !select_desc3 --
c1 int Yes true \N
c3 int Yes false \N NONE
c2 bigint Yes false \N NONE

-- !select1 --
1 3 2

Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ suite("txn_insert") {
if (observer_fe_url != null) {
logger.info("observer url: $observer_fe_url")
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = observer_fe_url) {
result = sql """ select count() from regression_test_insert_p0.${table}_0 """
result = sql """ select count() from regression_test_insert_p0_transaction.${table}_0 """
logger.info("select from observer result: $result")
assertEquals(79, result[0][0])
}
Expand Down Expand Up @@ -403,7 +403,7 @@ suite("txn_insert") {
}

// 13. txn insert does not commit or rollback by user, and txn is aborted because connection is closed
def dbName = "regression_test_insert_p0"
def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
def get_txn_id_from_server_info = { serverInfo ->
Expand Down Expand Up @@ -776,7 +776,7 @@ suite("txn_insert") {
}
}

def db_name = "regression_test_insert_p0"
def db_name = "regression_test_insert_p0_transaction"
def tables = sql """ show tables from $db_name """
logger.info("tables: $tables")
for (def table_info : tables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ suite("txn_insert_concurrent_insert") {
}
sql """ sync """

def dbName = "regression_test_insert_p0"
def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")

Expand Down Expand Up @@ -119,7 +119,7 @@ suite("txn_insert_concurrent_insert") {
logger.info("result: ${result}")
assertEquals(2606192, result[0][0])

def db_name = "regression_test_insert_p0"
def db_name = "regression_test_insert_p0_transaction"
def tables = sql """ show tables from $db_name """
logger.info("tables: $tables")
for (def table_info : tables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {

// 2. commit failed
sql """ truncate table ${table}_0 """
def dbName = "regression_test_insert_p0"
def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
def get_txn_id_from_server_info = { serverInfo ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
suite("txn_insert_values_with_schema_change") {
def table = "txn_insert_values_with_schema_change"

def dbName = "regression_test_insert_p0"
def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
List<String> errors = new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit

suite("txn_insert_with_schema_change") {
def table = "txn_insert_with_schema_change"
def dbName = "regression_test_insert_p0"
def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
List<String> errors = new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.sql.Connection
import java.sql.DriverManager
import java.sql.Statement
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

suite("txn_insert_with_specify_columns_schema_change_add_key_column", "nonConcurrent") {
if(!isCloudMode()) {
def table = "txn_insert_with_specify_columns_schema_change_add_key_column"

def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
List<String> errors = new ArrayList<>()
CountDownLatch insertLatch = new CountDownLatch(1)
CountDownLatch insertLatch2 = new CountDownLatch(1)

sql """ DROP TABLE IF EXISTS $table force """
sql """
create table $table (
c1 INT NULL,
c2 INT NULL,
c3 INT NULL
) ENGINE=OLAP
UNIQUE KEY(c1)
DISTRIBUTED BY HASH(c1) BUCKETS 1
PROPERTIES (
"replication_num" = "1");
"""
sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """

def getAlterTableState = { job_state ->
def retry = 0
sql "use ${dbName};"
while (true) {
sleep(2000)
def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1"
logger.info("alter table state: ${state}")
if (state.size() > 0 && state[0][9] == job_state) {
return
}
retry++
if (retry >= 10) {
break
}
}
assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times")
}

def txnInsert = {
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
Statement statement = conn.createStatement()) {
try {
qt_select_desc1 """desc $table"""

insertLatch.await(2, TimeUnit.MINUTES)

statement.execute("begin")
statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);")

insertLatch2.await(2, TimeUnit.MINUTES)
qt_select_desc2 """desc $table"""
statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);")
statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);")
statement.execute("commit")
} catch (Exception e) {
logger.info("txn insert failed", e)
assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction."))
statement.execute("rollback")
}
}
}

def schemaChange = { sql ->
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
Statement statement = conn.createStatement()) {
statement.execute(sql)
getAlterTableState("RUNNING")
insertLatch.countDown()
getAlterTableState("FINISHED")
insertLatch2.countDown()
} catch (Throwable e) {
logger.error("schema change failed", e)
errors.add("schema change failed " + e.getMessage())
}
}

GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
try {
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} add column new_col int key after c1;"))
Thread insert_thread = new Thread(() -> txnInsert())
schema_change_thread.start()
insert_thread.start()
schema_change_thread.join()
insert_thread.join()

logger.info("errors: " + errors)
assertEquals(0, errors.size())
getAlterTableState("FINISHED")
order_qt_select1 """select * from ${table} order by c1, c2, c3"""
} catch (Exception e) {
logger.info("failed: " + e.getMessage())
assertTrue(false)
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}
}
}
Loading

0 comments on commit f6d3280

Please sign in to comment.