Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CLUSTER SLOT-STATS command (#20). #351

Merged
merged 11 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
madolson marked this conversation as resolved.
Show resolved Hide resolved
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
2 changes: 2 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot);
int getSlotOrReply(client *c, robj *o);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand Down
182 changes: 182 additions & 0 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
INVALID,
} slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */

/* Struct used to temporarily hold slot statistics for sorting. */
typedef struct {
int slot;
uint64_t stat;
} slotStatForSort;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);

return clusterNodeCoversSlot(primary, slot);
}

static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
int assigned_slots_count = 0;
for (int slot = start_slot; slot <= end_slot; slot++) {
if (doesSlotBelongToMyShard(slot)) {
assigned_slots[slot]++;
assigned_slots_count++;
}
}
return assigned_slots_count;
}

static uint64_t getSlotStat(int slot, int stat_type) {
serverAssert(stat_type != INVALID);
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
}
return slot_stat;
}

static int slotStatForSortAscCmp(const void *a, const void *b) {
slotStatForSort entry_a = *((slotStatForSort *)a);
slotStatForSort entry_b = *((slotStatForSort *)b);
return entry_a.stat - entry_b.stat;
}

static int slotStatForSortDescCmp(const void *a, const void *b) {
slotStatForSort entry_a = *((slotStatForSort *)a);
slotStatForSort entry_b = *((slotStatForSort *)b);
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (doesSlotBelongToMyShard(slot)) {
slot_stats[i].slot = slot;
slot_stats[i].stat = getSlotStat(slot, order_by);
i++;
}
}
qsort(slot_stats, i, sizeof(slotStatForSort), (desc) ? slotStatForSortDescCmp : slotStatForSortAscCmp);
}

static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
}

/* Adds reply for the SLOTSRANGE variant.
* Response is ordered in ascending slot number. */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int startslot, int endslot, int len) {
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

for (int slot = startslot; slot <= endslot; slot++) {
if (assigned_slots[slot]) addReplySlotStat(c, slot);
}
}

static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
int num_slots_assigned = getMyShardSlotCount();
int len = min(limit, num_slots_assigned);
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

for (int i = 0; i < len; i++) {
addReplySlotStat(c, slot_stats[i].slot);
}
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
return;
}

/* Parse additional arguments. */
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR ||
(endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) {
return;
}
if (startslot > endslot) {
addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", startslot, endslot);
return;
}
/* Initialize slot assignment array. */
unsigned char assigned_slots[CLUSTER_SLOTS] = {UNASSIGNED_SLOT};
int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, startslot, endslot);
addReplySlotsRange(c, assigned_slots, startslot, endslot, assigned_slots_count);
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
order_by = KEY_COUNT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
int limit_counter = 0, asc_desc_counter = 0;
long limit;
while (i < c->argc) {
int moreargs = c->argc > i + 1;
if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
if (getRangeLongFromObjectOrReply(
c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit,
"Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
return;
}
i++;
limit_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
desc = 0;
asc_desc_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
desc = 1;
asc_desc_counter++;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
if (limit_counter > 1 || asc_desc_counter > 1) {
addReplyError(c, "Multiple filters of the same type are disallowed.");
return;
}
i++;
}
addReplyOrderBy(c, order_by, limit, desc);

} else {
addReplySubcommandSyntaxError(c);
}
}
51 changes: 51 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,56 @@ struct COMMAND_ARG CLUSTER_SLAVES_Args[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/********** CLUSTER SLOT_STATS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SLOT_STATS history */
#define CLUSTER_SLOT_STATS_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER SLOT_STATS tips */
const char *CLUSTER_SLOT_STATS_Tips[] = {
"nondeterministic_output",
"request_policy:all_shards",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER SLOT_STATS key specs */
#define CLUSTER_SLOT_STATS_Keyspecs NULL
#endif

/* CLUSTER SLOT_STATS filter slotsrange argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_slotsrange_Subargs[] = {
{MAKE_ARG("start-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("end-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby order argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_order_Subargs[] = {
{MAKE_ARG("asc",ARG_TYPE_PURE_TOKEN,-1,"ASC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("desc",ARG_TYPE_PURE_TOKEN,-1,"DESC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_Subargs[] = {
{MAKE_ARG("metric",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("limit",ARG_TYPE_INTEGER,-1,"LIMIT",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_order_Subargs},
};

/* CLUSTER SLOT_STATS filter argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_Subargs[] = {
{MAKE_ARG("slotsrange",ARG_TYPE_BLOCK,-1,"SLOTSRANGE",NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_slotsrange_Subargs},
{MAKE_ARG("orderby",ARG_TYPE_BLOCK,-1,"ORDERBY",NULL,NULL,CMD_ARG_NONE,3,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_Subargs},
};

/* CLUSTER SLOT_STATS argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_Args[] = {
{MAKE_ARG("filter",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_Subargs},
};

/********** CLUSTER SLOTS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -981,6 +1031,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0}
};
Expand Down
102 changes: 102 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{
"SLOT-STATS": {
"summary": "Return an array of slot usage statistics for slots assigned to the current node.",
"complexity": "O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.",
"group": "cluster",
"since": "8.0.0",
"arity": -4,
"container": "CLUSTER",
"function": "clusterSlotStatsCommand",
"command_flags": [
"STALE",
"LOADING"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT",
"REQUEST_POLICY:ALL_SHARDS"
],
"reply_schema": {
"type": "array",
"description": "Array of nested arrays, where the inner array element represents a slot and its respective usage statistics.",
"items": {
"type": "array",
"description": "Array of size 2, where 0th index represents (int) slot and 1st index represents (map) usage statistics.",
"minItems": 2,
"maxItems": 2,
"items": [
{
"description": "Slot Number.",
"type": "integer"
},
{
"type": "object",
"description": "Map of slot usage statistics.",
"additionalProperties": false,
"properties": {
"key-count": {
"type": "integer"
}
}
}
]
}
},
"arguments": [
{
"name": "filter",
"type": "oneof",
"arguments": [
{
"token": "SLOTSRANGE",
"name": "slotsrange",
"type": "block",
"arguments": [
{
"name": "start-slot",
"type": "integer"
},
{
"name": "end-slot",
"type": "integer"
}
]
},
{
"token": "ORDERBY",
"name": "orderby",
"type": "block",
"arguments": [
{
"name": "metric",
"type": "string"
},
{
"token": "LIMIT",
"name": "limit",
"type": "integer",
"optional": true
},
{
"name": "order",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "asc",
"type": "pure-token",
"token": "ASC"
},
{
"name": "desc",
"type": "pure-token",
"token": "DESC"
}
]
}
]
}
]
}
]
}
}
madolson marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3746,6 +3746,7 @@ void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);
void clusterSlotStatsCommand(client *c);
void restoreCommand(client *c);
void migrateCommand(client *c);
void askingCommand(client *c);
Expand Down
Loading
Loading