From 7e52a1ae0a122d4792dedf02d0a685f5230e6306 Mon Sep 17 00:00:00 2001 From: zhannngchen <48427519+zhannngchen@users.noreply.github.com> Date: Wed, 21 Aug 2024 19:07:25 +0800 Subject: [PATCH] [fix](ut) repair segcompaction ut (#38165) (#38225) (#39153) cherry-pick #38165 and #22928 --- be/src/olap/rowset/beta_rowset_writer.cpp | 1 + be/src/olap/rowset/segcompaction.cpp | 8 +- be/src/olap/rowset/segcompaction.h | 6 +- be/test/CMakeLists.txt | 1 - be/test/olap/segcompaction_test.cpp | 492 ++++++++++------------ 5 files changed, 242 insertions(+), 266 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 3abb05b08b0e30..57e45b6f83b20b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -458,6 +458,7 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row Status BetaRowsetWriter::flush() { if (_segment_writer != nullptr) { RETURN_IF_ERROR(_flush_segment_writer(&_segment_writer)); + RETURN_IF_ERROR(_segcompaction_if_necessary()); } return Status::OK(); } diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 15d8c3a243888f..598a78326edb56 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -66,6 +66,8 @@ namespace doris { using namespace ErrorCode; +SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {} + Status SegcompactionWorker::_get_segcompaction_reader( SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet, std::shared_ptr schema, OlapReaderStatistics* stat, @@ -151,8 +153,8 @@ Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t e } Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat, - Merger::Statistics& merger_stat, uint64_t begin, - uint64_t end) { + Merger::Statistics& merger_stat, uint32_t begin, + uint32_t end) { uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */ uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */ uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */ @@ -192,7 +194,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat } Status SegcompactionWorker::_create_segment_writer_for_segcompaction( - std::unique_ptr* writer, uint64_t begin, uint64_t end) { + std::unique_ptr* writer, uint32_t begin, uint32_t end) { return _writer->_do_create_segment_writer(writer, true, begin, end); } diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index e9484c317a157c..273fbdec560623 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -48,7 +48,7 @@ class SegcompactionWorker { friend class BetaRowsetWriter; public: - SegcompactionWorker(BetaRowsetWriter* writer) { _writer = writer; } + SegcompactionWorker(BetaRowsetWriter* writer); void compact_segments(SegCompactionCandidatesSharedPtr segments); @@ -59,7 +59,7 @@ class SegcompactionWorker { private: Status _create_segment_writer_for_segcompaction( - std::unique_ptr* writer, uint64_t begin, uint64_t end); + std::unique_ptr* writer, uint32_t begin, uint32_t end); Status _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet, std::shared_ptr schema, OlapReaderStatistics* stat, @@ -70,7 +70,7 @@ class SegcompactionWorker { uint64_t end); Status _delete_original_segments(uint32_t begin, uint32_t end); Status _check_correctness(OlapReaderStatistics& reader_stat, Merger::Statistics& merger_stat, - uint64_t begin, uint64_t end); + uint32_t begin, uint32_t end); Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments); private: diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 3189da07918340..5c5f69621222ef 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -46,7 +46,6 @@ list(REMOVE_ITEM UT_FILES ${CMAKE_CURRENT_SOURCE_DIR}/olap/rowset/segment_v2/frame_of_reference_page_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/olap/rowset/segment_v2/plain_page_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/olap/rowset/segment_v2/rle_page_test.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/olap/segcompaction_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/runtime/decimal_value_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/runtime/result_buffer_mgr_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/util/decompress_test.cpp diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index e8f216045a2ab8..dcf7258ae9f148 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -23,7 +23,6 @@ #include #include "common/config.h" -#include "env/env_posix.h" #include "gen_cpp/AgentService_types.h" #include "gen_cpp/olap_file.pb.h" #include "io/fs/local_file_system.h" @@ -52,9 +51,7 @@ static const std::string lTestDir = "./data_test/data/segcompaction_test"; class SegCompactionTest : public testing::Test { public: - SegCompactionTest() : _data_dir(std::make_unique(lTestDir)) { - _data_dir->update_capacity(); - } + SegCompactionTest() = default; void SetUp() { config::enable_segcompaction = true; @@ -80,17 +77,24 @@ class SegCompactionTest : public testing::Test { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_storage_engine(l_engine); + _data_dir = new DataDir(lTestDir, 1000000000); + static_cast(_data_dir->init()); + static_cast(_data_dir->update_capacity()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok()); - l_engine->start_bg_threads(); + s = l_engine->start_bg_threads(); + EXPECT_TRUE(s.ok()) << s.to_string(); } void TearDown() { + SAFE_DELETE(_data_dir); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(lTestDir).ok()); if (l_engine != nullptr) { l_engine->stop(); delete l_engine; l_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); } config::enable_segcompaction = false; } @@ -126,7 +130,8 @@ class SegCompactionTest : public testing::Test { } // (k1 int, k2 varchar(20), k3 int) keys (k1, k2) - void create_tablet_schema(TabletSchemaSPtr tablet_schema, KeysType keystype) { + void create_tablet_schema(TabletSchemaSPtr tablet_schema, KeysType keystype, + int num_value_col = 1) { TabletSchemaPB tablet_schema_pb; tablet_schema_pb.set_keys_type(keystype); tablet_schema_pb.set_num_short_key_columns(2); @@ -156,15 +161,18 @@ class SegCompactionTest : public testing::Test { column_2->set_is_nullable(true); column_2->set_is_bf_column(false); - ColumnPB* column_3 = tablet_schema_pb.add_column(); - column_3->set_unique_id(3); - column_3->set_name("v1"); - column_3->set_type("INT"); - column_3->set_length(4); - column_3->set_is_key(false); - column_3->set_is_nullable(false); - column_3->set_is_bf_column(false); - column_3->set_aggregation("SUM"); + for (int i = 1; i <= num_value_col; i++) { + ColumnPB* v_column = tablet_schema_pb.add_column(); + v_column->set_unique_id(2 + i); + v_column->set_name(fmt::format("v{}", i)); + v_column->set_type("INT"); + v_column->set_length(4); + v_column->set_is_key(false); + v_column->set_is_nullable(false); + v_column->set_is_bf_column(false); + v_column->set_default_value(std::to_string(i * 10)); + v_column->set_aggregation("SUM"); + } tablet_schema->init_from_pb(tablet_schema_pb); } @@ -196,15 +204,12 @@ class SegCompactionTest : public testing::Test { l_engine->create_tablet(req, &profile); rowset_writer_context->tablet = l_engine->tablet_manager()->get_tablet(TTabletId tablet_id); #endif - std::shared_ptr data_dir = std::make_shared(lTestDir); TabletMetaSharedPtr tablet_meta = std::make_shared(); tablet_meta->_tablet_id = 1; + static_cast(tablet_meta->set_partition_id(10000)); tablet_meta->_schema = tablet_schema; - auto tablet = std::make_shared(tablet_meta, data_dir.get(), "test_str"); - char* tmp_str = (char*)malloc(20); - strncpy(tmp_str, "test_tablet_name", 20); - - tablet->_full_name = tmp_str; + auto tablet = std::make_shared(tablet_meta, _data_dir, "test_str"); + tablet->init(); // tablet->key rowset_writer_context->tablet = tablet; } @@ -220,7 +225,7 @@ class SegCompactionTest : public testing::Test { } private: - std::unique_ptr _data_dir; + DataDir* _data_dir = nullptr; }; TEST_F(SegCompactionTest, SegCompactionThenRead) { @@ -242,27 +247,25 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { std::unique_ptr rowset_writer; s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); - EXPECT_EQ(Status::OK(), s); - - RowCursor input_row; - input_row.init(tablet_schema); + EXPECT_TRUE(s.ok()); // for segment "i", row "rid" // k1 := rid*10 + i // k2 := k1 * 10 // k3 := rid for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -284,7 +287,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { RowsetReaderContext reader_context; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut - reader_context.reader_type = READER_CUMULATIVE_COMPACTION; + reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION; reader_context.need_ordered_result = true; std::vector return_columns = {0, 1, 2}; reader_context.return_columns = &return_columns; @@ -324,6 +327,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { } EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read); + EXPECT_EQ(num_rows_read, num_segments * rows_per_segment); EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { @@ -350,10 +354,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { std::unique_ptr rowset_writer; s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); - EXPECT_EQ(Status::OK(), s); - - RowCursor input_row; - input_row.init(tablet_schema); + EXPECT_TRUE(s.ok()); // for segment "i", row "rid" // k1 := rid*10 + i @@ -362,85 +363,90 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { int num_segments = 4; uint32_t rows_per_segment = 4096; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 2; rows_per_segment = 6400; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 1; rows_per_segment = 4096; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 1; rows_per_segment = 6400; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 8; rows_per_segment = 4096; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -448,17 +454,18 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { num_segments = 1; rows_per_segment = 6400; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -494,10 +501,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) { std::unique_ptr rowset_writer; s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); - EXPECT_EQ(Status::OK(), s); - - RowCursor input_row; - input_row.init(tablet_schema); + EXPECT_TRUE(s.ok()); // for segment "i", row "rid" // k1 := rid*10 + i @@ -506,85 +510,90 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) { int num_segments = 1; uint32_t rows_per_segment = 6400; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 1; rows_per_segment = 4096; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 1; rows_per_segment = 6400; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 1; rows_per_segment = 4096; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); } num_segments = 1; rows_per_segment = 6400; for (int i = 0; i < num_segments; ++i) { - vectorized::Arena arena; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); for (int rid = 0; rid < rows_per_segment; ++rid) { uint32_t k1 = rid * 100 + i; uint32_t k2 = i; uint32_t k3 = rid; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -618,69 +627,59 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { std::unique_ptr rowset_writer; s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); - EXPECT_EQ(Status::OK(), s); + EXPECT_TRUE(s.ok()); - RowCursor input_row; - input_row.init(tablet_schema); - - vectorized::Arena arena; uint32_t k1 = 0; uint32_t k2 = 0; uint32_t k3 = 0; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); // segment#0 k1 = k2 = 1; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 4; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 6; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); // segment#1 k1 = k2 = 2; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 4; k3 = 2; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 6; k3 = 2; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -688,28 +687,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { // segment#2 k1 = k2 = 3; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 6; k3 = 3; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 9; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -717,28 +712,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { // segment#3 k1 = k2 = 4; k3 = 3; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 9; k3 = 2; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 12; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -746,12 +737,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { // segment#4 k1 = k2 = 25; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -759,12 +750,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { // segment#5 k1 = k2 = 26; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -782,7 +773,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { RowsetReaderContext reader_context; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut - reader_context.reader_type = READER_CUMULATIVE_COMPACTION; + reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION; reader_context.need_ordered_result = true; std::vector return_columns = {0, 1, 2}; reader_context.return_columns = &return_columns; @@ -852,69 +843,60 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { std::unique_ptr rowset_writer; s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); - EXPECT_EQ(Status::OK(), s); - - RowCursor input_row; - input_row.init(tablet_schema); + EXPECT_TRUE(s.ok()); - vectorized::Arena arena; uint32_t k1 = 0; uint32_t k2 = 0; uint32_t k3 = 0; + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + // segment#0 k1 = k2 = 1; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 4; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 6; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); // segment#1 k1 = k2 = 2; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 4; k3 = 2; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 6; k3 = 2; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -922,28 +904,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { // segment#2 k1 = k2 = 3; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 6; k3 = 3; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 9; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -951,28 +929,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { // segment#3 k1 = k2 = 4; k3 = 3; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 9; k3 = 2; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); k1 = k2 = 12; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -980,12 +954,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { // segment#4 k1 = k2 = 25; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -993,12 +967,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { // segment#5 k1 = k2 = 26; k3 = 1; - input_row.set_field_content(0, reinterpret_cast(&k1), &arena); - input_row.set_field_content(1, reinterpret_cast(&k2), &arena); - input_row.set_field_content(2, reinterpret_cast(&k3), &arena); - s = rowset_writer->add_row(input_row); - EXPECT_EQ(Status::OK(), s); + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); sleep(1); @@ -1016,7 +990,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { RowsetReaderContext reader_context; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut - reader_context.reader_type = READER_CUMULATIVE_COMPACTION; + reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION; reader_context.need_ordered_result = true; std::vector return_columns = {0, 1, 2}; reader_context.return_columns = &return_columns;