Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CLIENT PAUSE and CLIENT REPLY subcommands #2495

Open
wants to merge 14 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,53 @@ class CommandClient : public Commander {
}
return Status::OK();
}
return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL ip:port|GETNAME|SETNAME"};

if ((subcommand_ == "pause")) {
if (args.size() != 3 && args.size() != 4) {
return {Status::RedisParseErr, errInvalidSyntax};
}

pause_timeout_ms_ = atoi(args[2].c_str());

if (args.size() == 3) {
pause_type_ = kPauseAll;
} else {
if (!strcasecmp(args[3].c_str(), "all")) {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
pause_type_ = kPauseAll;
} else if (!strcasecmp(args[3].c_str(), "write")) {
pause_type_ = kPauseWrite;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}

return Status::OK();
}

if (subcommand_ == "reply") {
if (args.size() != 2 && args.size() != 3) {
return {Status::RedisParseErr, errInvalidSyntax};
}

if (args.size() == 2) {
reply_type_ = 0;
} else {
if (!strcasecmp(args[2].c_str(), "on")) {
reply_type_ = 0;
} else if (!strcasecmp(args[2].c_str(), "off")) {
reply_type_ = Connection::Flag::kReplyModeOff;
} else if (!strcasecmp(args[2].c_str(), "skip")) {
reply_type_ = Connection::Flag::kReplyModeSkipNext;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}

return Status::OK();
}

return {Status::RedisInvalidCmd,
"Syntax error, try CLIENT LIST|INFO|KILL|PAUSE|REPLY ip:port|GETNAME|SETNAME|timeout"};
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -510,6 +556,17 @@ class CommandClient : public Commander {
*output = redis::SimpleString("OK");
}
return Status::OK();
} else if (subcommand_ == "pause") {
srv->PauseCommands(pause_type_, pause_timeout_ms_);
return Status::OK();
} else if (subcommand_ == "reply") {
conn->DisableFlag(redis::Connection::Flag::kReplyModeOff);
conn->DisableFlag(redis::Connection::Flag::kReplyModeSkip);
conn->DisableFlag(redis::Connection::Flag::kReplyModeSkipNext);
if (reply_type_ != 0) {
conn->EnableFlag((redis::Connection::Flag)reply_type_);
}
return Status::OK();
}

return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL ip:port|GETNAME|SETNAME"};
Expand All @@ -521,6 +578,9 @@ class CommandClient : public Commander {
std::string subcommand_;
bool skipme_ = false;
int64_t kill_type_ = 0;
int64_t pause_type_ = 0;
int64_t pause_timeout_ms_ = 0;
int64_t reply_type_ = 0;
uint64_t id_ = 0;
bool new_format_ = true;
};
Expand Down
28 changes: 28 additions & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ void Connection::OnEvent(bufferevent *bev, int16_t events) {
}

void Connection::Reply(const std::string &msg) {
// Do not send replies for both SKIP and OFF modes
if (IsFlagEnabled(Flag::kReplyModeOff) || IsFlagEnabled(Flag::kReplyModeSkip)) {
return;
}

// Skip starting from the next reply for SKIP mode
if (IsFlagEnabled(Flag::kReplyModeSkipNext)) {
DisableFlag(Flag::kReplyModeSkipNext);
EnableFlag(Flag::kReplyModeSkip);
}

owner_->srv->stats.IncrOutboundBytes(msg.size());
redis::Reply(bufferevent_get_output(bev_), msg);
}
Expand Down Expand Up @@ -368,10 +379,16 @@ static bool IsCmdForIndexing(const CommandAttributes *attr) {
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
if (srv_->GetCommandPauseType() == kPauseAll) {
return;
}

const Config *config = srv_->GetConfig();
std::string reply;
std::string password = config->requirepass;

std::vector<CommandTokens> commands_to_push_back;

while (!to_process_cmds->empty()) {
CommandTokens cmd_tokens = std::move(to_process_cmds->front());
to_process_cmds->pop_front();
Expand All @@ -396,6 +413,13 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
auto cmd_name = attributes->name;
auto cmd_flags = attributes->GenerateFlags(cmd_tokens);

// Pause the processing of only the write commands, and push them back
// to a list that adds them back to the queue to process them when we unpause
if (srv_->GetCommandPauseType() == kPauseWrite && (cmd_flags & kCmdWrite)) {
furkan-bilgin marked this conversation as resolved.
Show resolved Hide resolved
commands_to_push_back.push_back(cmd_tokens);
continue;
}

if (GetNamespace().empty()) {
if (!password.empty()) {
if (cmd_name != "auth" && cmd_name != "hello") {
Expand Down Expand Up @@ -549,6 +573,10 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
if (!reply.empty()) Reply(reply);
reply.clear();
}

for (const auto &cmd_tokens : commands_to_push_back) {
to_process_cmds->emplace_back(cmd_tokens);
}
}

void Connection::ResetMultiExec() {
Expand Down
4 changes: 4 additions & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class Connection : public EvbufCallbackBase<Connection> {
kMultiExec = 1 << 8,
kReadOnly = 1 << 9,
kAsking = 1 << 10,
kReplyModeOff = 1 << 11,
kReplyModeSkip = 1 << 12,
kReplyModeSkipNext = 1 << 13,
};

explicit Connection(bufferevent *bev, Worker *owner);
Expand Down Expand Up @@ -195,6 +198,7 @@ class Connection : public EvbufCallbackBase<Connection> {
std::string last_cmd_;
int64_t create_time_;
int64_t last_interaction_;
int64_t reply_mode_ = kReplyModeOn;

bufferevent *bev_;
Request req_;
Expand Down
14 changes: 14 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,15 @@ void Server::GetInfo(const std::string &ns, const std::string &section, std::str
*info = string_stream.str();
}

int64_t Server::GetCommandPauseType() {
// Stop pausing command if the timeout has reached
if (pause_end_timestamp_ms_ <= util::GetTimeStampMS()) {
pause_type_ = kPauseNone;
}

return pause_type_;
}

std::string Server::GetRocksDBStatsJson() const {
jsoncons::json stats_json;

Expand Down Expand Up @@ -1787,6 +1796,11 @@ Status Server::ExecPropagatedCommand(const std::vector<std::string> &tokens) {
return Status::OK();
}

void Server::PauseCommands(uint64_t type, uint64_t timeout_ms) {
pause_type_ = type;
pause_end_timestamp_ms_ = util::GetTimeStampMS() + timeout_ms;
}

// AdjustOpenFilesLimit only try best to raise the max open files according to
// the max clients and RocksDB open file configuration. It also reserves a number
// of file descriptors(128) for extra operations of persistence, listening sockets,
Expand Down
10 changes: 10 additions & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ enum ClientType {
kTypeSlave = (1ULL << 3), // slave client
};

enum ClientCommandPauseType {
kPauseNone = (1ULL << 0), // pause no commands
kPauseWrite = (1ULL << 1), // pause write commands
kPauseAll = (1ULL << 2) // pause all commands
};

enum ServerLogType { kServerLogNone, kReplIdLog };

enum class AuthResult {
Expand Down Expand Up @@ -242,6 +248,7 @@ class Server {
void GetCommandsStatsInfo(std::string *info);
void GetClusterInfo(std::string *info);
void GetInfo(const std::string &ns, const std::string &section, std::string *info);
int64_t GetCommandPauseType();
std::string GetRocksDBStatsJson() const;
ReplState GetReplicationState();

Expand Down Expand Up @@ -284,6 +291,7 @@ class Server {
Status Propagate(const std::string &channel, const std::vector<std::string> &tokens) const;
Status ExecPropagatedCommand(const std::vector<std::string> &tokens);
Status ExecPropagateScriptCommand(const std::vector<std::string> &tokens);
void PauseCommands(uint64_t type, uint64_t timeout_ms);

void SetCurrentConnection(redis::Connection *conn) { curr_connection_ = conn; }
redis::Connection *GetCurrentConnection() { return curr_connection_; }
Expand Down Expand Up @@ -340,6 +348,8 @@ class Server {
Config *config_ = nullptr;
std::string last_random_key_cursor_;
std::mutex last_random_key_cursor_mu_;
std::int64_t pause_type_ = kPauseNone;
std::int64_t pause_end_timestamp_ms_ = 0;

std::atomic<lua_State *> lua_;

Expand Down
Loading