Skip to content

Commit

Permalink
Minor revision.
Browse files Browse the repository at this point in the history
- Removed sharded pubsub aggregation, except for network-bytes-out
  internal propagation.
- Added network-bytes-out in reply schema.
- Added missing assertions in tcl integration tests.

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 25, 2024
1 parent df9619a commit ffe40e2
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 51 deletions.
3 changes: 0 additions & 3 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3297,9 +3297,7 @@ int clusterProcessPacket(clusterLink *link) {
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
clusterSlotStatsSetClusterMsgLength(ntohl(hdr->totlen));
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
clusterSlotStatsResetClusterMsgLength();
decrRefCount(channel);
decrRefCount(message);
}
Expand Down Expand Up @@ -4000,7 +3998,6 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(msgblock->totlen);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down
39 changes: 1 addition & 38 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

typedef struct {
uint32_t len;
} pubsubState;

static pubsubState pubsub_state;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand Down Expand Up @@ -155,6 +149,7 @@ void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
Expand All @@ -176,18 +171,6 @@ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(clien
c->slot = _slot;
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This function covers the external propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += len;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
Expand Down Expand Up @@ -263,26 +246,6 @@ void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

/* Adds network ingress bytes from sharded pubsub subscription.
* Since sharded pubsub targets a specific slot, we are able to aggregate its ingress bytes under per-slot context. */
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot) {
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);

server.cluster->slot_stats[slot].network_bytes_in += pubsub_state.len;
}

/* To avoid redundant keyHashSlot(), network-bytes-in accumulation for sharded pubsub employs a stateful design pattern.
* The total length of the clusterMsg is first recorded under a state, called pubsub_state.
* This recorded value is then accumulated later upon keyHashSlot() within the call-stack.
* After its accumulation, the state is reset back to 0. */
void clusterSlotStatsSetClusterMsgLength(uint32_t len) {
pubsub_state.len = len;
}

void clusterSlotStatsResetClusterMsgLength(void) {
pubsub_state.len = 0;
}

void clusterSlotStatsCommand(client *c) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
Expand Down
2 changes: 0 additions & 2 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);

/* network-bytes-in metric. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot);
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len);
3 changes: 3 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
},
"network-bytes-in": {
"type": "integer"
},
"network-bytes-out": {
"type": "integer"
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
/* Send to clients listening for that channel */
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
clusterSlotStatsAddNetworkBytesInForShardedPubSub(slot);
}
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
if (de) {
Expand Down
17 changes: 10 additions & 7 deletions tests/unit/cluster/slot-stats.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-en
$rd BLPOP $key 0
wait_for_blocked_clients_count 1

# Slot-stats must be empty here, as the client is yet to be unblocked.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert

# *3\r\n$5\r\nlpush\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 35 bytes.
R 0 LPUSH $key value
wait_for_blocked_clients_count 0
Expand All @@ -395,6 +399,8 @@ start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-en
dict create network-bytes-in 66 ;# 31 + 35 bytes.
]
]

assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
Expand Down Expand Up @@ -478,7 +484,7 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en
set slot_stats [$replica CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 2310 ;# 34 + 2276 bytes from clusterMsg.
dict create network-bytes-in 34
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
Expand Down Expand Up @@ -608,12 +614,11 @@ start_cluster 1 1 {tags {external:skip cluster}} {

# Publisher client) :1\r\n --> 4 bytes.
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
# Cluster propagation) sdslen(channel) + sdslen(hello) --> 12 bytes.
assert_equal 1 [$publisher SPUBLISH $channel hello]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 2338 ;# 4 + 42 + 12 + 2280 bytes from clusterMsgSendBlock.
dict create network-bytes-out 46 ;# 4 + 42 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
Expand Down Expand Up @@ -650,21 +655,19 @@ start_cluster 1 1 {tags {external:skip cluster}} {
# For primary channel;
# Publisher client) :1\r\n --> 4 bytes.
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
# Cluster propagation) sdslen(channel) + sdslen(hello) --> 12 bytes.
# For secondary channel;
# Publisher client) :1\r\n --> 4 bytes.
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes.
# Cluster propagation) sdslen(channel2) + sdslen(hello) --> 13 bytes.
assert_equal 1 [$publisher SPUBLISH $channel hello]
assert_equal 1 [$publisher SPUBLISH $channel_secondary hello]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create \
$key_slot [ \
dict create network-bytes-out 2338 ;# 4 + 42 + 12 + 2280 bytes from clusterMsgSendBlock.
dict create network-bytes-out 46 ;# 4 + 42 bytes.
] \
$key_slot_secondary [ \
dict create network-bytes-out 2340 ;# 4 + 43 + 13 + 2280 bytes from clusterMsgSendBlock.
dict create network-bytes-out 47 ;# 4 + 43 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
Expand Down

0 comments on commit ffe40e2

Please sign in to comment.