diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index dd95cc6bb7..d530b953a5 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -35,6 +35,7 @@ #include "server.h" #include "cluster.h" #include "cluster_legacy.h" +#include "cluster_slot_stats.h" #include "endianconv.h" #include "connection.h" @@ -1042,6 +1043,7 @@ void clusterInit(void) { clusterUpdateMyselfIp(); clusterUpdateMyselfHostname(); clusterUpdateMyselfHumanNodename(); + clusterSlotStatsReset(); } void clusterInitLast(void) { @@ -4943,6 +4945,7 @@ int clusterAddSlot(clusterNode *n, int slot) { clusterNodeSetSlotBit(n, slot); server.cluster->slots[slot] = n; bitmapClearBit(server.cluster->owner_not_claiming_slot, slot); + clusterSlotStatReset(slot); return C_OK; } @@ -4961,6 +4964,7 @@ int clusterDelSlot(int slot) { server.cluster->slots[slot] = NULL; /* Make owner_not_claiming_slot flag consistent with slot ownership information. */ bitmapClearBit(server.cluster->owner_not_claiming_slot, slot); + clusterSlotStatReset(slot); return C_OK; } diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index a2a6bfdd01..580e125655 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -4,14 +4,14 @@ * SPDX-License-Identifier: BSD 3-Clause */ -#include "server.h" -#include "cluster.h" +#include "cluster_slot_stats.h" #define UNASSIGNED_SLOT 0 typedef enum { - KEY_COUNT, INVALID, + KEY_COUNT, + NETWORK_BYTES_IN, } slotStatTypes; /* ----------------------------------------------------------------------------- @@ -24,6 +24,14 @@ typedef struct { uint64_t stat; } slotStatForSort; +/* Struct used for storing slot statistics. */ +typedef struct slotStat { + uint64_t network_bytes_in; +} slotStat; + +/* Struct used for storing slot statistics, for all slots owned by the current shard. */ +struct slotStat cluster_slot_stats[CLUSTER_SLOTS]; + static int doesSlotBelongToMyShard(int slot) { clusterNode *myself = getMyClusterNode(); clusterNode *primary = clusterNodeGetPrimary(myself); @@ -47,6 +55,8 @@ static uint64_t getSlotStat(int slot, int stat_type) { uint64_t slot_stat = 0; if (stat_type == KEY_COUNT) { slot_stat = countKeysInSlot(slot); + } else if (stat_type == NETWORK_BYTES_IN) { + slot_stat = cluster_slot_stats[slot].network_bytes_in; } return slot_stat; } @@ -88,9 +98,11 @@ static void addReplySlotStat(client *c, int slot) { addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot, * and 1st index represents (map) usage statistics. */ addReplyLongLong(c, slot); - addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */ + addReplyMapLen(c, 2); /* Nested map representing slot usage statistics. */ addReplyBulkCString(c, "key-count"); addReplyLongLong(c, countKeysInSlot(slot)); + addReplyBulkCString(c, "network-bytes-in"); + addReplyLongLong(c, cluster_slot_stats[slot].network_bytes_in); } /* Adds reply for the SLOTSRANGE variant. @@ -121,6 +133,35 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) { addReplySortedSlotStats(c, slot_stats, limit); } +static int canAddNetworkBytes(client *c) { + /* First, cluster mode must be enabled. + * Second, command should target a specific slot. + * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. */ + return server.cluster_enabled && c->slot != -1 && !(c->flag.blocked); +} + +/* Resets applicable slot statistics. */ +void clusterSlotStatReset(int slot) { + /* key-count is exempt, as it is queried separately through countKeysInSlot(). */ + cluster_slot_stats[slot].network_bytes_in = 0; +} + +void clusterSlotStatsReset(void) { + memset(cluster_slot_stats, 0, sizeof(cluster_slot_stats)); +} + +/* Adds network ingress bytes of the current command in execution, + * calculated earlier within networking.c layer. + * + * Note: Below function should only be called once c->slot is parsed. + * Otherwise, the aggregation will be skipped due to canAddNetworkBytes() check failure. + * */ +void clusterSlotStatsAddNetworkBytesIn(client *c) { + if (!canAddNetworkBytes(c)) return; + + cluster_slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd; +} + void clusterSlotStatsCommand(client *c) { if (server.cluster_enabled == 0) { addReplyError(c, "This instance has cluster support disabled"); @@ -149,8 +190,11 @@ void clusterSlotStatsCommand(client *c) { int desc = 1, order_by = INVALID; if (!strcasecmp(c->argv[3]->ptr, "key-count")) { order_by = KEY_COUNT; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in")) { + order_by = NETWORK_BYTES_IN; } else { - addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count."); + addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported " + "metrics are: key-count and cpu-usec."); return; } int i = 4; /* Next argument index, following ORDERBY */ diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h new file mode 100644 index 0000000000..75017a3297 --- /dev/null +++ b/src/cluster_slot_stats.h @@ -0,0 +1,12 @@ +/* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + */ + +#include "server.h" +#include "cluster.h" + +void clusterSlotStatReset(int slot); +void clusterSlotStatsReset(void); +void clusterSlotStatsAddNetworkBytesIn(client *c); diff --git a/src/commands/cluster-slot-stats.json b/src/commands/cluster-slot-stats.json index 7dfcd415ec..f6e5062acb 100644 --- a/src/commands/cluster-slot-stats.json +++ b/src/commands/cluster-slot-stats.json @@ -35,6 +35,9 @@ "properties": { "key-count": { "type": "integer" + }, + "memory-bytes-in": { + "type": "integer" } } } diff --git a/src/networking.c b/src/networking.c index bb7bab02c3..78ee83c120 100644 --- a/src/networking.c +++ b/src/networking.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" @@ -215,6 +216,7 @@ client *createClient(connection *conn) { if (conn) linkClient(c); initClientMultiState(c); c->net_input_bytes = 0; + c->net_input_bytes_curr_cmd = 0; c->net_output_bytes = 0; c->commands_processed = 0; return c; @@ -2084,6 +2086,7 @@ void resetClient(client *c) { c->cur_script = NULL; c->reqtype = 0; c->multibulklen = 0; + c->net_input_bytes_curr_cmd = 0; c->bulklen = -1; c->slot = -1; c->flag.executing_command = 0; @@ -2268,6 +2271,21 @@ int processInlineBuffer(client *c) { c->argv_len_sum += sdslen(argv[j]); } zfree(argv); + + /* Per-slot network bytes-in calculation. + * + * Within networking.c, we calculate and store the current command's ingress bytes + * under c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred + * until c->slot is parsed later within processCommand(). + * + * Calculation: For inline buffer, every whitespace is of length 1, + * with the exception of the trailing '\r\n' being length 2. + * + * For example; + * Command) SET key value + * Inline) SET key value\r\n + * */ + c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); return C_OK; } @@ -2341,7 +2359,8 @@ int processMultibulkBuffer(client *c) { /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. */ serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*'); - ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll); + size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); + ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll); if (!ok || ll > INT_MAX) { addReplyError(c, "Protocol error: invalid multibulk length"); setProtocolError("invalid mbulk count", c); @@ -2363,6 +2382,39 @@ int processMultibulkBuffer(client *c) { c->argv_len = min(c->multibulklen, 1024); c->argv = zmalloc(sizeof(robj *) * c->argv_len); c->argv_len_sum = 0; + + /* Per-slot network bytes-in calculation. + * + * Within networking.c, we calculate and store the current command's ingress bytes + * under c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred + * until c->slot is parsed later within processCommand(). + * + * Calculation: For multi bulk buffer, we accumulate four factors, namely; + * + * 1) multibulklen_slen + 1 + * Cumulative string length (and not the value of) of multibulklen, + * including +1 from RESP first byte. + * 2) bulklen_slen + c->argc + * Cumulative string length (and not the value of) of bulklen, + * including +1 from RESP first byte per argument count. + * 3) c->argv_len_sum + * Cumulative string length of all argument vectors. + * 4) c->argc * 4 + 2 + * Cumulative string length of all white-spaces, for which there exists a total of + * 4 bytes per argument, plus 2 bytes from the leading '\r\n' from multibulklen. + * + * For example; + * Command) SET key value + * RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n + * + * 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1). + * 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc). + * 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum). + * 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2). + * + * The 1st component is calculated within the below line. + * */ + c->net_input_bytes_curr_cmd += (multibulklen_slen + 1); } serverAssertWithInfo(c, NULL, c->multibulklen > 0); @@ -2388,7 +2440,8 @@ int processMultibulkBuffer(client *c) { return C_ERR; } - ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll); + size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1); + ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll); if (!ok || ll < 0 || (!c->flag.primary && ll > server.proto_max_bulk_len)) { addReplyError(c, "Protocol error: invalid bulk length"); setProtocolError("invalid bulk length", c); @@ -2430,6 +2483,8 @@ int processMultibulkBuffer(client *c) { } } c->bulklen = ll; + /* Per-slot network bytes-in calculation, 2nd component. */ + c->net_input_bytes_curr_cmd += (bulklen_slen + c->argc); } /* Read bulk argument */ @@ -2466,7 +2521,11 @@ int processMultibulkBuffer(client *c) { } /* We're done when c->multibulk == 0 */ - if (c->multibulklen == 0) return C_OK; + if (c->multibulklen == 0) { + /* Per-slot network bytes-in calculation, 3rd and 4th components. */ + c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 4 + 2)); + return C_OK; + } /* Still not ready to process the command */ return C_ERR; diff --git a/src/server.c b/src/server.c index 228307e3cc..92bea7da02 100644 --- a/src/server.c +++ b/src/server.c @@ -30,6 +30,7 @@ #include "server.h" #include "monotonic.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "slowlog.h" #include "bio.h" #include "latency.h" @@ -2499,6 +2500,7 @@ void resetServerStats(void) { memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM); server.el_cmd_cnt_max = 0; lazyfreeResetStats(); + clusterSlotStatsReset(); } /* Make the thread killable at any time, so that kill threads functions @@ -3869,6 +3871,9 @@ int processCommand(client *c) { } } + /* Now that c->slot has been parsed, accumulate the bufferred network bytes-in. */ + clusterSlotStatsAddNetworkBytesIn(c); + if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) && (is_write_command || (is_read_command && !c->flag.readonly))) { addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port)); diff --git a/src/server.h b/src/server.h index d56dfdceee..ae82af1594 100644 --- a/src/server.h +++ b/src/server.h @@ -1304,6 +1304,8 @@ typedef struct client { clientReqResInfo reqres; #endif unsigned long long net_input_bytes; /* Total network input bytes read from this client. */ + unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the + * execution of this client's current command. */ unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */ unsigned long long commands_processed; /* Total count of commands this client executed. */ } client; diff --git a/tests/unit/cluster/slot-stats.tcl b/tests/unit/cluster/slot-stats.tcl index 76bf60fbff..fa90b30b04 100644 --- a/tests/unit/cluster/slot-stats.tcl +++ b/tests/unit/cluster/slot-stats.tcl @@ -44,10 +44,13 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots} { set slot_stats [convert_array_into_dict $slot_stats] dict for {slot stats} $slot_stats { if {[dict exists $exception_slots $slot]} { - set expected_key_count [dict get $exception_slots $slot] + set expected_key_count [dict get $exception_slots $slot key-count] + set expected_network_bytes_in [dict get $exception_slots $slot network-bytes-in] assert {[dict get $stats key-count] == $expected_key_count} + assert {[dict get $stats network-bytes-in] == $expected_network_bytes_in} } else { assert {[dict get $stats key-count] == 0} + assert {[dict get $stats network-bytes-in] == 0} } } } @@ -114,6 +117,51 @@ proc wait_for_replica_key_exists {key key_count} { } } +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS network-bytes-in. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster}} { + + # Define shared variables. + set key "key" + set key_slot [R 0 cluster keyslot $key] + + test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." { + # Command) SET key value + # RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n + R 0 SET $key value + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create key-count 1 network-bytes-in 33 + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." { + # Command) SET key value + # Inline) SET key value\r\n + set rd [valkey_deferring_client] + $rd write "SET $key value\r\n" + $rd flush + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create key-count 1 network-bytes-in 15 + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats + } +} + # ----------------------------------------------------------------------------- # Test cases for CLUSTER SLOT-STATS correctness, without additional arguments. # -----------------------------------------------------------------------------