Skip to content

Commit

Permalink
refactor(hyperloglog): remove LatestSnapshot and GetOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
PokIsemaine committed Aug 4, 2024
1 parent d0b501d commit 6999b1c
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 112 deletions.
11 changes: 7 additions & 4 deletions src/commands/cmd_hll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class CommandPfAdd final : public Commander {
hashes[i - 2] = redis::HyperLogLog::HllHash(args_[i]);
}
uint64_t ret{};
auto s = hll.Add(args_[1], hashes, &ret);
engine::Context ctx(srv->storage);
auto s = hll.Add(ctx, args_[1], hashes, &ret);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand All @@ -61,11 +62,12 @@ class CommandPfCount final : public Commander {
rocksdb::Status s;
// The first argument is the command name, so we need to skip it.
DCHECK_GE(args_.size(), 2);
engine::Context ctx(srv->storage);
if (args_.size() > 2) {
std::vector<Slice> keys(args_.begin() + 1, args_.end());
s = hll.CountMultiple(keys, &ret);
s = hll.CountMultiple(ctx, keys, &ret);
} else {
s = hll.Count(args_[1], &ret);
s = hll.Count(ctx, args_[1], &ret);
}
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
Expand All @@ -86,7 +88,8 @@ class CommandPfMerge final : public Commander {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
DCHECK_GT(args_.size(), 1);
std::vector<Slice> src_user_keys(args_.begin() + 2, args_.end());
auto s = hll.Merge(/*dest_user_key=*/args_[1], src_user_keys);
engine::Context ctx(srv->storage);
auto s = hll.Merge(ctx, /*dest_user_key=*/args_[1], src_user_keys);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
9 changes: 0 additions & 9 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,6 @@ class Database {
public:
static constexpr uint64_t RANDOM_KEY_SCAN_LIMIT = 60;

struct GetOptions {
// If snapshot is not nullptr, read from the specified snapshot,
// otherwise read from the "latest" snapshot.
const rocksdb::Snapshot *snapshot = nullptr;

GetOptions() = default;
explicit GetOptions(const rocksdb::Snapshot *ss) : snapshot(ss) {}
};

explicit Database(engine::Storage *storage, std::string ns = "");
/// Parsing metadata with type of `types` from bytes, the metadata is a base class of all metadata.
/// When parsing, the bytes will be consumed.
Expand Down
83 changes: 35 additions & 48 deletions src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ class HllSegmentCache {
}
};

rocksdb::Status HyperLogLog::GetMetadata(Database::GetOptions get_options, const Slice &ns_key,
HyperLogLogMetadata *metadata) {
return Database::GetMetadata(get_options, {kRedisHyperLogLog}, ns_key, metadata);
rocksdb::Status HyperLogLog::GetMetadata(engine::Context &ctx, const Slice &ns_key, HyperLogLogMetadata *metadata) {
return Database::GetMetadata(ctx, {kRedisHyperLogLog}, ns_key, metadata);
}

uint64_t HyperLogLog::HllHash(std::string_view element) {
Expand All @@ -108,13 +107,14 @@ uint64_t HyperLogLog::HllHash(std::string_view element) {
}

/* The max zero-pattern counter of the subset that the element belongs to is incremented if needed. */
rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector<uint64_t> &element_hashes, uint64_t *ret) {
rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key,
const std::vector<uint64_t> &element_hashes, uint64_t *ret) {
*ret = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HyperLogLogMetadata metadata{};
rocksdb::Status s = GetMetadata(GetOptions(), ns_key, &metadata);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
Expand All @@ -131,11 +131,11 @@ rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector<uint64
HllSegmentCache::SegmentEntry *entry{nullptr};
s = cache.Get(
segment_index,
[this, &ns_key, &metadata](uint32_t segment_index, std::string *segment) -> rocksdb::Status {
[this, &ns_key, &metadata, &ctx](uint32_t segment_index, std::string *segment) -> rocksdb::Status {
std::string sub_key =
InternalKey(ns_key, std::to_string(segment_index), metadata.version, storage_->IsSlotIdEncoded())
.Encode();
return storage_->Get(rocksdb::ReadOptions(), sub_key, segment);
return storage_->Get(ctx, ctx.GetReadOptions(), sub_key, segment);
},
&entry);
if (!s.ok()) return s;
Expand Down Expand Up @@ -172,33 +172,29 @@ rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector<uint64
metadata.Encode(&bytes);
batch->Put(metadata_cf_handle_, ns_key, bytes);
}
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status HyperLogLog::Count(const Slice &user_key, uint64_t *ret) {
rocksdb::Status HyperLogLog::Count(engine::Context &ctx, const Slice &user_key, uint64_t *ret) {
std::string ns_key = AppendNamespacePrefix(user_key);
*ret = 0;
std::vector<rocksdb::PinnableSlice> registers;
{
LatestSnapShot ss(storage_);
Database::GetOptions get_options(ss.GetSnapShot());
auto s = getRegisters(get_options, ns_key, &registers);
if (!s.ok()) {
return s;
}
auto s = getRegisters(ctx, ns_key, &registers);
if (!s.ok()) {
return s;
}
DCHECK_EQ(kHyperLogLogSegmentCount, registers.size());
std::vector<nonstd::span<const uint8_t>> register_segments = TransformToSpan(registers);
*ret = HllDenseEstimate(register_segments);
return rocksdb::Status::OK();
}

rocksdb::Status HyperLogLog::mergeUserKeys(Database::GetOptions get_options, const std::vector<Slice> &user_keys,
rocksdb::Status HyperLogLog::mergeUserKeys(engine::Context &ctx, const std::vector<Slice> &user_keys,
std::vector<std::string> *register_segments) {
DCHECK_GE(user_keys.size(), static_cast<size_t>(1));

std::string first_ns_key = AppendNamespacePrefix(user_keys[0]);
rocksdb::Status s = getRegisters(get_options, first_ns_key, register_segments);
rocksdb::Status s = getRegisters(ctx, first_ns_key, register_segments);
if (!s.ok()) return s;
// The set of keys that have been seen so far
std::unordered_set<std::string_view> seend_user_keys;
Expand All @@ -212,7 +208,7 @@ rocksdb::Status HyperLogLog::mergeUserKeys(Database::GetOptions get_options, con
}
std::string source_key = AppendNamespacePrefix(source_user_key);
std::vector<rocksdb::PinnableSlice> source_registers;
s = getRegisters(get_options, source_key, &source_registers);
s = getRegisters(ctx, source_key, &source_registers);
if (!s.ok()) return s;
DCHECK_EQ(kHyperLogLogSegmentCount, source_registers.size());
DCHECK_EQ(kHyperLogLogSegmentCount, register_segments->size());
Expand All @@ -222,20 +218,18 @@ rocksdb::Status HyperLogLog::mergeUserKeys(Database::GetOptions get_options, con
return rocksdb::Status::OK();
}

rocksdb::Status HyperLogLog::CountMultiple(const std::vector<Slice> &user_key, uint64_t *ret) {
rocksdb::Status HyperLogLog::CountMultiple(engine::Context &ctx, const std::vector<Slice> &user_key, uint64_t *ret) {
DCHECK_GT(user_key.size(), static_cast<size_t>(1));
std::vector<std::string> register_segments;
// Using same snapshot for all get operations
LatestSnapShot ss(storage_);
Database::GetOptions get_options(ss.GetSnapShot());
auto s = mergeUserKeys(get_options, user_key, &register_segments);
auto s = mergeUserKeys(ctx, user_key, &register_segments);
if (!s.ok()) return s;
std::vector<nonstd::span<const uint8_t>> register_segment_span = TransformToSpan(register_segments);
*ret = HllDenseEstimate(register_segment_span);
return rocksdb::Status::OK();
}

rocksdb::Status HyperLogLog::Merge(const Slice &dest_user_key, const std::vector<Slice> &source_user_keys) {
rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_key,
const std::vector<Slice> &source_user_keys) {
if (source_user_keys.empty()) {
return rocksdb::Status::OK();
}
Expand All @@ -244,22 +238,17 @@ rocksdb::Status HyperLogLog::Merge(const Slice &dest_user_key, const std::vector
LockGuard guard(storage_->GetLockManager(), dest_key);
std::vector<std::string> registers;
HyperLogLogMetadata metadata;

rocksdb::Status s = GetMetadata(ctx, dest_user_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
{
// Using same snapshot for all get operations and release it after
// finishing the merge operation
LatestSnapShot ss(storage_);
Database::GetOptions get_options(ss.GetSnapShot());
rocksdb::Status s = GetMetadata(get_options, dest_user_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
{
std::vector<Slice> all_user_keys;
all_user_keys.reserve(source_user_keys.size() + 1);
all_user_keys.push_back(dest_user_key);
for (const auto &source_user_key : source_user_keys) {
all_user_keys.push_back(source_user_key);
}
s = mergeUserKeys(get_options, all_user_keys, &registers);
std::vector<Slice> all_user_keys;
all_user_keys.reserve(source_user_keys.size() + 1);
all_user_keys.push_back(dest_user_key);
for (const auto &source_user_key : source_user_keys) {
all_user_keys.push_back(source_user_key);
}
s = mergeUserKeys(ctx, all_user_keys, &registers);
}

auto batch = storage_->GetWriteBatchBase();
Expand All @@ -283,13 +272,13 @@ rocksdb::Status HyperLogLog::Merge(const Slice &dest_user_key, const std::vector
batch->Put(metadata_cf_handle_, dest_key, bytes);
}

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, const Slice &ns_key,
rocksdb::Status HyperLogLog::getRegisters(engine::Context &ctx, const Slice &ns_key,
std::vector<rocksdb::PinnableSlice> *register_segments) {
HyperLogLogMetadata metadata;
rocksdb::Status s = GetMetadata(get_options, ns_key, &metadata);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
if (s.IsNotFound()) {
// return empty registers with the right size.
Expand All @@ -299,8 +288,6 @@ rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, cons
return s;
}

rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
read_options.snapshot = get_options.snapshot;
// Multi get all segments
std::vector<std::string> sub_segment_keys;
sub_segment_keys.reserve(kHyperLogLogSegmentCount);
Expand All @@ -316,8 +303,8 @@ rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, cons
}
register_segments->resize(kHyperLogLogSegmentCount);
std::vector<rocksdb::Status> statuses(kHyperLogLogSegmentCount);
storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), kHyperLogLogSegmentCount,
sub_segment_slices.data(), register_segments->data(), statuses.data());
storage_->MultiGet(ctx, ctx.DefaultMultiGetOptions(), storage_->GetDB()->DefaultColumnFamily(),
kHyperLogLogSegmentCount, sub_segment_slices.data(), register_segments->data(), statuses.data());
for (size_t i = 0; i < kHyperLogLogSegmentCount; i++) {
if (!statuses[i].ok() && !statuses[i].IsNotFound()) {
register_segments->at(i).clear();
Expand All @@ -327,10 +314,10 @@ rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, cons
return rocksdb::Status::OK();
}

rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, const Slice &ns_key,
rocksdb::Status HyperLogLog::getRegisters(engine::Context &ctx, const Slice &ns_key,
std::vector<std::string> *register_segments) {
std::vector<rocksdb::PinnableSlice> pinnable_slices;
rocksdb::Status s = getRegisters(get_options, ns_key, &pinnable_slices);
rocksdb::Status s = getRegisters(ctx, ns_key, &pinnable_slices);
if (!s.ok()) return s;
register_segments->reserve(kHyperLogLogSegmentCount);
for (auto &pinnable_slice : pinnable_slices) {
Expand Down
18 changes: 9 additions & 9 deletions src/types/redis_hyperloglog.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ namespace redis {
class HyperLogLog : public Database {
public:
explicit HyperLogLog(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Add(const Slice &user_key, const std::vector<uint64_t> &element_hashes, uint64_t *ret);
rocksdb::Status Count(const Slice &user_key, uint64_t *ret);
rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, const std::vector<uint64_t> &element_hashes,
uint64_t *ret);
rocksdb::Status Count(engine::Context &ctx, const Slice &user_key, uint64_t *ret);
/// Counts across several keys; used when user_key.size() is greater than 1.
rocksdb::Status CountMultiple(const std::vector<Slice> &user_key, uint64_t *ret);
rocksdb::Status Merge(const Slice &dest_user_key, const std::vector<Slice> &source_user_keys);
rocksdb::Status CountMultiple(engine::Context &ctx, const std::vector<Slice> &user_key, uint64_t *ret);
rocksdb::Status Merge(engine::Context &ctx, const Slice &dest_user_key, const std::vector<Slice> &source_user_keys);

static uint64_t HllHash(std::string_view);

private:
[[nodiscard]] rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice &ns_key,
HyperLogLogMetadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(engine::Context &ctx, const Slice &ns_key, HyperLogLogMetadata *metadata);

[[nodiscard]] rocksdb::Status mergeUserKeys(Database::GetOptions get_options, const std::vector<Slice> &user_keys,
[[nodiscard]] rocksdb::Status mergeUserKeys(engine::Context &ctx, const std::vector<Slice> &user_keys,
std::vector<std::string> *register_segments);
/// Uses multi-get to acquire the register_segments.
///
/// If the metadata is not found, register_segments will be initialized with 16 empty slices.
[[nodiscard]] rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key,
[[nodiscard]] rocksdb::Status getRegisters(engine::Context &ctx, const Slice &ns_key,
std::vector<rocksdb::PinnableSlice> *register_segments);
/// Same as getRegisters, but the result is stored in a vector of strings.
[[nodiscard]] rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key,
[[nodiscard]] rocksdb::Status getRegisters(engine::Context &ctx, const Slice &ns_key,
std::vector<std::string> *register_segments);
};

Expand Down
Loading

0 comments on commit 6999b1c

Please sign in to comment.