Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor](SchemaCache) remove redundant Schema cache #40350

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}
}
VLOG_NOTICE << "read columns size: " << read_columns.size();
std::string schema_key = SchemaCache::get_schema_key(
_read_options.tablet_id, _read_context->tablet_schema, read_columns,
_read_context->tablet_schema->schema_version(), SchemaCache::Type::SCHEMA);
// It is necessary to ensure that there is a schema version when using a cache
// because the absence of a schema version can result in reading a stale version
// of the schema after a schema change.
if (_read_context->tablet_schema->schema_version() < 0 ||
(_input_schema = SchemaCache::instance()->get_schema<SchemaSPtr>(schema_key)) == nullptr) {
_input_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
SchemaCache::instance()->insert_schema(schema_key, _input_schema);
}

_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
if (_read_context->predicates != nullptr) {
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
_read_context->predicates->begin(),
Expand Down
21 changes: 2 additions & 19 deletions be/src/olap/schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,9 @@ namespace doris {

SchemaCache* SchemaCache::_s_instance = nullptr;

// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int64_t tablet_id, const TabletSchemaSPtr& schema,
const std::vector<uint32_t>& column_ids, int32_t version,
Type type) {
if (column_ids.empty() || schema->column(column_ids[0]).unique_id() < 0) {
return "";
}
std::string key = fmt::format("{}-", tablet_id);
std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) {
uint32_t col_unique_id = schema->column(cid).unique_id();
key.append(fmt::format("{}", col_unique_id));
key.append("-");
});
key.append(fmt::format("{}-{}", version, type));
return key;
}

// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int64_t tablet_id, const std::vector<TColumn>& columns,
int32_t version, Type type) {
int32_t version) {
if (columns.empty() || columns[0].col_unique_id < 0) {
return "";
}
Expand All @@ -65,7 +48,7 @@ std::string SchemaCache::get_schema_key(int64_t tablet_id, const std::vector<TCo
key.append(fmt::format("{}", col.col_unique_id));
key.append("-");
});
key.append(fmt::format("{}-{}", version, type));
key.append(fmt::format("{}", version));
return key;
}

Expand Down
32 changes: 5 additions & 27 deletions be/src/olap/schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,15 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>;
// with high concurrency, where queries are executed simultaneously.
class SchemaCache : public LRUCachePolicy {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };

static SchemaCache* instance() { return _s_instance; }

static void create_global_instance(size_t capacity);

// get cache schema key, delimiter with SCHEMA_DELIMITER
static std::string get_schema_key(int64_t tablet_id, const TabletSchemaSPtr& schema,
const std::vector<uint32_t>& column_ids, int32_t version,
Type type);
static std::string get_schema_key(int64_t tablet_id, const std::vector<TColumn>& columns,
int32_t version, Type type);
int32_t version);

// Get a shared cached schema from cache, schema_key is a subset of column unique ids
template <typename SchemaType>
SchemaType get_schema(const std::string& schema_key) {
TabletSchemaSPtr get_schema(const std::string& schema_key) {
if (!_s_instance || schema_key.empty()) {
return {};
}
Expand All @@ -71,31 +64,19 @@ class SchemaCache : public LRUCachePolicy {
auto value = (CacheValue*)_cache->value(lru_handle);
value->last_visit_time = UnixMillis();
VLOG_DEBUG << "use cache schema";
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
return value->tablet_schema;
}
if constexpr (std::is_same_v<SchemaType, SchemaSPtr>) {
return value->schema;
}
return value->tablet_schema;
}
return {};
}

// Insert a shared Schema into cache, schema_key is full column unique ids
template <typename SchemaType>
void insert_schema(const std::string& key, SchemaType schema) {
void insert_schema(const std::string& key, TabletSchemaSPtr schema) {
if (!_s_instance || key.empty()) {
return;
}
CacheValue* value = new CacheValue;
value->last_visit_time = UnixMillis();
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
value->type = Type::TABLET_SCHEMA;
value->tablet_schema = schema;
} else if constexpr (std::is_same_v<SchemaType, SchemaSPtr>) {
value->type = Type::SCHEMA;
value->schema = schema;
}
value->tablet_schema = schema;
auto deleter = [](const doris::CacheKey& key, void* value) {
CacheValue* cache_value = (CacheValue*)value;
delete cache_value;
Expand All @@ -109,10 +90,7 @@ class SchemaCache : public LRUCachePolicy {
Status prune();

struct CacheValue : public LRUCacheValueBase {
Type type;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
SchemaSPtr schema = nullptr;
};

private:
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ Status NewOlapScanner::init() {
!olap_scan_node.columns_desc.empty() &&
olap_scan_node.columns_desc[0].col_unique_id >= 0) {
schema_key = SchemaCache::get_schema_key(tablet_id, olap_scan_node.columns_desc,
olap_scan_node.schema_version,
SchemaCache::Type::TABLET_SCHEMA);
cached_schema = SchemaCache::instance()->get_schema<TabletSchemaSPtr>(schema_key);
olap_scan_node.schema_version);
cached_schema = SchemaCache::instance()->get_schema(schema_key);
}
if (cached_schema) {
_tablet_schema = cached_schema;
Expand Down
Loading