From d679312350a8eda3a274ce69ae33a8f256ca5dc8 Mon Sep 17 00:00:00 2001 From: Kyle Kim Date: Wed, 10 Jul 2024 21:12:29 +0000 Subject: [PATCH] Add network-bytes-out metric support for CLUSTER SLOT-STATS command (#20). The metric tracks network egress bytes in a per-slot context by hooking into client output buffer (COB) mutations. The metric can be viewed by calling the CLUSTER SLOT-STATS command; a sample response is shown below: ``` 127.0.0.1:6379> cluster slot-stats slotsrange 0 0 1) 1) (integer) 0 2) 1) "key-count" 2) (integer) 1 3) "network-bytes-out" 4) (integer) 175 ``` Signed-off-by: Kyle Kim --- src/cluster_legacy.h | 5 ++++ src/cluster_slot_stats.c | 57 +++++++++++++++++++++++++++++++++++++--- src/cluster_slot_stats.h | 9 +++++++ src/networking.c | 6 +++++ src/pubsub.c | 6 +++-- src/replication.c | 8 ++++++ src/server.c | 2 ++ src/server.h | 2 ++ 8 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 src/cluster_slot_stats.h diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 3c5696273b..7c10801dd7 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -329,6 +329,10 @@ struct _clusterNode { Update with updateAndCountChangedNodeHealth(). */ }; +typedef struct slotStat { + uint64_t network_bytes_out; +} slotStat; + struct clusterState { clusterNode *myself; /* This node */ uint64_t currentEpoch; @@ -376,6 +380,7 @@ struct clusterState { * stops claiming the slot. This prevents spreading incorrect information (that * source still owns the slot) using UPDATE messages. 
*/ unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8]; + slotStat slot_stats[CLUSTER_SLOTS]; }; diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index a2a6bfdd01..fec25067e4 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -4,13 +4,13 @@ * SPDX-License-Identifier: BSD 3-Clause */ -#include "server.h" -#include "cluster.h" +#include "cluster_slot_stats.h" #define UNASSIGNED_SLOT 0 typedef enum { KEY_COUNT, + NETWORK_BYTES_OUT, INVALID, } slotStatTypes; @@ -88,9 +88,11 @@ static void addReplySlotStat(client *c, int slot) { addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot, * and 1st index represents (map) usage statistics. */ addReplyLongLong(c, slot); - addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */ + addReplyMapLen(c, 2); /* Nested map representing slot usage statistics. */ addReplyBulkCString(c, "key-count"); addReplyLongLong(c, countKeysInSlot(slot)); + addReplyBulkCString(c, "network-bytes-out"); + addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out); } /* Adds reply for the SLOTSRANGE variant. @@ -113,6 +115,53 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon } } +/* Resets applicable slot statistics. */ +void clusterSlotStatReset(int slot) { + /* key-count is exempt, as it is queried separately through countKeysInSlot(). 
*/ + server.cluster->slot_stats[slot].network_bytes_out = 0; +} + +void clusterSlotStatResetAll(void) { + if (server.cluster == NULL) return; + + memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats)); +} + +static int canAddNetworkBytesOut(client *c) { + return server.cluster_enabled && c->slot != -1; +} + +void clusterSlotStatsAddNetworkBytesOut(client *c) { + if (!canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; +} + +void clusterSlotStatsAddNetworkBytesOutForReplication(int len) { + client *c = server.current_client; + if (c == NULL || !canAddNetworkBytesOut(c)) return; + /* 'len' may be negative (cancel-out adjustments), so the product is computed in signed 64-bit. */ + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += ((long long)len * (long long)listLength(server.replicas)); +} + +/* Accounts the reply bytes queued for subscriber 'c' towards the given 'slot'. + * 'slot' is the hash slot of the published channel, or -1 when the message is + * not bound to a slot, in which case nothing is accounted. + * c->slot is never modified here, as it could be pre-filled for a blocked client, + * and must stay intact on every return path. */ +void clusterSlotStatsAddNetworkBytesOutForShardedPubSub(client *c, int slot) { + if (!server.cluster_enabled || slot == -1) return; + + serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); + server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd; + + /* For sharded pubsub, the client's network bytes metrics must be reset here, + * as resetClient() is not called until subscription ends. */ + c->net_output_bytes_curr_cmd = 0; +} + /* Adds reply for the ORDERBY variant. * Response is ordered based on the sort result. 
*/ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) { @@ -149,6 +198,8 @@ void clusterSlotStatsCommand(client *c) { int desc = 1, order_by = INVALID; if (!strcasecmp(c->argv[3]->ptr, "key-count")) { order_by = KEY_COUNT; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out")) { + order_by = NETWORK_BYTES_OUT; } else { - addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count."); + addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count, network-bytes-out."); return; diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h new file mode 100644 index 0000000000..346e8b2f82 --- /dev/null +++ b/src/cluster_slot_stats.h @@ -0,0 +1,15 @@ +#ifndef CLUSTER_SLOT_STATS_H +#define CLUSTER_SLOT_STATS_H + +#include "server.h" +#include "cluster.h" +#include "cluster_legacy.h" + +/* Per-slot usage statistics reported by the CLUSTER SLOT-STATS command. */ +void clusterSlotStatReset(int slot); +void clusterSlotStatResetAll(void); +void clusterSlotStatsAddNetworkBytesOut(client *c); +void clusterSlotStatsAddNetworkBytesOutForReplication(int len); +void clusterSlotStatsAddNetworkBytesOutForShardedPubSub(client *c, int slot); + +#endif /* CLUSTER_SLOT_STATS_H */ diff --git a/src/networking.c b/src/networking.c index 0a91dbb645..3ffe3554a0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" @@ -225,6 +226,7 @@ client *createClient(connection *conn) { initClientMultiState(c); c->net_input_bytes = 0; c->net_output_bytes = 0; + c->net_output_bytes_curr_cmd = 0; c->commands_processed = 0; return c; } @@ -442,6 +444,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { return; } + c->net_output_bytes_curr_cmd += len; + /* We call it here because this function may affect the reply * buffer offset (see function comment) */ reqresSaveClientReplyOffset(c); @@ -2443,6 +2447,7 @@ void resetClient(client *c) { c->slot = -1; c->flag.executing_command = 0; c->flag.replication_done = 0; + c->net_output_bytes_curr_cmd = 0; /* Make sure the duration has been recorded to some command. 
*/ serverAssert(c->duration == 0); @@ -2833,6 +2838,7 @@ int processCommandAndResetClient(client *c) { client *old_client = server.current_client; server.current_client = c; if (processCommand(c) == C_OK) { + clusterSlotStatsAddNetworkBytesOut(c); commandProcessed(c); /* Update the client's memory to include output buffer growth following the * processed command. */ diff --git a/src/pubsub.c b/src/pubsub.c index b79b532bf8..bf40e5b3c1 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" /* Structure to hold the pubsub related metadata. Currently used * for pubsub and pubsubshard feature. */ @@ -475,13 +476,13 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) int receivers = 0; dictEntry *de; dictIterator *di; - unsigned int slot = 0; + int slot = -1; /* Send to clients listening for that channel */ if (server.cluster_enabled && type.shard) { slot = keyHashSlot(channel->ptr, sdslen(channel->ptr)); } - de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); + de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 
0 : slot, channel); if (de) { dict *clients = dictGetVal(de); dictEntry *entry; @@ -489,6 +490,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) while ((entry = dictNext(iter)) != NULL) { client *c = dictGetKey(entry); addReplyPubsubMessage(c, channel, message, *type.messageBulk); + clusterSlotStatsAddNetworkBytesOutForShardedPubSub(c, slot); updateClientMemUsageAndBucket(c); receivers++; } diff --git a/src/replication.c b/src/replication.c index 21ccb0e92d..aae123e55e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -31,6 +31,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "bio.h" #include "functions.h" #include "connection.h" @@ -320,6 +321,8 @@ void feedReplicationBuffer(char *s, size_t len) { if (server.repl_backlog == NULL) return; + clusterSlotStatsAddNetworkBytesOutForReplication(len); + while (len > 0) { size_t start_pos = 0; /* The position of referenced block to start sending. */ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ @@ -470,6 +473,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) { feedReplicationBufferWithObject(selectcmd); + /* Although the SELECT command is not associated with any slot, + * its per-slot network-bytes-out accumulation is made by the above function call. + * To cancel-out this accumulation, below adjustment is made. 
*/ + clusterSlotStatsAddNetworkBytesOutForReplication(-(int)sdslen(selectcmd->ptr)); + if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); server.replicas_eldb = dictid; diff --git a/src/server.c b/src/server.c index 465aa29391..075d311103 100644 --- a/src/server.c +++ b/src/server.c @@ -30,6 +30,7 @@ #include "server.h" #include "monotonic.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "slowlog.h" #include "bio.h" #include "latency.h" @@ -2516,6 +2517,7 @@ void resetServerStats(void) { memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM); server.el_cmd_cnt_max = 0; lazyfreeResetStats(); + clusterSlotStatResetAll(); } /* Make the thread killable at any time, so that kill threads functions diff --git a/src/server.h b/src/server.h index 66d6d66da5..334cdda529 100644 --- a/src/server.h +++ b/src/server.h @@ -1323,6 +1323,8 @@ typedef struct client { unsigned long long net_input_bytes; /* Total network input bytes read from this client. */ unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */ unsigned long long commands_processed; /* Total count of commands this client executed. */ + unsigned long long + net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */ } client; /* ACL information */