Skip to content

Commit

Permalink
[Enhancement](txn) Block new insert into if schema change happens dur…
Browse files Browse the repository at this point in the history
…ing transaction (#39483)
  • Loading branch information
Yukang-Lian authored and yiguolei committed Aug 29, 2024
1 parent c50d2ea commit 8885d91
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 5 deletions.
1 change: 1 addition & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& request) {
DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); })
Status res = Status::OK();
TabletSharedPtr base_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
Expand Down Expand Up @@ -114,6 +115,16 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
sink = ((PhysicalOlapTableSink<?>) plan.get());
Table targetTable = sink.getTargetTable();
if (ctx.getTxnEntry().isFirstTxnInsert()) {
ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable) targetTable).getBaseSchemaVersion());
ctx.getTxnEntry().setFirstTxnInsert(false);
} else {
if (((OlapTable) targetTable).getBaseSchemaVersion() != ctx.getTxnEntry().getTxnSchemaVersion()) {
throw new AnalysisException("There are schema changes in one transaction, "
+ "you can commit this transaction with formal data or rollback "
+ "this whole transaction.");
}
}
// should set columns of sink since we maybe generate some invisible columns
List<Column> fullSchema = sink.getTargetTable().getFullSchema();
List<Column> targetSchema = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ public PhysicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpre
logicalProperties, physicalProperties, statistics, child);
}

public List<NamedExpression> getOutputExprs() {
return outputExprs;
}

@Override
public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,7 @@ private void handleTransactionStmt() throws Exception {
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
: context.getSessionVariable().getInsertMaxFilterRatio()));
context.getTxnEntry().setFirstTxnInsert(true);
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
.append(TransactionStatus.PREPARE.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class TransactionEntry {
private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
private long rowsInTransaction = 0;
private Types.PUniqueId pLoadId;
private boolean isFirstTxnInsert = false;
private volatile int txnSchemaVersion = -1;

// for insert into select for multi tables
private boolean isTransactionBegan = false;
Expand Down Expand Up @@ -164,6 +166,22 @@ public void setpLoadId(Types.PUniqueId pLoadId) {
this.pLoadId = pLoadId;
}

public boolean isFirstTxnInsert() {
return isFirstTxnInsert;
}

public void setFirstTxnInsert(boolean firstTxnInsert) {
isFirstTxnInsert = firstTxnInsert;
}

public int getTxnSchemaVersion() {
return txnSchemaVersion;
}

public void setTxnSchemaVersion(int txnSchemaVersion) {
this.txnSchemaVersion = txnSchemaVersion;
}

// Used for insert into select
public void beginTransaction(DatabaseIf database, TableIf table)
throws DdlException, BeginTransactionException, MetaNotFoundException, AnalysisException,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_desc1 --
c1 int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE

-- !select_desc2 --
c1 int Yes true \N
new_col int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE

-- !select1 --
1 \N 2 3

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_desc1 --
c1 int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE

-- !select_desc2 --
c1 int Yes true \N
c2 int Yes false \N NONE
c3 int Yes false \N NONE
new_col int Yes false \N NONE

-- !select1 --
1 2 3 \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_desc1 --
c1 int Yes true \N
c2 bigint Yes false \N NONE
c3 int Yes false \N NONE

-- !select_desc2 --
c1 int Yes true \N
c3 int Yes false \N NONE
c2 bigint Yes false \N NONE

-- !select_desc3 --
c1 int Yes true \N
c3 int Yes false \N NONE
c2 bigint Yes false \N NONE

-- !select1 --
1 3 2

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
suite("txn_insert_values_with_schema_change") {
def table = "txn_insert_values_with_schema_change"

def dbName = "regression_test_insert_p0"
def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
List<String> errors = new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.sql.Connection
import java.sql.DriverManager
import java.sql.Statement
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

suite("txn_insert_with_specify_columns_schema_change_add_key_column", "nonConcurrent") {
if(!isCloudMode()) {
def table = "txn_insert_with_specify_columns_schema_change_add_key_column"

def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
List<String> errors = new ArrayList<>()
CountDownLatch insertLatch = new CountDownLatch(1)
CountDownLatch insertLatch2 = new CountDownLatch(1)

sql """ DROP TABLE IF EXISTS $table force """
sql """
create table $table (
c1 INT NULL,
c2 INT NULL,
c3 INT NULL
) ENGINE=OLAP
UNIQUE KEY(c1)
DISTRIBUTED BY HASH(c1) BUCKETS 1
PROPERTIES (
"replication_num" = "1");
"""
sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """

def getAlterTableState = { job_state ->
def retry = 0
sql "use ${dbName};"
while (true) {
sleep(2000)
def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1"
logger.info("alter table state: ${state}")
if (state.size() > 0 && state[0][9] == job_state) {
return
}
retry++
if (retry >= 10) {
break
}
}
assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times")
}

def txnInsert = {
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
Statement statement = conn.createStatement()) {
try {
qt_select_desc1 """desc $table"""

insertLatch.await(2, TimeUnit.MINUTES)

statement.execute("begin")
statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);")

insertLatch2.await(2, TimeUnit.MINUTES)
qt_select_desc2 """desc $table"""
statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);")
statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);")
statement.execute("commit")
} catch (Exception e) {
logger.info("txn insert failed", e)
assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction."))
statement.execute("rollback")
}
}
}

def schemaChange = { sql ->
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
Statement statement = conn.createStatement()) {
statement.execute(sql)
getAlterTableState("RUNNING")
insertLatch.countDown()
getAlterTableState("FINISHED")
insertLatch2.countDown()
} catch (Throwable e) {
logger.error("schema change failed", e)
errors.add("schema change failed " + e.getMessage())
}
}

GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
try {
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} add column new_col int key after c1;"))
Thread insert_thread = new Thread(() -> txnInsert())
schema_change_thread.start()
insert_thread.start()
schema_change_thread.join()
insert_thread.join()

logger.info("errors: " + errors)
assertEquals(0, errors.size())
getAlterTableState("FINISHED")
order_qt_select1 """select * from ${table} order by c1, c2, c3"""
} catch (Exception e) {
logger.info("failed: " + e.getMessage())
assertTrue(false)
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}
}
}
Loading

0 comments on commit 8885d91

Please sign in to comment.