Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Improve consistency and isolation semantics by adding Context parameter to DB API #2332

Merged
merged 44 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e494580
refactor: Add Context for DB API [draft]
PokIsemaine May 21, 2024
803faca
fix: apache license
PokIsemaine May 21, 2024
98725b7
fix: SIGSEGV
PokIsemaine May 21, 2024
8ae70f3
fix: Remove some LatestSnapshot;Add batch for Iterator, MultGet
PokIsemaine May 22, 2024
d8a71c7
fix: remove temporal cout, optimize context in test
PokIsemaine May 23, 2024
7b1c3b4
fix: try to debug
PokIsemaine May 24, 2024
99399ec
fix: try sleep
PokIsemaine May 24, 2024
f610557
fix: remove temporal cout
PokIsemaine May 24, 2024
c263c75
fix: try sleep
PokIsemaine May 24, 2024
dcb37c1
Merge remote-tracking branch 'upstream/unstable' into unstable
PokIsemaine Jul 8, 2024
1eac02d
style: remove meaningless blank lines
PokIsemaine Jul 8, 2024
452a631
Merge commit 'beb1979f14c5af2ca3b4f7a2f42bd0ce857ab85a' into unstable
PokIsemaine Jul 15, 2024
bd67a66
fix: merge conflict
PokIsemaine Jul 15, 2024
56508c0
fix: review1
PokIsemaine Jul 16, 2024
068f0d9
Update src/storage/storage.h
PokIsemaine Jul 21, 2024
28f6de3
Merge commit 'e57192517b76515692a45726df7a0e071a7e6542' into unstable
PokIsemaine Jul 30, 2024
482d68d
refactor: change for review1
PokIsemaine Aug 3, 2024
b3d4dca
test: commit for debug, don't review
PokIsemaine Aug 3, 2024
07a15b7
fix: use no_txn_ctx
PokIsemaine Aug 3, 2024
10042b6
style: golang-lint
PokIsemaine Aug 3, 2024
1f98cdb
Merge branch 'unstable' into unstable
PokIsemaine Aug 3, 2024
d0b501d
Merge branch 'unstable' into unstable
PokIsemaine Aug 4, 2024
6999b1c
refactor( hyperloglog): remove LatestSnapshot and GetOptions
PokIsemaine Aug 4, 2024
9ce5093
fix: try to fix data race
PokIsemaine Aug 4, 2024
a0781c7
style: nolint comment
PokIsemaine Aug 4, 2024
5f3d186
fix: try to fix data race
PokIsemaine Aug 4, 2024
fc121f1
refactor: GetSubKeyIterator
PokIsemaine Aug 5, 2024
c2425fa
fix: try to fix test
PokIsemaine Aug 5, 2024
9c6bdf6
ci: remove go test -v
PokIsemaine Aug 5, 2024
34337e6
fix: speedb testcase
PokIsemaine Aug 5, 2024
822712d
fix: compatible with speedb
PokIsemaine Aug 6, 2024
2e19608
Merge commit 'bf56a05f93f6406f67db3bed54114a3e64ffe351' into unstable
PokIsemaine Aug 6, 2024
1e0bd2a
Merge commit '76cb42d6d0c505494c65699fcbb84d3a33dbf39f' into unstable
PokIsemaine Aug 6, 2024
38dfc04
test: clear print
PokIsemaine Aug 6, 2024
197cb41
fix: temporary fix #2473
PokIsemaine Aug 7, 2024
524562e
Merge commit '275ab32054faa72c59c2b40b4eff69a8011a6bbc' into unstable
PokIsemaine Aug 9, 2024
94db62f
feat: merge unstable
PokIsemaine Aug 9, 2024
7c7f402
Merge commit '3408318934cb68c0c1f2ffa937f19f02346d0672' into unstable
PokIsemaine Aug 11, 2024
553013a
chore: review
PokIsemaine Aug 13, 2024
2392bb2
chore: review
PokIsemaine Aug 14, 2024
52dbbcc
Merge commit 'a86d31731774d165f6c3669c508eac5f2774d8de' into unstable
PokIsemaine Aug 14, 2024
7f8b61b
Merge branch 'unstable' into unstable
PokIsemaine Aug 19, 2024
c58a73b
Merge branch 'unstable' into unstable
PokIsemaine Aug 21, 2024
7982301
Merge branch 'unstable' into unstable
mapleFU Aug 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
// 3. Update the map of slots to nodes.
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlotRange().
engine::Context ctx(srv_->storage);
for (auto [s_start, s_end] : slot_ranges) {
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
Expand All @@ -129,7 +130,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, SlotRange::GetPoint(slot));
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slot: " << s.ToString();
}
Expand Down Expand Up @@ -212,9 +213,10 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b

// Clear data of migrated slots
if (!migrated_slots_.empty()) {
engine::Context ctx(srv_->storage);
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, SlotRange::GetPoint(slot));
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
}
Expand Down
9 changes: 6 additions & 3 deletions src/cluster/slot_import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ Status SlotImport::Start(const SlotRange &slot_range) {
}

// Clean slot data first
auto s = ClearKeysOfSlotRange(namespace_, slot_range);
engine::Context ctx(srv_->storage);
auto s = ClearKeysOfSlotRange(ctx, namespace_, slot_range);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}", s.ToString())};
}
Expand Down Expand Up @@ -74,7 +75,8 @@ Status SlotImport::Fail(const SlotRange &slot_range) {
}

// Clean imported slot data
auto s = ClearKeysOfSlotRange(namespace_, slot_range);
engine::Context ctx(srv_->storage);
auto s = ClearKeysOfSlotRange(ctx, namespace_, slot_range);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}", s.ToString())};
}
Expand All @@ -99,7 +101,8 @@ Status SlotImport::StopForLinkError() {
// from new master.
if (!srv_->IsSlave()) {
// Clean imported slot data
auto s = ClearKeysOfSlotRange(namespace_, import_slot_range_);
engine::Context ctx(srv_->storage);
auto s = ClearKeysOfSlotRange(ctx, namespace_, import_slot_range_);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot error: {}", s.ToString())};
}
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,8 @@ Status SlotMigrator::sendSnapshotByRawKV() {
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
engine::DBIterator iter(storage_, read_options);
auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
engine::DBIterator iter(no_txn_ctx, read_options);

BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);

Expand Down
20 changes: 13 additions & 7 deletions src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class CommandGetBit : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
bool bit = false;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.GetBit(args_[1], offset_, &bit);
engine::Context ctx(srv->storage);
auto s = bitmap_db.GetBit(ctx, args_[1], offset_, &bit);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(bit ? 1 : 0);
Expand Down Expand Up @@ -80,7 +81,8 @@ class CommandSetBit : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
bool old_bit = false;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.SetBit(args_[1], offset_, bit_, &old_bit);
engine::Context ctx(srv->storage);
auto s = bitmap_db.SetBit(ctx, args_[1], offset_, bit_, &old_bit);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(old_bit ? 1 : 0);
Expand Down Expand Up @@ -135,7 +137,8 @@ class CommandBitCount : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
uint32_t cnt = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitCount(args_[1], start_, stop_, is_bit_index_, &cnt);
engine::Context ctx(srv->storage);
auto s = bitmap_db.BitCount(ctx, args_[1], start_, stop_, is_bit_index_, &cnt);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(cnt);
Expand Down Expand Up @@ -194,7 +197,8 @@ class CommandBitPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
engine::Context ctx(srv->storage);
auto s = bitmap_db.BitPos(ctx, args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(pos);
Expand Down Expand Up @@ -239,7 +243,8 @@ class CommandBitOp : public Commander {

int64_t dest_key_len = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitOp(op_flag_, args_[1], args_[2], op_keys, &dest_key_len);
engine::Context ctx(srv->storage);
auto s = bitmap_db.BitOp(ctx, op_flag_, args_[1], args_[2], op_keys, &dest_key_len);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(dest_key_len);
Expand Down Expand Up @@ -336,10 +341,11 @@ class CommandBitfield : public Commander {
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
std::vector<std::optional<BitfieldValue>> rets;
rocksdb::Status s;
engine::Context ctx(srv->storage);
if (read_only_) {
s = bitmap_db.BitfieldReadOnly(args_[1], cmds_, &rets);
s = bitmap_db.BitfieldReadOnly(ctx, args_[1], cmds_, &rets);
} else {
s = bitmap_db.Bitfield(args_[1], cmds_, &rets);
s = bitmap_db.Bitfield(ctx, args_[1], cmds_, &rets);
}
std::vector<std::string> str_rets(rets.size());
for (size_t i = 0; i != rets.size(); ++i) {
Expand Down
24 changes: 16 additions & 8 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class CommandBFReserve : public Commander {

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloomfilter_db(srv->storage, conn->GetNamespace());
auto s = bloomfilter_db.Reserve(args_[1], capacity_, error_rate_, expansion_);
engine::Context ctx(srv->storage);
auto s = bloomfilter_db.Reserve(ctx, args_[1], capacity_, error_rate_, expansion_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::SimpleString("OK");
Expand All @@ -108,7 +109,8 @@ class CommandBFAdd : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
BloomFilterAddResult ret = BloomFilterAddResult::kOk;
auto s = bloom_db.Add(args_[1], args_[2], &ret);
engine::Context ctx(srv->storage);
auto s = bloom_db.Add(ctx, args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

switch (ret) {
Expand Down Expand Up @@ -139,7 +141,8 @@ class CommandBFMAdd : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.MAdd(args_[1], items_, &rets);
engine::Context ctx(srv->storage);
auto s = bloom_db.MAdd(ctx, args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
Expand Down Expand Up @@ -234,7 +237,8 @@ class CommandBFInsert : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.InsertCommon(args_[1], items_, insert_options_, &rets);
engine::Context ctx(srv->storage);
auto s = bloom_db.InsertCommon(ctx, args_[1], items_, insert_options_, &rets);
if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -265,7 +269,8 @@ class CommandBFExists : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
bool exist = false;
auto s = bloom_db.Exists(args_[1], args_[2], &exist);
engine::Context ctx(srv->storage);
auto s = bloom_db.Exists(ctx, args_[1], args_[2], &exist);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(exist ? 1 : 0);
Expand All @@ -286,7 +291,8 @@ class CommandBFMExists : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
std::vector<bool> exists(items_.size(), false);
auto s = bloom_db.MExists(args_[1], items_, &exists);
engine::Context ctx(srv->storage);
auto s = bloom_db.MExists(ctx, args_[1], items_, &exists);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
Expand Down Expand Up @@ -329,7 +335,8 @@ class CommandBFInfo : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
BloomFilterInfo info;
auto s = bloom_db.Info(args_[1], &info);
engine::Context ctx(srv->storage);
auto s = bloom_db.Info(ctx, args_[1], &info);
if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -376,7 +383,8 @@ class CommandBFCard : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
BloomFilterInfo info;
auto s = bloom_db.Info(args_[1], &info);
engine::Context ctx(srv->storage);
auto s = bloom_db.Info(ctx, args_[1], &info);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()};
if (s.IsNotFound()) {
*output = redis::Integer(0);
Expand Down
5 changes: 3 additions & 2 deletions src/commands/cmd_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "commands/command_parser.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "storage/scripting.h"
#include "string_util.h"

Expand All @@ -30,6 +31,7 @@ namespace redis {
struct CommandFunction : Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
CommandParser parser(args_, 1);
engine::Context ctx(srv->storage);
if (parser.EatEqICase("load")) {
bool replace = false;
if (parser.EatEqICase("replace")) {
Expand Down Expand Up @@ -70,8 +72,7 @@ struct CommandFunction : Commander {
if (!lua::FunctionIsLibExist(conn, libname)) {
return {Status::NotOK, "no such library"};
}

auto s = lua::FunctionDelete(srv, libname);
auto s = lua::FunctionDelete(ctx, srv, libname);
if (!s) return s;

*output = SimpleString("OK");
Expand Down
26 changes: 17 additions & 9 deletions src/commands/cmd_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class CommandGeoAdd : public CommandGeoBase {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
uint64_t ret = 0;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Add(args_[1], &geo_points_, &ret);
engine::Context ctx(srv->storage);
auto s = geo_db.Add(ctx, args_[1], &geo_points_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -142,7 +143,8 @@ class CommandGeoDist : public CommandGeoBase {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
double distance = 0;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Dist(args_[1], args_[2], args_[3], &distance);
engine::Context ctx(srv->storage);
auto s = geo_db.Dist(ctx, args_[1], args_[2], args_[3], &distance);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand All @@ -168,7 +170,8 @@ class CommandGeoHash : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<std::string> hashes;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Hash(args_[1], members_, &hashes);
engine::Context ctx(srv->storage);
auto s = geo_db.Hash(ctx, args_[1], members_, &hashes);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -197,7 +200,8 @@ class CommandGeoPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::map<std::string, GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Pos(args_[1], members_, &geo_points);
engine::Context ctx(srv->storage);
auto s = geo_db.Pos(ctx, args_[1], members_, &geo_points);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -304,7 +308,8 @@ class CommandGeoRadius : public CommandGeoBase {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Radius(args_[1], longitude_, latitude_, GetRadiusMeters(radius_), count_, sort_, store_key_,
engine::Context ctx(srv->storage);
auto s = geo_db.Radius(ctx, args_[1], longitude_, latitude_, GetRadiusMeters(radius_), count_, sort_, store_key_,
store_distance_, GetUnitConversion(), &geo_points);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand Down Expand Up @@ -452,8 +457,9 @@ class CommandGeoSearch : public CommandGeoBase {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());

auto s = geo_db.Search(args_[1], geo_shape_, origin_point_type_, member_, count_, sort_, false, GetUnitConversion(),
&geo_points);
engine::Context ctx(srv->storage);
auto s = geo_db.Search(ctx, args_[1], geo_shape_, origin_point_type_, member_, count_, sort_, false,
GetUnitConversion(), &geo_points);

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand Down Expand Up @@ -614,7 +620,8 @@ class CommandGeoSearchStore : public CommandGeoSearch {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());

auto s = geo_db.SearchStore(args_[2], geo_shape_, origin_point_type_, member_, count_, sort_, store_key_,
engine::Context ctx(srv->storage);
auto s = geo_db.SearchStore(ctx, args_[2], geo_shape_, origin_point_type_, member_, count_, sort_, store_key_,
store_distance_, GetUnitConversion(), &geo_points);

if (!s.ok()) {
Expand Down Expand Up @@ -654,7 +661,8 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.RadiusByMember(args_[1], args_[2], GetRadiusMeters(radius_), count_, sort_, store_key_,
engine::Context ctx(srv->storage);
auto s = geo_db.RadiusByMember(ctx, args_[1], args_[2], GetRadiusMeters(radius_), count_, sort_, store_key_,
store_distance_, GetUnitConversion(), &geo_points);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand Down
Loading
Loading