Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) #720

Merged
merged 9 commits into from
Jul 26, 2024
3 changes: 3 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3297,7 +3297,9 @@ int clusterProcessPacket(clusterLink *link) {
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
clusterSlotStatsSetClusterMsgLength(ntohl(hdr->totlen));
madolson marked this conversation as resolved.
Show resolved Hide resolved
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
clusterSlotStatsResetClusterMsgLength();
decrRefCount(channel);
decrRefCount(message);
}
Expand Down Expand Up @@ -3998,6 +4000,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(msgblock->totlen);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down
3 changes: 2 additions & 1 deletion src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ struct _clusterNode {
/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu_usec;
uint64_t network_bytes_in;
uint64_t network_bytes_out;
} slotStat;

struct clusterState {
Expand Down Expand Up @@ -385,5 +387,4 @@ struct clusterState {
slotStat slot_stats[CLUSTER_SLOTS];
};


#endif // CLUSTER_LEGACY_H
124 changes: 122 additions & 2 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define UNASSIGNED_SLOT 0

typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand All @@ -20,6 +20,12 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

typedef struct {
uint32_t len;
} pubsubState;

static pubsubState pubsub_state;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand All @@ -43,6 +49,8 @@ static uint64_t getSlotStat(int slot, int stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
} else if (stat_type == NETWORK_BYTES_OUT) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_out;

We might do a case switch here so that it throws a warning if we miss a type here.

} else if (stat_type == CPU_USEC) {
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
}
Expand Down Expand Up @@ -96,6 +104,10 @@ static void addReplySlotStat(client *c, int slot) {
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

Expand All @@ -119,6 +131,63 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should be kept idempotent, regardless of the function's early return condition. */
madolson marked this conversation as resolved.
Show resolved Hide resolved
c->slot = _slot;
return;
}

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This function covers the external propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += len;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
Expand Down Expand Up @@ -167,8 +236,55 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
ctx->original_client->slot = -1;
}

static int canAddNetworkBytesIn(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
* calculated earlier within networking.c layer.
*
* Note: Below function should only be called once c->slot is parsed.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
* */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
if (!canAddNetworkBytesIn(c)) return;

if (c->cmd->proc == execCommand) {
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
c->net_input_bytes_curr_cmd += 15;
}

server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

/* Adds network ingress bytes from sharded pubsub subscription.
* Since sharded pubsub targets a specific slot, we are able to aggregate its ingress bytes under per-slot context. */
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot) {
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);

server.cluster->slot_stats[slot].network_bytes_in += pubsub_state.len;
}

/* To avoid redundant keyHashSlot(), network-bytes-in accumulation for sharded pubsub employs a stateful design pattern.
* The total length of the clusterMsg is first recorded under a state, called pubsub_state.
* This recorded value is then accumulated later upon keyHashSlot() within the call-stack.
* After its accumulation, the state is reset back to 0. */
void clusterSlotStatsSetClusterMsgLength(uint32_t len) {
pubsub_state.len = len;
}

void clusterSlotStatsResetClusterMsgLength(void) {
pubsub_state.len = 0;
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
Expand Down Expand Up @@ -197,6 +313,10 @@ void clusterSlotStatsCommand(client *c) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
Expand Down
15 changes: 15 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,22 @@
#include "script.h"
#include "cluster_legacy.h"

/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);

/* network-bytes-in metric. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot);
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len);
3 changes: 3 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
},
"cpu-usec": {
"type": "integer"
},
"network-bytes-in": {
madolson marked this conversation as resolved.
Show resolved Hide resolved
"type": "integer"
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,19 @@ int getKeySlot(sds key) {
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) {
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
return server.current_client->slot;
}
return calculateKeySlot(key);
int slot = calculateKeySlot(key);
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
if (server.current_client && server.current_client->flag.primary) {
server.current_client->slot = slot;
}
return slot;
madolson marked this conversation as resolved.
Show resolved Hide resolved
}

/* This is a special version of dbAdd() that is used only when loading
Expand Down
5 changes: 5 additions & 0 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*/

#include "server.h"
#include "cluster_slot_stats.h"
madolson marked this conversation as resolved.
Show resolved Hide resolved

/* ================================ MULTI/EXEC ============================== */

Expand Down Expand Up @@ -91,6 +92,10 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;

/* Since afterCommand() is not reached upon queuing a command,
* below call is made explicitly to accumulate its network ingress bytes. */
clusterSlotStatsAddNetworkBytesInForUserClient(c);
}

void discardTransaction(client *c) {
Expand Down
Loading
Loading