Skip to content

Commit

Permalink
Add network-bytes-out metric support for CLUSTER SLOT-STATS command (v…
Browse files Browse the repository at this point in the history
…alkey-io#20).

The metric tracks network egress bytes on a per-slot basis,
by hooking into client output buffer (COB) mutations.

The metric can be viewed by calling the CLUSTER SLOT-STATS command.
A sample response is shown below:

```
127.0.0.1:6379> cluster slot-stats slotsrange 0 0
1) 1) (integer) 0
   2) 1) "key-count"
      2) (integer) 1
      3) "network-bytes-out"
      4) (integer) 175
```

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 10, 2024
1 parent a323dce commit d679312
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ struct _clusterNode {
Update with updateAndCountChangedNodeHealth(). */
};

/* Usage statistics kept for a single hash slot.
 * clusterState holds one entry per slot in its slot_stats[CLUSTER_SLOTS] array. */
typedef struct slotStat {
/* Bytes of network egress attributed to this slot. Accumulated by the
 * clusterSlotStatsAddNetworkBytesOut*() functions and reported by
 * CLUSTER SLOT-STATS under the "network-bytes-out" key. */
uint64_t network_bytes_out;
} slotStat;

struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
Expand Down Expand Up @@ -376,6 +380,7 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
slotStat slot_stats[CLUSTER_SLOTS];
};


Expand Down
57 changes: 54 additions & 3 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
NETWORK_BYTES_OUT,
INVALID,
} slotStatTypes;

Expand Down Expand Up @@ -88,9 +88,11 @@ static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyMapLen(c, 2); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}

/* Adds reply for the SLOTSRANGE variant.
Expand All @@ -113,6 +115,53 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

/* Clears the resettable statistics of a single slot.
 *
 * slot - index into server.cluster->slot_stats.
 *
 * key-count is not touched here: it is not stored in slot_stats, but is
 * queried separately through countKeysInSlot(). */
void clusterSlotStatReset(int slot) {
    uint64_t *bytes_out = &server.cluster->slot_stats[slot].network_bytes_out;
    *bytes_out = 0;
}

/* Zeroes the statistics of every slot in one pass.
 * Does nothing when server.cluster is NULL. */
void clusterSlotStatResetAll(void) {
    if (server.cluster != NULL) {
        memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
    }
}

/* Returns non-zero when the client's output bytes can be attributed to a slot:
 * cluster mode must be enabled and the client must carry a slot (c->slot != -1). */
static int canAddNetworkBytesOut(client *c) {
    if (!server.cluster_enabled) return 0;
    return c->slot != -1;
}

/* Adds the bytes written to the client's reply by the current command
 * (c->net_output_bytes_curr_cmd) to the network-bytes-out counter of c->slot.
 * No-op when the bytes cannot be attributed to a slot. */
void clusterSlotStatsAddNetworkBytesOut(client *c) {
    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        uint64_t *bytes_out = &server.cluster->slot_stats[c->slot].network_bytes_out;
        *bytes_out += c->net_output_bytes_curr_cmd;
    }
}

/* Accounts replication traffic against the slot of the currently executing client.
 *
 * len - number of bytes fed to the replication buffer. May be negative: callers
 *       pass a negative length to cancel out bytes that were accumulated but do
 *       not belong to any slot (e.g. an implicit SELECT).
 *
 * The delta applied to the slot is len multiplied by the number of replicas.
 * No-op when there is no current client, or when its bytes cannot be attributed
 * to a slot. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
    client *c = server.current_client;
    if (c == NULL || !canAddNetworkBytesOut(c)) return;

    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);

    /* Multiply in signed 64-bit arithmetic. Mixing the (possibly negative) int
     * with the unsigned long list length would evaluate the product in
     * unsigned long; where that type is 32 bits wide the result wraps modulo
     * 2^32 before being widened, turning a small negative adjustment into a
     * huge positive one. */
    int64_t delta = (int64_t)len * (int64_t)listLength(server.replicas);

    /* Conversion to uint64_t is modular, so adding a negative delta subtracts
     * its magnitude from the counter. */
    server.cluster->slot_stats[c->slot].network_bytes_out += (uint64_t)delta;
}

/* Accounts the bytes queued for a pubsub subscriber against the given slot.
 *
 * c    - subscriber client whose net_output_bytes_curr_cmd is aggregated.
 * slot - slot of the published channel, or -1 when there is no slot to account
 *        for, in which case nothing is aggregated.
 *
 * c->slot is borrowed for the duration of the call and is restored on every
 * path, so the function never changes the client's slot as a side effect. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSub(client *c, int slot) {
    /* For a blocked client, c->slot could be pre-filled.
     * Thus c->slot is backed-up for restoration after aggregation is completed. */
    int saved_slot = c->slot;
    c->slot = slot;

    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

        /* For sharded pubsub, the client's network bytes metrics must be reset here,
         * as resetClient() is not called until subscription ends. */
        c->net_output_bytes_curr_cmd = 0;
    }

    /* Restore unconditionally: returning early without this would leave the
     * subscriber's own slot overwritten by the publisher's. */
    c->slot = saved_slot;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
Expand Down Expand Up @@ -149,6 +198,8 @@ void clusterSlotStatsCommand(client *c) {
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out")) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
return;
Expand Down
9 changes: 9 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#ifndef CLUSTER_SLOT_STATS_H
#define CLUSTER_SLOT_STATS_H

#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"

/* Resets the resettable statistics of a single slot. */
void clusterSlotStatReset(int slot);

/* Resets the statistics of every slot. No-op when server.cluster is NULL. */
void clusterSlotStatResetAll(void);

/* Adds the bytes written by the client's current command to the
 * network-bytes-out counter of c->slot. */
void clusterSlotStatsAddNetworkBytesOut(client *c);

/* Adds len multiplied by the number of replicas to the network-bytes-out
 * counter of the current client's slot. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);

/* Adds the bytes queued for subscriber c to the network-bytes-out counter
 * of the given slot. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSub(client *c, int slot);

#endif /* CLUSTER_SLOT_STATS_H */
6 changes: 6 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
Expand Down Expand Up @@ -225,6 +226,7 @@ client *createClient(connection *conn) {
initClientMultiState(c);
c->net_input_bytes = 0;
c->net_output_bytes = 0;
c->net_output_bytes_curr_cmd = 0;
c->commands_processed = 0;
return c;
}
Expand Down Expand Up @@ -442,6 +444,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return;
}

c->net_output_bytes_curr_cmd += len;

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);
Expand Down Expand Up @@ -2443,6 +2447,7 @@ void resetClient(client *c) {
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down Expand Up @@ -2833,6 +2838,7 @@ int processCommandAndResetClient(client *c) {
client *old_client = server.current_client;
server.current_client = c;
if (processCommand(c) == C_OK) {
clusterSlotStatsAddNetworkBytesOut(c);
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
Expand Down
6 changes: 4 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

/* Structure to hold the pubsub related metadata. Currently used
* for pubsub and pubsubshard feature. */
Expand Down Expand Up @@ -475,20 +476,21 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
int receivers = 0;
dictEntry *de;
dictIterator *di;
unsigned int slot = 0;
int slot = -1;

/* Send to clients listening for that channel */
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
}
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
if (de) {
dict *clients = dictGetVal(de);
dictEntry *entry;
dictIterator *iter = dictGetIterator(clients);
while ((entry = dictNext(iter)) != NULL) {
client *c = dictGetKey(entry);
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
clusterSlotStatsAddNetworkBytesOutForShardedPubSub(c, slot);
updateClientMemUsageAndBucket(c);
receivers++;
}
Expand Down
8 changes: 8 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "bio.h"
#include "functions.h"
#include "connection.h"
Expand Down Expand Up @@ -320,6 +321,8 @@ void feedReplicationBuffer(char *s, size_t len) {

if (server.repl_backlog == NULL) return;

clusterSlotStatsAddNetworkBytesOutForReplication(len);

while (len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
Expand Down Expand Up @@ -470,6 +473,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {

feedReplicationBufferWithObject(selectcmd);

/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));

if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);

server.replicas_eldb = dictid;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "server.h"
#include "monotonic.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
Expand Down Expand Up @@ -2516,6 +2517,7 @@ void resetServerStats(void) {
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
clusterSlotStatResetAll();
}

/* Make the thread killable at any time, so that kill threads functions
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,8 @@ typedef struct client {
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
unsigned long long
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
} client;

/* ACL information */
Expand Down

0 comments on commit d679312

Please sign in to comment.