Skip to content

Commit

Permalink
[Refactor](SchemaCache) remove redundant Schema cache
Browse files Browse the repository at this point in the history
We already introduced TabletSchema cache and it is enough at present, and Schema cache may introduce inconsistency bettween Schema and TabletSchema
  • Loading branch information
eldenmoon committed Aug 29, 2024
1 parent 06e095c commit 40935f9
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 69 deletions.
16 changes: 1 addition & 15 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}
}
VLOG_NOTICE << "read columns size: " << read_columns.size();
std::string schema_key = SchemaCache::get_schema_key(
_read_options.tablet_id, _read_context->tablet_schema, read_columns,
_read_context->tablet_schema->schema_version(), SchemaCache::Type::SCHEMA);
// It is necessary to ensure that there is a schema version when using a cache
// because the absence of a schema version can result in reading a stale version
// of the schema after a schema change.
// For table contains variants, it's schema is unstable and variable so we could not use schema cache here
if (_read_context->tablet_schema->schema_version() < 0 ||
_read_context->tablet_schema->num_variant_columns() > 0 ||
(_input_schema = SchemaCache::instance()->get_schema<SchemaSPtr>(schema_key)) == nullptr) {
_input_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
SchemaCache::instance()->insert_schema(schema_key, _input_schema);
}

_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
if (_read_context->predicates != nullptr) {
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
_read_context->predicates->begin(),
Expand Down
21 changes: 2 additions & 19 deletions be/src/olap/schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,9 @@ SchemaCache* SchemaCache::instance() {
return ExecEnv::GetInstance()->schema_cache();
}

// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int64_t tablet_id, const TabletSchemaSPtr& schema,
const std::vector<uint32_t>& column_ids, int32_t version,
Type type) {
if (column_ids.empty() || schema->column(column_ids[0]).unique_id() < 0) {
return "";
}
std::string key = fmt::format("{}-", tablet_id);
std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) {
uint32_t col_unique_id = schema->column(cid).unique_id();
key.append(fmt::format("{}", col_unique_id));
key.append("-");
});
key.append(fmt::format("{}-{}", version, type));
return key;
}

// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int64_t tablet_id, const std::vector<TColumn>& columns,
int32_t version, Type type) {
int32_t version) {
if (columns.empty() || columns[0].col_unique_id < 0) {
return "";
}
Expand All @@ -67,7 +50,7 @@ std::string SchemaCache::get_schema_key(int64_t tablet_id, const std::vector<TCo
key.append(fmt::format("{}", col.col_unique_id));
key.append("-");
});
key.append(fmt::format("{}-{}", version, type));
key.append(fmt::format("{}", version));
return key;
}

Expand Down
37 changes: 6 additions & 31 deletions be/src/olap/schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,15 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>;
// with high concurrency, where queries are executed simultaneously.
class SchemaCache : public LRUCachePolicyTrackingManual {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };

static SchemaCache* instance();

static void create_global_instance(size_t capacity);

// get cache schema key, delimiter with SCHEMA_DELIMITER
static std::string get_schema_key(int64_t tablet_id, const TabletSchemaSPtr& schema,
const std::vector<uint32_t>& column_ids, int32_t version,
Type type);
static std::string get_schema_key(int64_t tablet_id, const std::vector<TColumn>& columns,
int32_t version, Type type);
int32_t version);

// Get a shared cached schema from cache, schema_key is a subset of column unique ids
template <typename SchemaType>
SchemaType get_schema(const std::string& schema_key) {
TabletSchemaSPtr get_schema(const std::string& schema_key) {
if (!instance() || schema_key.empty()) {
return {};
}
Expand All @@ -70,44 +63,26 @@ class SchemaCache : public LRUCachePolicyTrackingManual {
Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
VLOG_DEBUG << "use cache schema";
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
return value->tablet_schema;
}
if constexpr (std::is_same_v<SchemaType, SchemaSPtr>) {
return value->schema;
}
return value->tablet_schema;
}
return {};
}

// Insert a shared Schema into cache, schema_key is full column unique ids
template <typename SchemaType>
void insert_schema(const std::string& key, SchemaType schema) {
void insert_schema(const std::string& key, TabletSchemaSPtr schema) {
if (!instance() || key.empty()) {
return;
}
auto* value = new CacheValue;
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
value->type = Type::TABLET_SCHEMA;
value->tablet_schema = schema;
} else if constexpr (std::is_same_v<SchemaType, SchemaSPtr>) {
value->type = Type::SCHEMA;
value->schema = schema;
}
value->tablet_schema = schema;

auto lru_handle = insert(key, value, 1, schema->mem_size(), CachePriority::NORMAL);
auto* lru_handle = insert(key, value, 1, schema->mem_size(), CachePriority::NORMAL);
release(lru_handle);
}

// Try to prune the cache if expired.
Status prune();

class CacheValue : public LRUCacheValueBase {
public:
Type type;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
SchemaSPtr schema = nullptr;
};

SchemaCache(size_t capacity)
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ Status NewOlapScanner::init() {
!olap_scan_node.columns_desc.empty() &&
olap_scan_node.columns_desc[0].col_unique_id >= 0 &&
tablet->tablet_schema()->num_variant_columns() == 0) {
schema_key = SchemaCache::get_schema_key(
tablet->tablet_id(), olap_scan_node.columns_desc, olap_scan_node.schema_version,
SchemaCache::Type::TABLET_SCHEMA);
cached_schema = SchemaCache::instance()->get_schema<TabletSchemaSPtr>(schema_key);
schema_key =
SchemaCache::get_schema_key(tablet->tablet_id(), olap_scan_node.columns_desc,
olap_scan_node.schema_version);
cached_schema = SchemaCache::instance()->get_schema(schema_key);
}
if (cached_schema) {
tablet_schema = cached_schema;
Expand Down

0 comments on commit 40935f9

Please sign in to comment.