Skip to content

Commit

Permalink
[Feature](CCR) Support MoW for CCR (#22798)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian authored Sep 2, 2023
1 parent 6b56896 commit eedd243
Show file tree
Hide file tree
Showing 8 changed files with 973 additions and 2 deletions.
49 changes: 47 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
#include "http/http_client.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"
#include "runtime/exec_env.h"
#include "runtime/external_scan_context_mgr.h"
Expand Down Expand Up @@ -632,7 +634,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
}
}

// Step 6: create rowset && commit
// Step 6: create rowset && calculate delete bitmap && commit
// Step 6.1: create rowset
RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
Expand All @@ -648,7 +650,44 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
return;
}

// Step 6.2: commit txn
// Step 6.2 calculate delete bitmap before commit
auto calc_delete_bitmap_token =
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
RowsetIdUnorderedSet pre_rowset_ids;
if (local_tablet->enable_unique_key_merge_on_write()) {
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
status = beta_rowset->load_segments(&segments);
if (!status) {
LOG(WARNING) << "failed to load segments from rowset"
<< ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id
<< ", status=" << status.to_string();
status.to_thrift(&tstatus);
return;
}
if (segments.size() > 1) {
// calculate delete bitmap between segments
status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
delete_bitmap);
if (!status) {
LOG(WARNING) << "failed to calculate delete bitmap"
<< ". tablet_id: " << local_tablet->tablet_id()
<< ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id
<< ", status=" << status.to_string();
status.to_thrift(&tstatus);
return;
}
}

local_tablet->commit_phase_update_delete_bitmap(rowset, pre_rowset_ids, delete_bitmap,
segments, txn_id,
calc_delete_bitmap_token.get(), nullptr);
calc_delete_bitmap_token->wait();
calc_delete_bitmap_token->get_delete_bitmap(delete_bitmap);
}

// Step 6.3: commit txn
Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),
Expand All @@ -664,6 +703,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
return;
}

if (local_tablet->enable_unique_key_merge_on_write()) {
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
partition_id, txn_id, local_tablet_id, local_tablet->schema_hash(),
local_tablet->tablet_uid(), true, delete_bitmap, pre_rowset_ids);
}

tstatus.__set_status_code(TStatusCode::OK);
}
} // namespace doris
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_mow_binlog_config_change") {

def syncer = getSyncer()
def tableName = "tbl_binlog_config_change"
def test_num = 0
def insert_num = 5

sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""

target_sql "DROP TABLE IF EXISTS ${tableName}"
target_sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
assertTrue(syncer.getTargetMeta("${tableName}"))

// test 1: target cluster follow source cluster
logger.info("=== Test 1: Target cluster follow source cluster case ===")
test_num = 1
for (int index = 0; index < insert_num; index++) {
sql """
INSERT INTO ${tableName} VALUES (${test_num}, ${index})
"""
assertTrue(syncer.getBinlog("${tableName}"))
assertTrue(syncer.beginTxn("${tableName}"))
assertTrue(syncer.getBackendClients())
assertTrue(syncer.ingestBinlog())
assertTrue(syncer.commitTxn())
assertTrue(syncer.checkTargetVersion())
syncer.closeBackendClients()
}

def res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
assertTrue(res.size() == insert_num)

// TODO: bugfix
// test 2: source cluster disable and re-enable binlog
// target_sql "DROP TABLE IF EXISTS ${tableName}"
// target_sql """
// CREATE TABLE if NOT EXISTS ${tableName}
// (
// `test` INT,
// `id` INT
// )
// ENGINE=OLAP
// UNIQUE KEY(`test`, `id`)
// DISTRIBUTED BY HASH(id) BUCKETS 1
// PROPERTIES (
// "replication_allocation" = "tag.location.default: 1"
// )
// """
// sql """ALTER TABLE ${tableName} set ("binlog.enable" = "false")"""
// sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""

// syncer.context.seq = -1

// assertTrue(syncer.getBinlog("${tableName}"))
// assertTrue(syncer.beginTxn("${tableName}"))
// assertTrue(syncer.ingestBinlog())
// assertTrue(syncer.commitTxn())
// assertTrue(syncer.checkTargetVersion())

// res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
// assertTrue(res.size() == insert_num)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_mow_create_table_with_binlog_config") {
sql "drop database if exists test_table_binlog"

sql """
create database test_table_binlog
"""
result = sql "show create database test_table_binlog"
logger.info("${result}")

// Case 1: database disable binlog, create table with binlog disable
sql """
CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "false" );
"""
result = sql "show create table test_table_binlog.t1"
logger.info("${result}")
assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "false"'))
sql """
drop table if exists test_table_binlog.t1
"""

// Case 2: database disable binlog, create table with binlog enable
sql """
CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "true" );
"""
result = sql "show create table test_table_binlog.t1"
logger.info("${result}")
assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"'))
sql """
drop table if exists test_table_binlog.t1
"""

// Case 3: database enable binlog, create table with binlog disable
sql """
alter database test_table_binlog set properties ("binlog.enable" = "true")
"""
assertThrows(Exception.class, {
sql """
CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "false" );
"""
})
sql """
drop table if exists test_table_binlog.t1
"""

// Case 4: database enable binlog, create table with binlog enable
sql """
alter database test_table_binlog set properties ("binlog.enable" = "true")
"""
sql """
CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1", "binlog.enable" = "true" );
"""
result = sql "show create table test_table_binlog.t1"
logger.info("${result}")
assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"'))
sql """
drop table if exists test_table_binlog.t1
"""

// Case 5: database enable binlog, create table inherit database binlog config
sql """
CREATE TABLE test_table_binlog.t1 ( k1 INT ) ENGINE = olap UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES ( "enable_unique_key_merge_on_write" = "true", "replication_num" = "1" );
"""
result = sql "show create table test_table_binlog.t1"
logger.info("${result}")
assertTrue(result.toString().containsIgnoreCase('"binlog.enable" = "true"'))
sql """
drop table if exists test_table_binlog.t1
"""

sql "drop database if exists test_table_binlog"
}
Loading

0 comments on commit eedd243

Please sign in to comment.