Skip to content

Commit

Permalink
fix: review1
Browse files Browse the repository at this point in the history
  • Loading branch information
PokIsemaine committed Jul 16, 2024
1 parent bd67a66 commit 56508c0
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 93 deletions.
6 changes: 1 addition & 5 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,9 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}
}

engine::Context ctx;
if (srv_ != nullptr) {
ctx.storage = srv_->storage;
}

// Clear data of migrated slots
if (!migrated_slots_.empty()) {
engine::Context ctx(srv_->storage);
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
Expand Down
2 changes: 1 addition & 1 deletion src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ StatusOr<kqir::Value> FieldValueRetriever::Retrieve(std::string_view field, cons
if (std::holds_alternative<HashData>(db)) {
auto &[hash, metadata, key] = std::get<HashData>(db);
std::string ns_key = hash.AppendNamespacePrefix(key);

// TODO: ctx remove latestsnapshot?
LatestSnapShot ss(hash.storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
Expand Down
6 changes: 3 additions & 3 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,7 @@ Status Server::ScriptGet(const std::string &sha, std::string *body) const {
std::string func_name = engine::kLuaFuncSHAPrefix + sha;
auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
engine::Context ctx(storage);
auto s = storage->Get(ctx, rocksdb::ReadOptions(), cf, func_name, body);
auto s = storage->Get(ctx, ctx.GetReadOptions(), cf, func_name, body);
if (!s.ok()) {
return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
}
Expand All @@ -1691,7 +1691,7 @@ Status Server::FunctionGetCode(const std::string &lib, std::string *code) const
std::string func_name = engine::kLuaLibCodePrefix + lib;
auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
engine::Context ctx(storage);
auto s = storage->Get(ctx, rocksdb::ReadOptions(), cf, func_name, code);
auto s = storage->Get(ctx, ctx.GetReadOptions(), cf, func_name, code);
if (!s.ok()) {
return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
}
Expand All @@ -1702,7 +1702,7 @@ Status Server::FunctionGetLib(const std::string &func, std::string *lib) const {
std::string func_name = engine::kLuaFuncLibPrefix + func;
auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate);
engine::Context ctx(storage);
auto s = storage->Get(ctx, rocksdb::ReadOptions(), cf, func_name, lib);
auto s = storage->Get(ctx, ctx.GetReadOptions(), cf, func_name, lib);
if (!s.ok()) {
return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/scripting.cc
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin

engine::Context ctx(srv->storage);
rocksdb::ReadOptions read_options = srv->storage->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;

Expand Down Expand Up @@ -479,7 +479,7 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s

engine::Context ctx(srv->storage);
rocksdb::ReadOptions read_options = srv->storage->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;

Expand Down
14 changes: 5 additions & 9 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
std::string *value) {
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
Expand Down Expand Up @@ -647,6 +648,7 @@ void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_famil

rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family) {
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
auto iter = db_->NewIterator(options, column_family);
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->NewIteratorWithBase(column_family, iter, &options);
Expand All @@ -659,6 +661,7 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea
void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const size_t num_keys, const rocksdb::Slice *keys,
rocksdb::PinnableSlice *values, rocksdb::Status *statuses) {
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
false);
Expand Down Expand Up @@ -1247,19 +1250,12 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d
return crc == tmp_crc;
}

rocksdb::ReadOptions Context::GetReadOptions() {
[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() const {
rocksdb::ReadOptions read_opts;
read_opts.snapshot = GetSnapShot();
read_opts.snapshot = snapshot;
return read_opts;
}

const rocksdb::Snapshot *Context::GetSnapShot() {
if (storage && snapshot == nullptr) {
snapshot = storage->GetDB()->GetSnapshot();
}
return snapshot;
}

void Context::SetLatestSnapshot() {
if (snapshot) {
storage->GetDB()->ReleaseSnapshot(snapshot);
Expand Down
12 changes: 9 additions & 3 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,17 +369,23 @@ class Storage {
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
};

/// Context passes fixed snapshot and batche between APIs
/// Limitations: Performing a large number of writes on the same Context may reduce performance.
/// Please choose to use the same Context or create a new Context based on the actual situation.
/// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs.
struct Context {
engine::Storage *storage = nullptr;
/// Snapshot should be specified instead of nullptr when used,
/// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
/// Normally it will be fixed to the latest Snapshot when the Context is constructed
const rocksdb::Snapshot *snapshot = nullptr;
std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;

rocksdb::ReadOptions GetReadOptions();
const rocksdb::Snapshot *GetSnapShot();
/// GetReadOptions returns a ReadOptions whose snapshot is specified by Context
[[nodiscard]] rocksdb::ReadOptions GetReadOptions() const;
void SetLatestSnapshot();

explicit Context(engine::Storage *storage) : storage(storage), snapshot(storage->GetDB()->GetSnapshot()) {}
Context() = default;
~Context() {
if (storage && storage->GetDB()) {
storage->GetDB()->ReleaseSnapshot(snapshot);
Expand Down
12 changes: 6 additions & 6 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ rocksdb::Status Bitmap::GetString(engine::Context &ctx, const Slice &user_key, c
std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
Slice prefix_key_slice(prefix_key);
read_options.iterate_lower_bound = &prefix_key_slice;

Expand Down Expand Up @@ -200,7 +200,7 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint
std::string sub_key =
InternalKey(ns_key, std::to_string(segment_index), metadata.version, storage_->IsSlotIdEncoded()).Encode();
if (s.ok()) {
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
if (!s.ok() && !s.IsNotFound()) return s;
}
uint32_t bit_offset_in_segment = bit_offset % kBitmapSegmentBits;
Expand Down Expand Up @@ -264,7 +264,7 @@ rocksdb::Status Bitmap::BitCount(engine::Context &ctx, const Slice &user_key, in
auto u_stop = static_cast<uint32_t>(stop_byte);

rocksdb::ReadOptions read_options;
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// Don't use multi get to prevent large range query, and take too much memory
Expand Down Expand Up @@ -355,7 +355,7 @@ rocksdb::Status Bitmap::BitPos(engine::Context &ctx, const Slice &user_key, bool
};

rocksdb::ReadOptions read_options;
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
// if bit index, (Eg start = 1, stop = 35), then
// u_start = 1/8 = 0, u_stop = 35/8 = 4 (in bytes)
uint32_t start_segment_index = (u_start / to_bit_factor) / kBitmapSegmentBytes;
Expand Down Expand Up @@ -508,7 +508,7 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st
std::unique_ptr<unsigned char[]> frag_res(new unsigned char[kBitmapSegmentBytes]);

rocksdb::ReadOptions read_options;
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
for (uint64_t frag_index = 0; frag_index <= stop_index; frag_index++) {
std::vector<rocksdb::PinnableSlice> fragments;
uint16_t frag_maxlen = 0, frag_minlen = 0;
Expand Down Expand Up @@ -703,7 +703,7 @@ class Bitmap::SegmentCacheStore {
is_dirty = false;
std::string sub_key =
InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode();
rocksdb::Status s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &str);
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &str);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
Expand Down
14 changes: 7 additions & 7 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const
std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode();
if (s.ok()) {
std::string value_bytes;
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value_bytes);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value_bytes);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.ok()) {
auto parse_result = ParseInt<int64_t>(value_bytes, 10);
Expand Down Expand Up @@ -122,7 +122,7 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c
std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode();
if (s.ok()) {
std::string value_bytes;
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value_bytes);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value_bytes);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.ok()) {
auto value_stat = ParseFloat(value_bytes);
Expand Down Expand Up @@ -165,7 +165,7 @@ rocksdb::Status Hash::MGet(engine::Context &ctx, const Slice &user_key, const st
}

rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
std::vector<rocksdb::Slice> keys;

keys.reserve(fields.size());
Expand Down Expand Up @@ -216,7 +216,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
continue;
}
std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
if (s.ok()) {
*deleted_cnt += 1;
batch->Delete(sub_key);
Expand Down Expand Up @@ -257,7 +257,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st

if (metadata.size > 0) {
std::string field_value;
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &field_value);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &field_value);
if (!s.ok() && !s.IsNotFound()) return s;

if (s.ok()) {
Expand Down Expand Up @@ -300,7 +300,7 @@ rocksdb::Status Hash::RangeByLex(engine::Context &ctx, const Slice &user_key, co
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
Expand Down Expand Up @@ -355,7 +355,7 @@ rocksdb::Status Hash::GetAll(engine::Context &ctx, const Slice &user_key, std::v
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;

Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ rocksdb::Status Json::MSet(engine::Context &ctx, const std::vector<std::string>
std::vector<rocksdb::Status> Json::readMulti(engine::Context &ctx, const std::vector<Slice> &ns_keys,
std::vector<JsonValue> &values) {
rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;

std::vector<rocksdb::Status> statuses(ns_keys.size());
std::vector<rocksdb::PinnableSlice> pin_values(ns_keys.size());
Expand Down
14 changes: 7 additions & 7 deletions src/types/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count
bool reversed = count < 0;
std::vector<uint64_t> to_delete_indexes;
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
Expand Down Expand Up @@ -274,7 +274,7 @@ rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;

Expand Down Expand Up @@ -375,7 +375,7 @@ rocksdb::Status List::Range(engine::Context &ctx, const Slice &user_key, int sta
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;

Expand Down Expand Up @@ -418,7 +418,7 @@ rocksdb::Status List::Pos(engine::Context &ctx, const Slice &user_key, const Sli
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
Expand Down Expand Up @@ -463,7 +463,7 @@ rocksdb::Status List::Set(engine::Context &ctx, const Slice &user_key, int index
std::string buf, value;
PutFixed64(&buf, metadata.head + index);
std::string sub_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -502,7 +502,7 @@ rocksdb::Status List::lmoveOnSingleList(engine::Context &ctx, const rocksdb::Sli
PutFixed64(&curr_index_buf, curr_index);
std::string curr_sub_key =
InternalKey(ns_key, curr_index_buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = storage_->Get(ctx, rocksdb::ReadOptions(), curr_sub_key, elem);
s = storage_->Get(ctx, ctx.GetReadOptions(), curr_sub_key, elem);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -576,7 +576,7 @@ rocksdb::Status List::lmoveOnTwoLists(engine::Context &ctx, const rocksdb::Slice
PutFixed64(&src_buf, src_index);
std::string src_sub_key =
InternalKey(src_ns_key, src_buf, src_metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = storage_->Get(ctx, rocksdb::ReadOptions(), src_sub_key, elem);
s = storage_->Get(ctx, ctx.GetReadOptions(), src_sub_key, elem);
if (!s.ok()) {
return s;
}
Expand Down
6 changes: 3 additions & 3 deletions src/types/redis_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ rocksdb::Status Set::Add(engine::Context &ctx, const Slice &user_key, const std:
continue;
}
std::string sub_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
if (s.ok()) continue;
batch->Put(sub_key, Slice());
*added_cnt += 1;
Expand Down Expand Up @@ -109,7 +109,7 @@ rocksdb::Status Set::Remove(engine::Context &ctx, const Slice &user_key, const s
continue;
}
std::string sub_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &value);
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
if (!s.ok()) continue;
batch->Delete(sub_key);
*removed_cnt += 1;
Expand Down Expand Up @@ -152,7 +152,7 @@ rocksdb::Status Set::Members(engine::Context &ctx, const Slice &user_key, std::v
std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ctx.GetSnapShot();
read_options.snapshot = ctx.snapshot;
rocksdb::Slice upper_bound(next_version_prefix);
read_options.iterate_upper_bound = &upper_bound;

Expand Down
Loading

0 comments on commit 56508c0

Please sign in to comment.