From eedd24316d03b46e798d48b2f304c223e995820b Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Sat, 2 Sep 2023 20:40:06 +0800 Subject: [PATCH] [Feature](CCR) Support MoW for CCR (#22798) --- be/src/service/backend_service.cpp | 49 ++- .../test_binlog_config_change.groovy | 108 +++++++ ...est_create_table_with_binlog_config.groovy | 88 +++++ .../ccr_mow_syncer_p0/test_get_binlog.groovy | 142 +++++++++ .../test_ingest_binlog.groovy | 121 +++++++ .../test_multi_buckets.groovy | 98 ++++++ .../ccr_mow_syncer_p0/test_txn_case.groovy | 301 ++++++++++++++++++ .../test_backup_restore.groovy | 68 ++++ 8 files changed, 973 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy create mode 100644 regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy create mode 100644 regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy create mode 100644 regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy create mode 100644 regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy create mode 100644 regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy create mode 100644 regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 61c9e80dcd448c..c674e3e421eba2 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -46,10 +46,12 @@ #include "http/http_client.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_meta.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" +#include "olap/tablet_meta.h" #include "olap/txn_manager.h" #include "runtime/exec_env.h" #include "runtime/external_scan_context_mgr.h" @@ -632,7 +634,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, } } - // Step 6: create rowset && commit + // Step 6: create rowset && calculate delete bitmap && commit // Step 6.1: create rowset RowsetSharedPtr rowset; status = RowsetFactory::create_rowset(local_tablet->tablet_schema(), @@ -648,7 +650,44 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, return; } - // Step 6.2: commit txn + // Step 6.2 calculate delete bitmap before commit + auto calc_delete_bitmap_token = + StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); + DeleteBitmapPtr delete_bitmap = std::make_shared(local_tablet_id); + RowsetIdUnorderedSet pre_rowset_ids; + if (local_tablet->enable_unique_key_merge_on_write()) { + auto beta_rowset = reinterpret_cast(rowset.get()); + std::vector segments; + status = beta_rowset->load_segments(&segments); + if (!status) { + LOG(WARNING) << "failed to load segments from rowset" + << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + if (segments.size() > 1) { + // calculate delete bitmap between segments + status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments, + delete_bitmap); + if (!status) { + LOG(WARNING) << "failed to calculate delete bitmap" + << ". tablet_id: " << local_tablet->tablet_id() + << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + } + + local_tablet->commit_phase_update_delete_bitmap(rowset, pre_rowset_ids, delete_bitmap, + segments, txn_id, + calc_delete_bitmap_token.get(), nullptr); + calc_delete_bitmap_token->wait(); + calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap); + } + + // Step 6.3: commit txn Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn( local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(), @@ -664,6 +703,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, return; } + if (local_tablet->enable_unique_key_merge_on_write()) { + StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap( + partition_id, txn_id, local_tablet_id, local_tablet->schema_hash(), + local_tablet->tablet_uid(), true, delete_bitmap, pre_rowset_ids); + } + tstatus.__set_status_code(TStatusCode::OK); } } // namespace doris diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy new file mode 100644 index 00000000000000..0e8a2022eba857 --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p0/test_binlog_config_change.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_binlog_config_change") { + + def syncer = getSyncer() + def tableName = "tbl_binlog_config_change" + def test_num = 0 + def insert_num = 5 + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")""" + + target_sql "DROP TABLE IF EXISTS ${tableName}" + target_sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + assertTrue(syncer.getTargetMeta("${tableName}")) + + // test 1: target cluster follow source cluster + logger.info("=== Test 1: Target cluster follow source cluster case ===") + test_num = 1 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + assertTrue(syncer.getBinlog("${tableName}")) + assertTrue(syncer.beginTxn("${tableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + syncer.closeBackendClients() + } + + def res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}""" + assertTrue(res.size() == insert_num) + + // TODO: bugfix + // test 2: source cluster disable and re-enable binlog + // target_sql "DROP TABLE IF EXISTS ${tableName}" + // target_sql """ + // CREATE TABLE if NOT EXISTS ${tableName} + // ( + // `test` INT, + // `id` INT + // ) + // ENGINE=OLAP + // UNIQUE KEY(`test`, `id`) + // DISTRIBUTED BY HASH(id) BUCKETS 1 + // PROPERTIES ( + // "replication_allocation" = "tag.location.default: 1" + // ) + // """ + // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "false")""" + // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")""" + + // syncer.context.seq = -1 + + // assertTrue(syncer.getBinlog("${tableName}")) + // assertTrue(syncer.beginTxn("${tableName}")) + // assertTrue(syncer.ingestBinlog()) + // assertTrue(syncer.commitTxn()) + // assertTrue(syncer.checkTargetVersion()) + + // res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}""" + // assertTrue(res.size() == insert_num) + +} \ No newline at end of file diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy new file mode 100644 index 00000000000000..e4610bbe0d0bf0 --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p0/test_create_table_with_binlog_config.groovy @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_create_table_with_binlog_config") { + sql "drop database if exists test_table_binlog" + + sql """ + create database test_table_binlog + """ + result = sql "show create database test_table_binlog" + logger.info("${result}") + + // Case 1: database disable binlog, create table with binlog disable + sql """ + CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "false" ); + """ + result = sql "show create table test_table_binlog.t1" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "false"')) + sql """ + drop table if exists test_table_binlog.t1 + """ + + // Case 2: database disable binlog, create table with binlog enable + sql """ + CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "true" ); + """ + result = sql "show create table test_table_binlog.t1" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"')) + sql """ + drop table if exists test_table_binlog.t1 + """ + + // Case 3: database enable binlog, create table with binlog disable + sql """ + alter database test_table_binlog set properties ("binlog.enable" = "true") + """ + assertThrows(Exception.class, { + sql """ + CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "false" ); + """ + }) + sql """ + drop table if exists test_table_binlog.t1 + """ + + // Case 4: database enable binlog, create table with binlog enable + sql """ + alter database test_table_binlog set properties ("binlog.enable" = "true") + """ + sql """ + CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "true" ); + """ + result = sql "show create table test_table_binlog.t1" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"')) + sql """ + drop table if exists test_table_binlog.t1 + """ + + // Case 5: database enable binlog, create table inherit database binlog config + sql """ + CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1" ); + """ + result = sql "show create table test_table_binlog.t1" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"')) + sql """ + drop table if exists test_table_binlog.t1 + """ + + sql "drop database if exists test_table_binlog" +} diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy new file mode 100644 index 00000000000000..18a680380c18fb --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p0/test_get_binlog.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_get_binlog_case") { + + def create_table = { TableName -> + sql "DROP TABLE IF EXISTS ${TableName}" + sql """ + CREATE TABLE if NOT EXISTS ${TableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + } + + def syncer = getSyncer() + def seqTableName = "tbl_get_binlog_case" + def test_num = 0 + def insert_num = 5 + long seq = -1 + create_table.call(seqTableName) + sql """ALTER TABLE ${seqTableName} set ("binlog.enable" = "true")""" + sql """ + INSERT INTO ${seqTableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${seqTableName}")) + long firstSeq = syncer.context.seq + + + + + logger.info("=== Test 1: normal case ===") + test_num = 1 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${seqTableName} VALUES (${test_num}, ${index}) + """ + assertTrue(syncer.getBinlog("${seqTableName}")) + } + + long endSeq = syncer.context.seq + + + + + logger.info("=== Test 2: Abnormal seq case ===") + logger.info("=== Test 2.1: too old seq case ===") + syncer.context.seq = -1 + assertTrue(syncer.context.seq == -1) + assertTrue(syncer.getBinlog("${seqTableName}")) + assertTrue(syncer.context.seq == firstSeq) + + + logger.info("=== Test 2.2: too new seq case ===") + syncer.context.seq = endSeq + 100 + assertTrue((syncer.getBinlog("${seqTableName}")) == false) + + + logger.info("=== Test 2.3: not find table case ===") + assertTrue(syncer.getBinlog("this_is_an_invalid_tbl") == false) + + + logger.info("=== Test 2.4: seq between first and end case ===") + long midSeq = (firstSeq + endSeq) / 2 + syncer.context.seq = midSeq + assertTrue(syncer.getBinlog("${seqTableName}")) + long test5Seq = syncer.context.seq + assertTrue(firstSeq <= test5Seq && test5Seq <= endSeq) + + + + + + logger.info("=== Test 3: Get binlog with different priv user case ===") + logger.info("=== Test 3.1: read only user get binlog case ===") + // TODO: bugfix + // syncer.context.seq = -1 + // readOnlyUser = "read_only_user" + // sql """DROP USER IF EXISTS ${readOnlyUser}""" + // sql """CREATE USER ${readOnlyUser} IDENTIFIED BY '123456'""" + // sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${readOnlyUser}""" + // sql """GRANT SELECT_PRIV ON TEST_${context.dbName}.${seqTableName} TO ${readOnlyUser}""" + // syncer.context.user = "${readOnlyUser}" + // syncer.context.passwd = "123456" + // assertTrue(syncer.getBinlog("${seqTableName}")) + + + logger.info("=== Test 3.2: no priv user get binlog case ===") + syncer.context.seq = -1 + noPrivUser = "no_priv_user" + emptyTable = "tbl_empty_test" + sql "DROP TABLE IF EXISTS ${emptyTable}" + sql """ + CREATE TABLE if NOT EXISTS ${emptyTable} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + sql """DROP USER IF EXISTS ${noPrivUser}""" + sql """CREATE USER ${noPrivUser} IDENTIFIED BY '123456'""" + sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}""" + sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO ${noPrivUser}""" + syncer.context.user = "${noPrivUser}" + syncer.context.passwd = "123456" + assertTrue((syncer.getBinlog("${seqTableName}")) == false) + + + logger.info("=== Test 3.3: Non-existent user set in syncer get binlog case ===") + syncer.context.user = "this_is_an_invalid_user" + syncer.context.passwd = "this_is_an_invalid_user" + assertTrue(syncer.getBinlog("${seqTableName}", false) == false) +} \ No newline at end of file diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy new file mode 100644 index 00000000000000..4b1b273ab810b3 --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_ingest_binlog") { + + def syncer = getSyncer() + def tableName = "tbl_ingest_binlog" + def insert_num = 5 + def test_num = 0 + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")""" + + target_sql "DROP TABLE IF EXISTS ${tableName}" + target_sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + assertTrue(syncer.getTargetMeta("${tableName}")) + + + + + logger.info("=== Test 1: Common ingest binlog case ===") + test_num = 1 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + assertTrue(syncer.getBinlog("${tableName}")) + assertTrue(syncer.beginTxn("${tableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + syncer.closeBackendClients() + } + + res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}""" + assertTrue(res.size() == insert_num) + + + + + logger.info("=== Test 2: Wrong IngestBinlogRequest case ===") + test_num = 2 + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${tableName}")) + assertTrue(syncer.beginTxn("${tableName}")) + assertTrue(syncer.getBackendClients()) + + + logger.info("=== Test 2.1: Wrong txnId case ===") + // TODO: bugfix + // def originTxnId = syncer.context.txnId + // syncer.context.txnId = -1 + // assertTrue(syncer.ingestBinlog() == false) + // syncer.context.txnId = originTxnId + + + logger.info("=== Test 2.2: Wrong binlog version case ===") + // -1 means use the number of syncer.context + // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1) + assertTrue(syncer.ingestBinlog(-1, 1) == false) + + + logger.info("=== Test 2.3: Wrong partitionId case ===") + // TODO: bugfix + // assertTrue(syncer.ingestBinlog(1, -1) == false) + + + logger.info("=== Test 2.4: Right case ===") + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}""" + assertTrue(res.size() == 1) + + + // End Test 2 + syncer.closeBackendClients() +} \ No newline at end of file diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy new file mode 100644 index 00000000000000..7f4051552f73ef --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p0/test_multi_buckets.groovy @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_multi_buckets") { + + def syncer = getSyncer() + def tableName = "tbl_multi_buckets" + def test_num = 0 + def insert_num = 5 + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")""" + + target_sql "DROP TABLE IF EXISTS ${tableName}" + target_sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + assertTrue(syncer.getTargetMeta("${tableName}")) + + + + + logger.info("=== Test 1: Blank row set case ===") + test_num = 1 + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${tableName}")) + assertTrue(syncer.beginTxn("${tableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + syncer.closeBackendClients() + assertTrue(syncer.checkTargetVersion()) + def res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}""" + assertTrue(res.size() == 1) + + + + + logger.info("=== Test 2: Upsert case ===") + test_num = 2 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + assertTrue(syncer.getBinlog("${tableName}")) + assertTrue(syncer.beginTxn("${tableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + syncer.closeBackendClients() + } + + res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}""" + assertTrue(res.size() == insert_num) + +} diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy new file mode 100644 index 00000000000000..05f44e48086661 --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p0/test_txn_case.groovy @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_txn_case") { + + // TODO: bugfix + def fullPriv = ["SELECT_PRIV"/*, "LOAD_PRIV"*/, "ALTER_PRIV", "CREATE_PRIV", "DROP_PRIV"] + def nowPriv = [] + def recursionPriv = { fullPrivList, idx, nowPrivList, num, callback -> + for (; (num - nowPrivList.size() <= fullPrivList.size() - idx) && (nowPrivList.size()) < num; ++idx) { + nowPrivList.push(fullPrivList[idx]) + call(fullPrivList, idx + 1, nowPrivList, num, callback) + nowPrivList.pop() + } + if (nowPrivList.size() == num) { + String privStr = "" + for (int i = 0; i < num; ++i) { + privStr += nowPrivList[i] + if (i < num - 1) { + privStr += ", " + } + } + callback.call(privStr) + } + } + + def syncer = getSyncer() + def txnTableName = "tbl_txn_case" + def test_num = 0 + sql "DROP TABLE IF EXISTS ${txnTableName}" + sql """ + CREATE TABLE if NOT EXISTS ${txnTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + sql """ALTER TABLE ${txnTableName} set ("binlog.enable" = "true")""" + + target_sql "DROP TABLE IF EXISTS ${txnTableName}" + target_sql """ + CREATE TABLE if NOT EXISTS ${txnTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + assertTrue(syncer.getTargetMeta("${txnTableName}")) + + + + + logger.info("=== Test 1: common txn case ===") + test_num = 1 + sql """ + INSERT INTO ${txnTableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${txnTableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.beginTxn("${txnTableName}")) + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + def res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}""" + assertTrue(res.size() == 1) + + + + + logger.info("=== Test 2: Wrong BeginTxnRequest context case ===") + + + logger.info("=== Test 2.1: Begin a txn with non-existent table case ===") + assertTrue(syncer.beginTxn("tbl_non_existent") == false) + + + logger.info("=== Test 2.2: Begin a txn with duplicate labels case ===") + assertTrue(syncer.beginTxn("${txnTableName}") == false) + + + // End Test 2 + syncer.closeBackendClients() + + + + + logger.info("=== Test 3: Begin a txn with different priv user case ===") + test_num = 3 + sql """ + INSERT INTO ${txnTableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${txnTableName}")) + + + logger.info("=== Test 3.1: Begin a txn with non-existent user set in syncer case ===") + syncer.context.user = "this_is_an_invalid_user" + syncer.context.passwd = "this_is_an_invalid_user" + assertTrue(syncer.beginTxn("${txnTableName}") == false) + + + logger.info("=== Test 3.2: Begin a txn with no priv user case ===") + def noPrivUser = "no_priv_user" + def emptyTable = "tbl_empty_test" + target_sql "DROP TABLE IF EXISTS ${emptyTable}" + target_sql """ + CREATE TABLE if NOT EXISTS ${emptyTable} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1" + ) + """ + target_sql """DROP USER IF EXISTS ${noPrivUser}""" + target_sql """CREATE USER ${noPrivUser} IDENTIFIED BY '123456'""" + target_sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}""" + target_sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO ${noPrivUser}""" + syncer.context.user = "${noPrivUser}" + syncer.context.passwd = "123456" + assertTrue(syncer.beginTxn("${txnTableName}") == false) + + // TODO: bugfix + // Recursively selecting privileges, + // if not all privileges are obtained, txn should not be began + logger.info("=== Test 3.3: Begin a txn with low priv user case ===") + def lowPrivUser = "low_priv_user" + target_sql """DROP USER IF EXISTS ${lowPrivUser}""" + target_sql """CREATE USER ${lowPrivUser} IDENTIFIED BY '123456'""" + target_sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${lowPrivUser}""" + syncer.context.user = "${lowPrivUser}" + syncer.context.passwd = "123456" + + def beginTxnCallback = { privStr -> + target_sql """GRANT ${privStr} ON TEST_${context.dbName}.${txnTableName} TO ${lowPrivUser}""" + assertTrue((syncer.beginTxn("${txnTableName}")) == false) + target_sql """REVOKE ${privStr} ON TEST_${context.dbName}.${txnTableName} FROM ${lowPrivUser}""" + } + + for (int i = 1; i <= 4; ++i) { + recursionPriv.call(fullPriv, 0, nowPriv, i, beginTxnCallback) + } + + logger.info("=== Test 3.4: Complete the txn with SHOW_PRIV user case ===") + def showPrivUser = "show_priv_user" + target_sql """DROP USER IF EXISTS ${showPrivUser}""" + target_sql """CREATE USER ${showPrivUser} IDENTIFIED BY '123456'""" + target_sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${showPrivUser}""" + target_sql """ + GRANT + SELECT_PRIV, LOAD_PRIV, ALTER_PRIV, CREATE_PRIV, DROP_PRIV + ON TEST_${context.dbName}.${txnTableName} + TO ${showPrivUser} + """ + syncer.context.user = "${showPrivUser}" + syncer.context.passwd = "123456" + assertTrue(syncer.beginTxn("${txnTableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.ingestBinlog()) + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}""" + assertTrue(res.size() == 1) + + // End Test 3 + syncer.context.user = context.config.feSyncerUser + syncer.context.passwd = context.config.feSyncerPassword + syncer.closeBackendClients() + + + + + logger.info("=== Test 4: Wrong CommitTxnRequest context case ===") + test_num = 4 + sql """ + INSERT INTO ${txnTableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${txnTableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.beginTxn("${txnTableName}")) + assertTrue(syncer.ingestBinlog()) + + + logger.info("=== Test 4.1: Wrong txnId case ===") + def originTxnId = syncer.context.txnId + syncer.context.txnId = -1 + assertTrue(syncer.commitTxn() == false) + syncer.context.txnId = originTxnId + + + logger.info("=== Test 4.2: Wrong commit info case ===") + // TODO: bugfix + // def originCommitInfos = syncer.resetCommitInfos() + // syncer.context.addCommitInfo(-1, -1) + // assertTrue(syncer.commitTxn()) == false) + + + logger.info("=== Test 4.3: Empty commit info case ===") + // TODO: bugfix + // assertTrue(syncer.commitTxn() == false) + + + logger.info("=== Test 4.4: duplicate txnId case ===") + // TODO: bugfix + // def lastCommitInfo = syncer.copyCommitInfos() + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}""" + assertTrue(res.size() == 1) + // syncer.context.commitInfos = lastCommitInfo + // assertTrue(syncer.commitTxn() == false) + + // End Test 4 + syncer.closeBackendClients() + + + + + logger.info("=== Test 5: User root beginTxn, Other user commitTxn case ===") + test_num = 5 + sql """ + INSERT INTO ${txnTableName} VALUES (${test_num}, 0) + """ + assertTrue(syncer.getBinlog("${txnTableName}")) + assertTrue(syncer.getBackendClients()) + assertTrue(syncer.beginTxn("${txnTableName}")) + assertTrue(syncer.ingestBinlog()) + + + logger.info("=== Test 5.1: Non-existent user commitTxn case ===") + syncer.context.user = "this_is_an_invalid_user" + syncer.context.passwd = "this_is_an_invalid_user" + assertTrue(syncer.commitTxn() == false) + + + logger.info("=== Test 5.2: No priv user commitTxn case ===") + syncer.context.user = "${noPrivUser}" + syncer.context.passwd = "123456" + assertTrue(syncer.commitTxn() == false) + + + logger.info("=== Test 5.3: Low priv user commitTxn case ===") + syncer.context.user = "${lowPrivUser}" + syncer.context.passwd = "123456" + + def commitTxnCallback = { privStr -> + target_sql """GRANT ${privStr} ON TEST_${context.dbName}.${txnTableName} TO ${lowPrivUser}""" + assertTrue(syncer.commitTxn() == false) + target_sql """REVOKE ${privStr} ON TEST_${context.dbName}.${txnTableName} FROM ${lowPrivUser}""" + } + for (int i = 1; i <= 4; ++i) { + recursionPriv.call(fullPriv, 0, nowPriv, i, commitTxnCallback) + } + + + logger.info("=== Test 5.4: SHOW_PRIV user commitTxn case ===") + syncer.context.user = "${showPrivUser}" + syncer.context.passwd = "123456" + assertTrue(syncer.commitTxn()) + assertTrue(syncer.checkTargetVersion()) + res = target_sql """SELECT * FROM ${txnTableName} WHERE test=${test_num}""" + assertTrue(res.size() == 1) + + // End Test 5 + syncer.context.user = context.config.feSyncerUser + syncer.context.passwd = context.config.feSyncerPassword + syncer.closeBackendClients() + +} \ No newline at end of file diff --git a/regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy b/regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy new file mode 100644 index 00000000000000..b0844cded5bfb1 --- /dev/null +++ b/regression-test/suites/ccr_mow_syncer_p1/test_backup_restore.groovy @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_backup_restore") { + + def syncer = getSyncer() + def tableName = "tbl_backup_restore" + def test_num = 0 + def insert_num = 5 + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + logger.info("=== Test 1: Common backup and restore ===") + test_num = 1 + def snapshotName = "snapshot_test_1" + for (int i = 0; i < insert_num; ++i) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${i}) + """ + } + def res = sql "SELECT * FROM ${tableName}" + assertTrue(res.size() == insert_num) + sql """ + BACKUP SNAPSHOT ${context.dbName}.${snapshotName} + TO `__keep_on_local__` + ON (${tableName}) + PROPERTIES ("type" = "full") + """ + while (syncer.checkSnapshotFinish() == false) { + Thread.sleep(3000) + } + assertTrue(syncer.getSnapshot("${snapshotName}", "${tableName}")) + assertTrue(syncer.restoreSnapshot(true)) + while (syncer.checkRestoreFinish() == false) { + Thread.sleep(3000) + } + res = target_sql "SELECT * FROM ${tableName}" + assertTrue(res.size() == insert_num) +} \ No newline at end of file