Skip to content

Commit

Permalink
[Bug](coredump) fix regresstion test coredump in multi thread access …
Browse files Browse the repository at this point in the history
…map (#31664)
  • Loading branch information
HappenLee authored Mar 2, 2024
1 parent 65c2dd8 commit 7d4e855
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 36 deletions.
44 changes: 26 additions & 18 deletions be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,24 +578,26 @@ class ComparisonPredicateBase : public ColumnPredicate {

__attribute__((flatten)) int32_t _find_code_from_dictionary_column(
const vectorized::ColumnDictI32& column) const {
if (!_segment_id_to_cached_code.contains(column.get_rowset_segment_id())) {
int32_t code = _is_range() ? column.find_code_by_bound(_value, _is_greater(), _is_eq())
: column.find_code(_value);

// Sometimes the dict is not initialized when run comparison predicate here, for example,
// the full page is null, then the reader will skip read, so that the dictionary is not
// inited. The cached code is wrong during this case, because the following page maybe not
// null, and the dict should have items in the future.
//
// Cached code may have problems, so that add a config here, if not opened, then
// we will return the code and not cache it.
if (column.is_dict_empty() || !config::enable_low_cardinality_cache_code) {
return code;
}
// If the dict is not empty, then the dict is inited and we could cache the value.
_segment_id_to_cached_code[column.get_rowset_segment_id()] = code;
int32_t code = 0;
if (_segment_id_to_cached_code.if_contains(
column.get_rowset_segment_id(),
[&code](const auto& pair) { code = pair.second; })) {
return code;
}
code = _is_range() ? column.find_code_by_bound(_value, _is_greater(), _is_eq())
: column.find_code(_value);
// Sometimes the dict is not initialized when run comparison predicate here, for example,
// the full page is null, then the reader will skip read, so that the dictionary is not
// inited. The cached code is wrong during this case, because the following page maybe not
// null, and the dict should have items in the future.
//
// Cached code may have problems, so that add a config here, if not opened, then
// we will return the code and not cache it.
if (!column.is_dict_empty() && config::enable_low_cardinality_cache_code) {
_segment_id_to_cached_code.emplace(std::pair {column.get_rowset_segment_id(), code});
}
return _segment_id_to_cached_code[column.get_rowset_segment_id()];

return code;
}

std::string _debug_string() const override {
Expand All @@ -604,7 +606,13 @@ class ComparisonPredicateBase : public ColumnPredicate {
return info;
}

mutable std::map<std::pair<RowsetId, uint32_t>, int32_t> _segment_id_to_cached_code;
mutable phmap::parallel_flat_hash_map<
std::pair<RowsetId, uint32_t>, int32_t,
phmap::priv::hash_default_hash<std::pair<RowsetId, uint32_t>>,
phmap::priv::hash_default_eq<std::pair<RowsetId, uint32_t>>,
std::allocator<std::pair<const std::pair<RowsetId, uint32_t>, int32_t>>, 4,
std::shared_mutex>
_segment_id_to_cached_code;
T _value;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ void DataDir::_perform_path_gc_by_rowset(const std::vector<std::string>& tablet_
};

// rowset_id -> is_garbage
std::unordered_map<RowsetId, bool, HashOfRowsetId> checked_rowsets;
std::unordered_map<RowsetId, bool> checked_rowsets;
for (auto&& [rowset_id, filename] : rowsets_not_pending) {
if (auto it = checked_rowsets.find(rowset_id); it != checked_rowsets.end()) {
if (it->second) { // Is checked garbage rowset
Expand Down
28 changes: 16 additions & 12 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,7 @@ struct RowsetId {
}
};

// used for hash-struct of hash_map<RowsetId, Rowset*>.
struct HashOfRowsetId {
size_t operator()(const RowsetId& rowset_id) const {
size_t seed = 0;
seed = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), seed);
seed = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), seed);
seed = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), seed);
return seed;
}
};

using RowsetIdUnorderedSet = std::unordered_set<RowsetId, HashOfRowsetId>;
using RowsetIdUnorderedSet = std::unordered_set<RowsetId>;

// Extract rowset id from filename, return uninitialized rowset id if filename is invalid
inline RowsetId extract_rowset_id(std::string_view filename) {
Expand Down Expand Up @@ -517,3 +506,18 @@ struct RidAndPos {
using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;

} // namespace doris

// This intended to be a "good" hash function. It may change from time to time.
template <>
struct std::hash<doris::RowsetId> {
size_t operator()(const doris::RowsetId& rowset_id) const {
size_t seed = 0;
seed = doris::HashUtil::xxHash64WithSeed((const char*)&rowset_id.hi, sizeof(rowset_id.hi),
seed);
seed = doris::HashUtil::xxHash64WithSeed((const char*)&rowset_id.mi, sizeof(rowset_id.mi),
seed);
seed = doris::HashUtil::xxHash64WithSeed((const char*)&rowset_id.lo, sizeof(rowset_id.lo),
seed);
return seed;
}
};
2 changes: 1 addition & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
tablet_schema->init_from_pb(new_tablet_meta_pb.schema());

std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> rs_version_map;
std::unordered_map<RowsetId, RowsetId, HashOfRowsetId> rowset_id_mapping;
std::unordered_map<RowsetId, RowsetId> rowset_id_mapping;
guards.reserve(cloned_tablet_meta_pb.rs_metas_size() +
cloned_tablet_meta_pb.stale_rs_metas_size());
for (auto&& visible_rowset : cloned_tablet_meta_pb.rs_metas()) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class BaseStorageEngine {

// Hold reference of quering rowsets
std::mutex _quering_rowsets_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> _querying_rowsets;
std::unordered_map<RowsetId, RowsetSharedPtr> _querying_rowsets;
scoped_refptr<Thread> _evict_quering_rowset_thread;
};

Expand Down Expand Up @@ -409,7 +409,7 @@ class StorageEngine final : public BaseStorageEngine {
std::atomic_bool _stopped {false};

std::mutex _gc_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> _unused_rowsets;
std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
PendingRowsetSet _pending_local_rowsets;
PendingRowsetSet _pending_remote_rowsets;

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2432,10 +2432,10 @@ Status Tablet::check_rowid_conversion(
return Status::OK();
}
std::vector<segment_v2::SegmentSharedPtr> dst_segments;

RETURN_IF_ERROR(
std::dynamic_pointer_cast<BetaRowset>(dst_rowset)->load_segments(&dst_segments));
std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>, HashOfRowsetId>
input_rowsets_segment;
std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>> input_rowsets_segment;

VLOG_DEBUG << "check_rowid_conversion, dst_segments size: " << dst_segments.size();
for (auto [src_rowset, locations] : location_map) {
Expand Down

0 comments on commit 7d4e855

Please sign in to comment.