Skip to content

Commit

Permalink
fix: try sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
PokIsemaine committed May 24, 2024
1 parent 7b1c3b4 commit 99399ec
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ class CommandBPop : public BlockingCommander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
srv_ = srv;
InitConnection(conn);
std::cout << "Execute BPOP:" << std::endl;
std::cout << "Get new ctx for TryPopFromList" << std::endl;
engine::Context ctx(srv->storage);
auto s = TryPopFromList(ctx);
if (s.ok() || !s.IsNotFound()) {
Expand All @@ -287,6 +289,7 @@ class CommandBPop : public BlockingCommander {
}

rocksdb::Status TryPopFromList(engine::Context &ctx) {
std::cout << "TryPopFromList" << std::endl;
redis::List list_db(srv_->storage, conn_->GetNamespace());
std::string elem;
const std::string *last_key_ptr = nullptr;
Expand Down Expand Up @@ -314,6 +317,8 @@ class CommandBPop : public BlockingCommander {
}

bool OnBlockingWrite() override {
std::cout << "OnBlockingWrite" << std::endl;
std::cout << "Get new ctx for TryPopFromList" << std::endl;
engine::Context ctx(srv_->storage);
auto s = TryPopFromList(ctx);
return !s.IsNotFound();
Expand Down
6 changes: 6 additions & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ void Connection::Close() {
void Connection::Detach() { owner_->DetachConnection(this); }

void Connection::OnRead(struct bufferevent *bev) {
std::cout << "OnRead" << std::endl;
is_running_ = true;
MakeScopeExit([this] { is_running_ = false; });

Expand Down Expand Up @@ -394,6 +395,7 @@ void Connection::RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t du

Status Connection::ExecuteCommand(const std::string &cmd_name, const std::vector<std::string> &cmd_tokens,
Commander *current_cmd, std::string *reply) {
std::cout << "ExecuteCommand:" << cmd_name << std::endl;
srv_->stats.IncrCalls(cmd_name);

auto start = std::chrono::high_resolution_clock::now();
Expand All @@ -410,6 +412,8 @@ Status Connection::ExecuteCommand(const std::string &cmd_name, const std::vector
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
std::cout << "ExecuteCommands" << std::endl;

const Config *config = srv_->GetConfig();
std::string reply;
std::string password = config->requirepass;
Expand All @@ -432,6 +436,8 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

const auto &attributes = current_cmd->GetAttributes();
auto cmd_name = attributes->name;
std::cout << "cmd_name=" << cmd_name << std::endl;

auto cmd_flags = attributes->GenerateFlags(cmd_tokens);

if (GetNamespace().empty()) {
Expand Down
2 changes: 2 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ void Server::ListSChannelSubscribeNum(const std::vector<std::string> &channels,
}

void Server::BlockOnKey(const std::string &key, redis::Connection *conn) {
std::cout << "BlockOnKey:" << key << std::endl;
std::lock_guard<std::mutex> guard(blocking_keys_mu_);

auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD());
Expand All @@ -569,6 +570,7 @@ void Server::BlockOnKey(const std::string &key, redis::Connection *conn) {
}

void Server::UnblockOnKey(const std::string &key, redis::Connection *conn) {
std::cout << "UnblockOnKey:" << key << std::endl;
std::lock_guard<std::mutex> guard(blocking_keys_mu_);

auto iter = blocking_keys_.find(key);
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ rocksdb::Status List::PopMulti(engine::Context &ctx, const rocksdb::Slice &user_
PutFixed64(&buf, index);
std::string sub_key = InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string elem;
s = storage_->Get(ctx, rocksdb::ReadOptions(), sub_key, &elem);
// TODO: ctx?
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &elem);
if (!s.ok()) {
// FIXME: should be always exists??
return s;
Expand Down
16 changes: 15 additions & 1 deletion tests/gocase/unit/type/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,20 +371,33 @@ func TestList(t *testing.T) {
}

t.Run("BLPOP with same key multiple times should work (redis issue #801)", func(t *testing.T) {
// TODO: fix
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "list1", "list2").Err())
time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0"))
time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.LPush(ctx, "list1", "a").Err())
time.Sleep(time.Millisecond * 100)
rd.MustReadStrings(t, []string{"list1", "a"})
time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0"))
time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.LPush(ctx, "list2", "b").Err())
time.Sleep(time.Millisecond * 100)
rd.MustReadStrings(t, []string{"list2", "b"})
time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.LPush(ctx, "list1", "a").Err())
time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.LPush(ctx, "list2", "b").Err())
time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0"))
time.Sleep(time.Millisecond * 100)
rd.MustReadStrings(t, []string{"list1", "a"})
time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2", "list2", "list1", "0"))
time.Sleep(time.Millisecond * 100)
rd.MustReadStrings(t, []string{"list2", "b"})
})

Expand All @@ -406,7 +419,8 @@ func TestList(t *testing.T) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist1").Err())
require.NoError(t, rd.WriteArgs(popType, "blist1", "1"))
// TODO: fix
require.NoError(t, rd.WriteArgs(popType, "blist1", "0"))
require.NoError(t, rdb.RPush(ctx, "blist1", "foo").Err())
rd.MustReadStrings(t, []string{"blist1", "foo"})
require.EqualValues(t, 0, rdb.Exists(ctx, "blist1").Val())
Expand Down
6 changes: 6 additions & 0 deletions tests/gocase/unit/type/zset/zset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,14 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES
})

t.Run(fmt.Sprintf("BZPOPMIN basics - %s", encoding), func(t *testing.T) {
// TODO: fix
rdb.Del(ctx, "zseta")
rdb.Del(ctx, "zsetb")
rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}, redis.Z{Score: 3, Member: "c"})
rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"})
require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
time.Sleep(time.Millisecond * 100)
resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz)
resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
Expand All @@ -348,17 +350,20 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))
time.Sleep(time.Millisecond * 100)
rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"})
rd.MustReadStrings(t, []string{"zseta", "a", "1"})
})

t.Run(fmt.Sprintf("BZPOPMAX basics - %s", encoding), func(t *testing.T) {
// TODO: fix
rdb.Del(ctx, "zseta")
rdb.Del(ctx, "zsetb")
rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}, redis.Z{Score: 3, Member: "c"})
rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"})
require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
time.Sleep(time.Millisecond * 100)
resultz := rdb.BZPopMax(ctx, 0, "zseta", "zsetb").Val().Z
require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz)
resultz = rdb.BZPopMax(ctx, 0, "zseta", "zsetb").Val().Z
Expand All @@ -375,6 +380,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, enabledRES
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rd.WriteArgs("bzpopmax", "zseta", "0"))
time.Sleep(time.Millisecond * 100)
rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"})
rd.MustReadStrings(t, []string{"zseta", "a", "1"})
})
Expand Down

0 comments on commit 99399ec

Please sign in to comment.