fix: Remove some LatestSnapshot; Add batch for Iterator, MultiGet
PokIsemaine committed May 22, 2024
1 parent 98725b7 commit 8ae70f3
Showing 49 changed files with 1,464 additions and 1,493 deletions.
9 changes: 7 additions & 2 deletions src/cluster/slot_migrate.cc
@@ -343,6 +343,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(ColumnFamilyID::Metadata);
// TODO: ctx
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));

// Seek to the beginning of keys starting with 'prefix' and iterate over all of them
@@ -748,6 +749,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
Slice prefix_slice(prefix_subkey);
read_options.iterate_lower_bound = &prefix_slice;
// Should use the raw db iterator to avoid reading uncommitted writes in transaction mode
// TODO: ctx
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));

int item_count = 0;
@@ -851,6 +853,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
read_options.iterate_lower_bound = &prefix_key_slice;

// Should use the raw db iterator to avoid reading uncommitted writes in transaction mode
// TODO: ctx
auto iter =
util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(ColumnFamilyID::Stream)));

@@ -1208,7 +1211,9 @@ Status SlotMigrator::sendSnapshotByRawKV() {
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
engine::DBIterator iter(storage_, read_options);
// TODO: ctx
engine::Context ctx(storage_);
engine::DBIterator iter(ctx, storage_, read_options);

BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);

@@ -1226,7 +1231,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {

GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::Metadata), iter.Key(), iter.Value()));

auto subkey_iter = iter.GetSubKeyIterator();
auto subkey_iter = iter.GetSubKeyIterator(ctx);
if (!subkey_iter) {
continue;
}
3 changes: 2 additions & 1 deletion src/commands/cmd_json.cc
@@ -600,7 +600,8 @@ class CommandJsonMGet : public Commander {
}

std::vector<JsonValue> json_values;
auto statuses = json.MGet(user_keys, path, json_values);
engine::Context ctx(svr->storage);
auto statuses = json.MGet(ctx, user_keys, path, json_values);

std::vector<std::string> values;
values.resize(user_keys.size());
4 changes: 2 additions & 2 deletions src/commands/cmd_server.cc
@@ -1112,7 +1112,7 @@ class CommandRestore : public Commander {

auto stream_ptr = std::make_unique<RdbStringStream>(args_[3]);
RDB rdb(srv->storage, conn->GetNamespace(), std::move(stream_ptr));
auto s = rdb.Restore(args_[1], args_[3], ttl_ms_);
auto s = rdb.Restore(ctx, args_[1], args_[3], ttl_ms_);
if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};
*output = redis::SimpleString("OK");
return Status::OK();
@@ -1161,7 +1161,7 @@ class CommandRdb : public Commander {
GET_OR_RET(stream_ptr->Open());

RDB rdb(srv->storage, conn->GetNamespace(), std::move(stream_ptr));
GET_OR_RET(rdb.LoadRdb(db_index_, overwrite_exist_key_));
GET_OR_RET(rdb.LoadRdb(ctx, db_index_, overwrite_exist_key_));

*output = redis::SimpleString("OK");
return Status::OK();
3 changes: 2 additions & 1 deletion src/commands/cmd_string.cc
@@ -257,7 +257,8 @@ class CommandMGet : public Commander {
}
std::vector<std::string> values;
// always return OK
auto statuses = string_db.MGet(keys, &values);
engine::Context ctx(srv->storage);
auto statuses = string_db.MGet(ctx, keys, &values);
*output = conn->MultiBulkString(values, statuses);
return Status::OK();
}
8 changes: 4 additions & 4 deletions src/common/db_util.h
@@ -34,11 +34,11 @@ struct UniqueIterator : std::unique_ptr<rocksdb::Iterator> {
using BaseType = std::unique_ptr<rocksdb::Iterator>;

explicit UniqueIterator(rocksdb::Iterator* iter) : BaseType(iter) {}
UniqueIterator(engine::Storage* storage, const rocksdb::ReadOptions& options,
UniqueIterator(engine::Context& ctx, engine::Storage* storage, const rocksdb::ReadOptions& options,
rocksdb::ColumnFamilyHandle* column_family)
: BaseType(storage->NewIterator(options, column_family)) {}
UniqueIterator(engine::Storage* storage, const rocksdb::ReadOptions& options)
: BaseType(storage->NewIterator(options)) {}
: BaseType(storage->NewIterator(ctx, options, column_family)) {}
UniqueIterator(engine::Context& ctx, engine::Storage* storage, const rocksdb::ReadOptions& options)
: BaseType(storage->NewIterator(ctx, options)) {}
};

namespace details {
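
A minimal usage sketch of the new ctx-aware UniqueIterator overloads, mirroring the call sites updated in redis_db.cc further down in this diff; the helper name and the bare `storage` pointer are illustrative, not part of the commit:

```cpp
// Hypothetical helper: count metadata keys visible under one Context/snapshot.
uint64_t CountMetadataKeys(engine::Storage *storage) {
  engine::Context ctx(storage);
  auto iter = util::UniqueIterator(ctx, storage, ctx.GetReadOptions(),
                                   storage->GetCFHandle(ColumnFamilyID::Metadata));
  uint64_t count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ++count;  // every read here observes the snapshot carried by ctx
  }
  return count;
}
```

The design point is that the Context, rather than an ad-hoc LatestSnapShot, decides which snapshot a read observes, which is why the snapshot-less overloads are removed in this commit.
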
5 changes: 4 additions & 1 deletion src/search/executors/full_index_scan_executor.h
@@ -48,9 +48,12 @@ struct FullIndexScanExecutor : ExecutorNode {

auto ns_key = NSKey(*prefix_iter);
if (!iter) {
// TODO: ctx?
rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
iter = util::UniqueIterator(ctx->storage, read_options, ctx->storage->GetCFHandle(ColumnFamilyID::Metadata));
engine::Context iter_ctx(ctx->storage);
iter = util::UniqueIterator(iter_ctx, ctx->storage, read_options,
ctx->storage->GetCFHandle(ColumnFamilyID::Metadata));
iter->Seek(ns_key);
}

6 changes: 4 additions & 2 deletions src/search/executors/numeric_field_scan_executor.h
@@ -73,10 +73,12 @@ struct NumericFieldScanExecutor : ExecutorNode {

StatusOr<Result> Next() override {
if (!iter) {
// TODO: ctx?
rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();

iter = util::UniqueIterator(ctx->storage, read_options, ctx->storage->GetCFHandle(ColumnFamilyID::Search));
engine::Context iter_ctx(ctx->storage);
iter =
util::UniqueIterator(iter_ctx, ctx->storage, read_options, ctx->storage->GetCFHandle(ColumnFamilyID::Search));
if (scan->order == SortByClause::ASC) {
iter->Seek(IndexKey(scan->range.l));
} else {
6 changes: 4 additions & 2 deletions src/search/executors/tag_field_scan_executor.h
@@ -73,10 +73,12 @@ struct TagFieldScanExecutor : ExecutorNode {

StatusOr<Result> Next() override {
if (!iter) {
// TODO: ctx?
engine::Context iter_ctx(ctx->storage);
rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();

iter = util::UniqueIterator(ctx->storage, read_options, ctx->storage->GetCFHandle(ColumnFamilyID::Search));
iter =
util::UniqueIterator(iter_ctx, ctx->storage, read_options, ctx->storage->GetCFHandle(ColumnFamilyID::Search));
iter->Seek(index_key);
}

5 changes: 3 additions & 2 deletions src/storage/batch_indexer.h
@@ -56,9 +56,10 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {

rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key,
const rocksdb::Slice& end_key) override {
rocksdb::ReadOptions read_options; // TODO: snapshot?
std::cout << "DeleteRangeCF" << std::endl;
rocksdb::ReadOptions read_options; // TODO: ctx snapshot?
auto cf_handle = storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id));
std::unique_ptr<rocksdb::Iterator> it(storage_->NewIterator(read_options, cf_handle));
std::unique_ptr<rocksdb::Iterator> it(storage_->GetDB()->NewIterator(read_options, cf_handle));
for (it->Seek(begin_key); it->Valid() && it->key().compare(end_key) < 0; it->Next()) {
auto s = dest_batch_->Delete(cf_handle, it->key());
if (!s.ok()) {
13 changes: 7 additions & 6 deletions src/storage/iterator.cc
@@ -25,12 +25,12 @@
#include "db_util.h"

namespace engine {
DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot)
DBIterator::DBIterator(engine::Context &ctx, Storage *storage, rocksdb::ReadOptions read_options, int slot)
: storage_(storage),
read_options_(std::move(read_options)),
slot_(slot),
metadata_cf_handle_(storage_->GetCFHandle(ColumnFamilyID::Metadata)) {
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(ctx, read_options_, metadata_cf_handle_));
}

void DBIterator::Next() {
@@ -100,7 +100,7 @@ void DBIterator::Seek(const std::string &target) {
nextUntilValid();
}

std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator(engine::Context &ctx) const {
if (!Valid()) {
return nullptr;
}
@@ -111,17 +111,18 @@ std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
}

auto prefix = InternalKey(Key(), "", metadata_.version, storage_->IsSlotIdEncoded()).Encode();
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, std::move(prefix));
return std::make_unique<SubKeyIterator>(ctx, storage_, read_options_, type, std::move(prefix));
}

SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
SubKeyIterator::SubKeyIterator(engine::Context &ctx, Storage *storage, rocksdb::ReadOptions read_options,
RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
cf_handle_ = storage_->GetCFHandle(ColumnFamilyID::Stream);
} else {
cf_handle_ = storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey);
}
iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, cf_handle_));
iter_ = util::UniqueIterator(storage_->NewIterator(ctx, read_options_, cf_handle_));
}

void SubKeyIterator::Next() {
7 changes: 4 additions & 3 deletions src/storage/iterator.h
@@ -28,7 +28,8 @@ namespace engine {

class SubKeyIterator {
public:
explicit SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix);
explicit SubKeyIterator(engine::Context &ctx, Storage *storage, rocksdb::ReadOptions read_options, RedisType type,
std::string prefix);
~SubKeyIterator() = default;
bool Valid() const;
void Seek();
@@ -52,7 +53,7 @@ class SubKeyIterator {

class DBIterator {
public:
explicit DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot = -1);
explicit DBIterator(engine::Context &ctx, Storage *storage, rocksdb::ReadOptions read_options, int slot = -1);
~DBIterator() = default;

bool Valid() const;
@@ -65,7 +66,7 @@ class DBIterator {
Slice Value() const;
RedisType Type() const;
void Reset();
std::unique_ptr<SubKeyIterator> GetSubKeyIterator() const;
std::unique_ptr<SubKeyIterator> GetSubKeyIterator(engine::Context &ctx) const;

private:
void nextUntilValid();
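
A rough sketch of the iteration pattern the new signatures imply, following SlotMigrator::sendSnapshotByRawKV and Database::Copy in this commit; the Process* callbacks and the `prefix` argument are placeholders, and the Key()/Value() accessors on SubKeyIterator are assumed to match its existing interface:

```cpp
// Placeholder callbacks; in real code these would write to a batch or a client.
void ProcessMetadata(rocksdb::Slice key, rocksdb::Slice value);
void ProcessSubkey(rocksdb::Slice key, rocksdb::Slice value);

// Walk metadata keys and, where present, their subkeys under one Context.
void ScanAllWithSubkeys(engine::Storage *storage, const std::string &prefix) {
  engine::Context ctx(storage);
  engine::DBIterator iter(ctx, storage, ctx.GetReadOptions());
  for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
    ProcessMetadata(iter.Key(), iter.Value());
    auto subkey_iter = iter.GetSubKeyIterator(ctx);  // nullptr for types without subkeys
    if (!subkey_iter) continue;
    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
      ProcessSubkey(subkey_iter->Key(), subkey_iter->Value());
    }
  }
}
```
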
13 changes: 6 additions & 7 deletions src/storage/rdb.cc
@@ -383,7 +383,7 @@ StatusOr<std::vector<MemberScore>> RDB::LoadZSetWithZipList() {
return zset;
}

Status RDB::Restore(const std::string &key, std::string_view payload, uint64_t ttl_ms) {
Status RDB::Restore(engine::Context &ctx, const std::string &key, std::string_view payload, uint64_t ttl_ms) {
rocksdb::Status db_status;

// Check the checksum of the payload
@@ -393,7 +393,7 @@ Status RDB::Restore(const std::string &key, std::string_view payload, uint64_t t

auto value = GET_OR_RET(loadRdbObject(type, key));

return saveRdbObject(type, key, value, ttl_ms); // NOLINT
return saveRdbObject(ctx, type, key, value, ttl_ms); // NOLINT
}

StatusOr<int> RDB::loadRdbType() {
@@ -454,9 +454,9 @@ StatusOr<RedisObjValue> RDB::loadRdbObject(int type, const std::string &key) {
return {Status::RedisParseErr, fmt::format("unsupported type: {}", type)};
}

Status RDB::saveRdbObject(int type, const std::string &key, const RedisObjValue &obj, uint64_t ttl_ms) {
Status RDB::saveRdbObject(engine::Context &ctx, int type, const std::string &key, const RedisObjValue &obj,
uint64_t ttl_ms) {
rocksdb::Status db_status;
engine::Context ctx(storage_);
if (type == RDBTypeString) {
const auto &value = std::get<std::string>(obj);
redis::String string_db(storage_, ns_);
@@ -552,7 +552,7 @@ bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
}

// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
Status RDB::LoadRdb(uint32_t db_index, bool overwrite_exist_key) {
Status RDB::LoadRdb(engine::Context &ctx, uint32_t db_index, bool overwrite_exist_key) {
char buf[1024] = {0};
GET_OR_RET(LogWhenError(stream_->Read(buf, 9)));
buf[9] = '\0';
@@ -643,7 +643,6 @@ Status RDB::LoadRdb(uint32_t db_index, bool overwrite_exist_key) {
continue;
}

engine::Context ctx(storage_);
if (!overwrite_exist_key) { // only load not exist key
redis::Database redis(storage_, ns_);
auto s = redis.KeyExist(ctx, key);
@@ -656,7 +655,7 @@ Status RDB::LoadRdb(uint32_t db_index, bool overwrite_exist_key) {
}
}

auto ret = saveRdbObject(type, key, value, expire_time_ms);
auto ret = saveRdbObject(ctx, type, key, value, expire_time_ms);
if (!ret.IsOK()) {
LOG(WARNING) << "save rdb object key " << key << " failed: " << ret.Msg();
} else {
7 changes: 4 additions & 3 deletions src/storage/rdb.h
@@ -78,7 +78,7 @@ class RDB {

Status VerifyPayloadChecksum(const std::string_view &payload);
StatusOr<int> LoadObjectType();
Status Restore(const std::string &key, std::string_view payload, uint64_t ttl_ms);
Status Restore(engine::Context &ctx, const std::string &key, std::string_view payload, uint64_t ttl_ms);

// String
StatusOr<std::string> LoadStringObject();
@@ -105,7 +105,7 @@ class RDB {
StatusOr<std::vector<std::string>> LoadListWithQuickList(int type);

// Load rdb
Status LoadRdb(uint32_t db_index, bool overwrite_exist_key = true);
Status LoadRdb(engine::Context &ctx, uint32_t db_index, bool overwrite_exist_key = true);

std::unique_ptr<RdbStream> &GetStream() { return stream_; }

@@ -143,7 +143,8 @@ class RDB {

StatusOr<int> loadRdbType();
StatusOr<RedisObjValue> loadRdbObject(int rdbtype, const std::string &key);
Status saveRdbObject(int type, const std::string &key, const RedisObjValue &obj, uint64_t ttl_ms);
Status saveRdbObject(engine::Context &ctx, int type, const std::string &key, const RedisObjValue &obj,
uint64_t ttl_ms);
StatusOr<uint32_t> loadExpiredTimeSeconds();
StatusOr<uint64_t> loadExpiredTimeMilliseconds(int rdb_version);

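
A hedged sketch of the updated restore flow, modeled on CommandRestore in cmd_server.cc above; the wrapper function and its parameters are hypothetical, while the constructors and Restore signature come from this diff:

```cpp
// The caller now owns the Context and threads it into Restore; RDB no longer
// creates a Context internally inside saveRdbObject.
Status RestorePayload(engine::Storage *storage, const std::string &ns, const std::string &key,
                      const std::string &payload, uint64_t ttl_ms) {
  engine::Context ctx(storage);
  auto stream_ptr = std::make_unique<RdbStringStream>(payload);
  RDB rdb(storage, ns, std::move(stream_ptr));
  return rdb.Restore(ctx, key, payload, ttl_ms);
}
```
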
19 changes: 9 additions & 10 deletions src/storage/redis_db.cc
@@ -180,8 +180,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k

std::vector<rocksdb::Status> statuses(slice_keys.size());
std::vector<rocksdb::PinnableSlice> pin_values(slice_keys.size());
storage_->MultiGet(ctx.GetReadOptions(), metadata_cf_handle_, slice_keys.size(), slice_keys.data(), pin_values.data(),
statuses.data());
storage_->MultiGet(ctx, ctx.GetReadOptions(), metadata_cf_handle_, slice_keys.size(), slice_keys.data(),
pin_values.data(), statuses.data());

for (size_t i = 0; i < slice_keys.size(); i++) {
if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i];
@@ -259,7 +259,7 @@ rocksdb::Status Database::Keys(engine::Context &ctx, const std::string &prefix,
}

uint64_t ttl_sum = 0;
auto iter = util::UniqueIterator(storage_, ctx.GetReadOptions(), metadata_cf_handle_);
auto iter = util::UniqueIterator(ctx, storage_, ctx.GetReadOptions(), metadata_cf_handle_);

while (true) {
ns_prefix.empty() ? iter->SeekToFirst() : iter->Seek(ns_prefix);
@@ -313,7 +313,7 @@ rocksdb::Status Database::Scan(engine::Context &ctx, const std::string &cursor,
std::string ns_prefix;
std::string user_key;

auto iter = util::UniqueIterator(storage_, ctx.GetReadOptions(), metadata_cf_handle_);
auto iter = util::UniqueIterator(ctx, storage_, ctx.GetReadOptions(), metadata_cf_handle_);

std::string ns_cursor = AppendNamespacePrefix(cursor);
if (storage_->IsSlotIdEncoded()) {
@@ -430,7 +430,7 @@ rocksdb::Status Database::FlushDB(engine::Context &ctx) {
}

rocksdb::Status Database::FlushAll(engine::Context &ctx) {
auto iter = util::UniqueIterator(storage_, ctx.GetReadOptions(), metadata_cf_handle_);
auto iter = util::UniqueIterator(ctx, storage_, ctx.GetReadOptions(), metadata_cf_handle_);
iter->SeekToFirst();
if (!iter->Valid()) {
return rocksdb::Status::OK();
@@ -516,7 +516,7 @@ rocksdb::Status Database::FindKeyRangeWithPrefix(engine::Context &ctx, const std
begin->clear();
end->clear();

auto iter = util::UniqueIterator(storage_, ctx.GetReadOptions(), cf_handle);
auto iter = util::UniqueIterator(ctx, storage_, ctx.GetReadOptions(), cf_handle);
iter->Seek(prefix);
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
return rocksdb::Status::NotFound();
@@ -582,12 +582,11 @@ rocksdb::Status SubKeyScanner::Scan(engine::Context &ctx, RedisType type, const
uint64_t cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(type, false);
LatestSnapShot ss(storage_);
rocksdb::Status s = GetMetadata(ctx, {type}, ns_key, &metadata);
if (!s.ok()) return s;

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
auto iter = util::UniqueIterator(storage_, read_options);
auto iter = util::UniqueIterator(ctx, storage_, read_options);
std::string match_prefix_key =
InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode();

@@ -708,7 +707,7 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con
WriteBatchLogData log_data(type);
batch->PutLogData(log_data.Encode());

engine::DBIterator iter(storage_, ctx.GetReadOptions());
engine::DBIterator iter(ctx, storage_, ctx.GetReadOptions());
iter.Seek(key);

if (delete_old) {
Expand All @@ -717,7 +716,7 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con
// copy metadata
batch->Put(metadata_cf_handle_, new_key, iter.Value());

auto subkey_iter = iter.GetSubKeyIterator();
auto subkey_iter = iter.GetSubKeyIterator(ctx);

if (subkey_iter != nullptr) {
auto zset_score_cf = type == kRedisZSet ? storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey) : nullptr;
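
Finally, a minimal sketch of the batched MultiGet path that Database::MDel now uses, with the Context passed explicitly; the helper and its parameters are hypothetical stand-ins for the member fields of redis::Database:

```cpp
// `keys` holds already-encoded metadata keys; `metadata_cf_handle` stands in
// for the member handle obtained via GetCFHandle(ColumnFamilyID::Metadata).
rocksdb::Status BatchedMetadataLookup(engine::Context &ctx, engine::Storage *storage,
                                      rocksdb::ColumnFamilyHandle *metadata_cf_handle,
                                      const std::vector<rocksdb::Slice> &keys) {
  std::vector<rocksdb::Status> statuses(keys.size());
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  // One batched MultiGet: all keys are read against the snapshot carried by ctx.
  storage->MultiGet(ctx, ctx.GetReadOptions(), metadata_cf_handle, keys.size(), keys.data(),
                    values.data(), statuses.data());
  for (size_t i = 0; i < keys.size(); i++) {
    if (statuses[i].IsNotFound()) continue;     // absent keys are not an error here
    if (!statuses[i].ok()) return statuses[i];  // propagate real failures
    // values[i] pins the metadata payload for keys[i]
  }
  return rocksdb::Status::OK();
}
```
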