Skip to content

Commit

Permalink
7
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Oct 10, 2024
1 parent a0eca9c commit dd6223a
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 49 deletions.
136 changes: 121 additions & 15 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -751,22 +752,127 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
}
if (config::enable_table_size_correctness_check) {
if (rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) {
LOG(FATAL) << "[Cloud table size check failed]:"
<< " tablet id: " << rs_meta.tablet_id()
<< ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset total disk size:" << rs_meta.total_disk_size() << ".";
const auto fs = const_cast<RowsetMeta&>(rs_meta).fs();
if (!fs) {
LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
}
if (rs_meta.index_disk_size() >
config::max_table_index_data_ratio * rs_meta.data_disk_size()) {
LOG(FATAL) << "[Cloud table size check failed]:"
<< " tablet id: " << rs_meta.tablet_id()
<< ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset total disk size:" << rs_meta.total_disk_size() << ".";
int64_t total_segment_size = 0;
int64_t total_inverted_index_size = 0;
for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) {
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
int64_t segment_file_size = 0;
auto st = fs->file_size(segment_path, &segment_file_size);
if (!st.ok()) {
segment_file_size = 0;
LOG(WARNING) << "table size correctness check get segment size failed! msg:"
<< st.msg() << ", segment path:" << segment_path;
}
total_segment_size += segment_file_size;
}

if (rs_meta.tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
auto indices = rs_meta.tablet_schema()->indexes();
for (auto& index : indices) {
// only get file_size for inverted index
if (index.index_type() != IndexType::INVERTED) {
continue;
}
for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
int64_t file_size = 0;

std::string inverted_index_file_path =
InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(
segment_path),
index.index_id(), index.get_index_suffix());
auto st = fs->file_size(inverted_index_file_path, &file_size);
if (!st.ok()) {
file_size = 0;
LOG(WARNING)
<< "table size correctness check get inverted index v1 "
"size failed! msg:"
<< st.msg() << ", inverted index path:" << inverted_index_file_path;
}
total_inverted_index_size += file_size;
}
}
} else {
for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
int64_t file_size = 0;
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);

std::string inverted_index_file_path =
InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_path));
auto st = fs->file_size(inverted_index_file_path, &file_size);
if (!st.ok()) {
file_size = 0;
LOG(WARNING) << "table size correctness check get inverted index v2 size "
"failed! msg:"
<< st.msg()
<< ", inverted index path:" << inverted_index_file_path;
}
total_inverted_index_size += file_size;
}
}
LOG(INFO) << "[Cloud table segment size check info]:"
<< " tablet id: " << rs_meta.tablet_id() << ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset real data disk size:" << total_segment_size
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset real index disk size:" << total_inverted_index_size
<< ", rowset total disk size:" << rs_meta.total_disk_size()
<< ", rowset segment path:"
<< StorageResource().remote_segment_path(rs_meta.tablet_id(),
rs_meta.rowset_id().to_string(), 0)
<< ".";
if (rs_meta.data_disk_size() != total_segment_size) {
LOG(WARNING) << "[Cloud table segment size check failed]:"
<< " tablet id: " << rs_meta.tablet_id()
<< ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset real data disk size:" << total_segment_size
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset real index disk size:" << total_inverted_index_size
<< ", rowset total disk size:" << rs_meta.total_disk_size()
<< ", rowset segment path:"
<< StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), 0)
<< ".";
DCHECK(false);
}
if (rs_meta.index_disk_size() != total_inverted_index_size) {
LOG(WARNING) << "[Cloud table index size check failed]:"
<< " tablet id: " << rs_meta.tablet_id()
<< ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset real data disk size:" << total_segment_size
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset real index disk size:" << total_inverted_index_size
<< ", rowset total disk size:" << rs_meta.total_disk_size()
<< ", rowset segment path:"
<< StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), 0)
<< ".";
DCHECK(false);
}
if (rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) {
LOG(WARNING) << "[Cloud table size check failed]:"
<< " tablet id: " << rs_meta.tablet_id()
<< ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset total disk size:" << rs_meta.total_disk_size()
<< ", rowset segment path:"
<< StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), 0)
<< ".";
DCHECK(false);
}
}
CreateRowsetRequest req;
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->total_disk_size();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
_num_segment += rowset->num_segments();
// append key_bounds to current rowset
Expand Down Expand Up @@ -1009,8 +1009,8 @@ Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(

SegmentStatistics segstat;
segstat.row_num = row_num;
segstat.data_size = segment_size + (*writer)->get_inverted_index_total_size();
segstat.index_size = index_size + (*writer)->get_inverted_index_total_size();
segstat.data_size = segment_size;
segstat.index_size = (*writer)->get_inverted_index_total_size();
segstat.key_bounds = key_bounds;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
Expand Down
37 changes: 25 additions & 12 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ Status SegmentFlusher::_flush_segment_writer(
if (row_num == 0) {
return Status::OK();
}
uint64_t segment_size;
uint64_t index_size;
Status s = writer->finalize(&segment_size, &index_size);
uint64_t segment_file_size;
uint64_t common_index_size;
Status s = writer->finalize(&segment_file_size, &common_index_size);
if (!s.ok()) {
return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
}
Expand All @@ -245,17 +245,26 @@ Status SegmentFlusher::_flush_segment_writer(
uint32_t segment_id = writer->segment_id();
SegmentStatistics segstat;
segstat.row_num = row_num;
segstat.data_size = segment_size + writer->get_inverted_index_total_size();
segstat.index_size = index_size + writer->get_inverted_index_total_size();
// Attention: data size = segment file size (.dat file size), which includes
// common indexes such as the zone map index but does not include the inverted
// index, because the inverted index has its own file.
// Index size = inverted index file size (.idx file size), which includes only
// the inverted index.
segstat.data_size = segment_file_size;
segstat.index_size = writer->get_inverted_index_total_size();
segstat.key_bounds = key_bounds;
LOG(INFO) << "tablet_id:" << _context.tablet_id
<< ", flushing rowset_dir: " << _context.tablet_path
<< ", rowset_id:" << _context.rowset_id << ", data size:" << segstat.data_size
<< ", index size:" << segstat.index_size;

_idx_files_info.add_file_info(segment_id, writer->get_inverted_index_file_info());
writer.reset();

RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema));

if (flush_size) {
*flush_size = segment_size + index_size;
*flush_size = segment_file_size;
}
return Status::OK();
}
Expand All @@ -271,9 +280,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
if (row_num == 0) {
return Status::OK();
}
uint64_t segment_size;
uint64_t index_size;
Status s = writer->finalize(&segment_size, &index_size);
uint64_t segment_file_size;
uint64_t common_index_size;
Status s = writer->finalize(&segment_file_size, &common_index_size);
if (!s.ok()) {
return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
}
Expand All @@ -291,17 +300,21 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
uint32_t segment_id = writer->get_segment_id();
SegmentStatistics segstat;
segstat.row_num = row_num;
segstat.data_size = segment_size + writer->get_inverted_index_total_size();
segstat.index_size = index_size + writer->get_inverted_index_total_size();
segstat.data_size = segment_file_size;
segstat.index_size = writer->get_inverted_index_total_size();
segstat.key_bounds = key_bounds;
LOG(INFO) << "tablet_id:" << _context.tablet_id
<< ", flushing rowset_dir: " << _context.tablet_path
<< ", rowset_id:" << _context.rowset_id << ", data size:" << segstat.data_size
<< ", index size:" << segstat.index_size;

_idx_files_info.add_file_info(segment_id, writer->get_inverted_index_file_info());
writer.reset();

RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema));

if (flush_size) {
*flush_size = segment_size + index_size;
*flush_size = segment_file_size;
}
return Status::OK();
}
Expand Down
Loading

0 comments on commit dd6223a

Please sign in to comment.