Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) #720

Merged
merged 9 commits into from
Jul 26, 2024
28 changes: 15 additions & 13 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define UNASSIGNED_SLOT 0

typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatType;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand Down Expand Up @@ -38,15 +38,15 @@ static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_s
return assigned_slots_count;
}

static uint64_t getSlotStat(int slot, int stat_type) {
serverAssert(stat_type != INVALID);
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
} else if (stat_type == CPU_USEC) {
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
switch (stat_type) {
case KEY_COUNT: slot_stat = countKeysInSlot(slot); break;
case CPU_USEC: slot_stat = server.cluster->slot_stats[slot].cpu_usec; break;
case NETWORK_BYTES_IN: slot_stat = server.cluster->slot_stats[slot].network_bytes_in; break;
case NETWORK_BYTES_OUT: slot_stat = server.cluster->slot_stats[slot].network_bytes_out; break;
default: // SLOT_STAT_COUNT, INVALID
madolson marked this conversation as resolved.
Show resolved Hide resolved
serverPanic("Invalid slot stat type %d was found.", stat_type);
}
return slot_stat;
}
Expand All @@ -71,7 +71,7 @@ static int slotStatForSortDescCmp(const void *a, const void *b) {
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) {
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
Expand Down Expand Up @@ -157,7 +157,8 @@ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(clien
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should be kept idempotent, regardless of the function's early return condition. */
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}
Expand All @@ -173,7 +174,7 @@ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(clien

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
Expand Down Expand Up @@ -271,7 +272,8 @@ void clusterSlotStatsCommand(client *c) {

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = INVALID;
int desc = 1;
slotStatType order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
Expand Down
4 changes: 0 additions & 4 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;

/* Since afterCommand() is not reached upon queuing a command,
* below call is made explicitly to accumulate its network ingress bytes. */
clusterSlotStatsAddNetworkBytesInForUserClient(c);
}

void discardTransaction(client *c) {
Expand Down
1 change: 1 addition & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2902,6 +2902,7 @@ void commandProcessed(client *c) {
if (c->flag.blocked) return;

reqresAppendResponse(c);
clusterSlotStatsAddNetworkBytesInForUserClient(c);
resetClient(c);

long long prev_offset = c->reploff;
Expand Down
4 changes: 4 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3975,6 +3975,10 @@ void replicationSendAck(void) {
addReplyBulkLongLong(c, server.fsynced_reploff);
}
c->flag.primary_force_reply = 0;

/* Accumulation from above replies must be reset back to 0 manually,
* as this subroutine does not invoke resetClient(). */
c->net_output_bytes_curr_cmd = 0;
madolson marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
1 change: 0 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,7 +3694,6 @@ void afterCommand(client *c) {
/* Flush pending tracking invalidations. */
trackingHandlePendingKeyInvalidations();

clusterSlotStatsAddNetworkBytesInForUserClient(c);
clusterSlotStatsAddNetworkBytesOutForUserClient(c);

/* Flush other pending push messages. only when we are not in nested call.
Expand Down
131 changes: 80 additions & 51 deletions tests/unit/cluster/slot-stats.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,23 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_
}
}

proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} {
proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 deterministic_metrics non_deterministic_metrics} {
set slot_stats_1 [convert_array_into_dict $slot_stats_1]
set slot_stats_2 [convert_array_into_dict $slot_stats_2]
assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]}

dict for {slot stats_1} $slot_stats_1 {
assert {[dict exists $slot_stats_2 $slot]}
set stats_2 [dict get $slot_stats_2 $slot]
foreach metric_name $metrics_to_assert {
assert {[dict get $stats_1 $metric_name] == [dict get $stats_2 $metric_name]}

# For deterministic metrics, we assert their equality.
foreach metric $deterministic_metrics {
assert {[dict get $stats_1 $metric] == [dict get $stats_2 $metric]}
}
# For non-deterministic metrics, we assert their non-zeroness as a best-effort.
foreach metric $non_deterministic_metrics {
assert {([dict get $stats_1 $metric] == 0 && [dict get $stats_2 $metric] == 0) || \
([dict get $stats_1 $metric] != 0 && [dict get $stats_2 $metric] != 0)}
}
}
}
Expand Down Expand Up @@ -763,7 +770,9 @@ start_cluster 1 0 {tags {external:skip cluster}} {
# Test cases for CLUSTER SLOT-STATS ORDERBY sub-argument.
# -----------------------------------------------------------------------------

start_cluster 1 0 {tags {external:skip cluster}} {
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {

set metrics [list "key-count" "cpu-usec" "network-bytes-in" "network-bytes-out"]

# SET keys for target hashslots, to encourage ordering.
set hash_tags [list 0 1 2 3 4]
Expand All @@ -784,32 +793,36 @@ start_cluster 1 0 {tags {external:skip cluster}} {
}

test "CLUSTER SLOT-STATS ORDERBY DESC correct ordering" {
set orderby "key-count"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC LIMIT -1}
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
assert_slot_stats_monotonic_descent $slot_stats $orderby
foreach orderby $metrics {
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
assert_slot_stats_monotonic_descent $slot_stats $orderby
}
}

test "CLUSTER SLOT-STATS ORDERBY ASC correct ordering" {
set orderby "key-count"
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
assert_slot_stats_monotonic_ascent $slot_stats $orderby
foreach orderby $metrics {
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
assert_slot_stats_monotonic_ascent $slot_stats $orderby
}
}

test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is less than number of assigned slots" {
R 0 FLUSHALL SYNC
R 0 CONFIG RESETSTAT

set limit 5
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit DESC]
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit ASC]
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}

# The key count of all slots is 0, so we will order by slot in ascending order.
set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
foreach orderby $metrics {
set limit 5
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}

# All slot statistics have been reset to 0, so we will order by slot in ascending order.
set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
}
}

test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is greater than number of assigned slots" {
Expand All @@ -818,32 +831,39 @@ start_cluster 1 0 {tags {external:skip cluster}} {
R 0 CLUSTER FLUSHSLOTS
R 0 CLUSTER ADDSLOTS 100 101

set num_assigned_slots 2
set limit 5
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit DESC]
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit ASC]
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
set expected_response_length [expr min($num_assigned_slots, $limit)]
assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}

set expected_slots [dict create 100 0 101 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
foreach orderby $metrics {
set num_assigned_slots 2
set limit 5
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
set expected_response_length [expr min($num_assigned_slots, $limit)]
assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}

set expected_slots [dict create 100 0 101 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
}
}

test "CLUSTER SLOT-STATS ORDERBY unsupported sort metric." {
set orderby "non-existent-metric"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}

test "CLUSTER SLOT-STATS ORDERBY arg sanity check." {
# Non-existent argument.
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count non-existent-arg}
# Negative LIMIT.
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count DESC LIMIT -1}
# Non-existent ORDERBY metric.
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY non-existent-metric}
# When cluster-slot-stats-enabled config is disabled, you cannot sort using advanced metrics.
R 0 CONFIG SET cluster-slot-stats-enabled no
set orderby "cpu-usec"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
set orderby "network-bytes-in"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
set orderby "network-bytes-out"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're missing tests here to make sure we can order by network bytes in/out

assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
}

}

# -----------------------------------------------------------------------------
Expand All @@ -853,17 +873,23 @@ start_cluster 1 0 {tags {external:skip cluster}} {
start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {

# Define shared variables.
set key "FOO"
set key "key"
set key_slot [R 0 CLUSTER KEYSLOT $key]
set primary [Rn 0]
set replica [Rn 1]
set metrics_to_assert [list key-count network-bytes-in]

# For replication, only those metrics that are deterministic upon replication are asserted.
# * key-count is asserted, as both the primary and its replica must hold the same number of keys.
# * cpu-usec is not asserted, as its micro-seconds command duration is not guaranteed to be exact
# between the primary and its replica.
set metrics_to_assert [list key-count]
# For replication, assertions are split between deterministic and non-deterministic metrics.
# * For deterministic metrics, strict equality assertions are made.
# * For non-deterministic metrics, non-zeroness assertions are made.
# Non-zeroness as in, both primary and replica should either have some value, or no value at all.
#
# * key-count is deterministic between primary and its replica.
# * cpu-usec is non-deterministic between primary and its replica.
# * network-bytes-in is deterministic between primary and its replica.
# * network-bytes-out will remain empty in the replica, since primary client do not receive replies, unless for replicationSendAck().
set deterministic_metrics [list key-count network-bytes-in]
set non_deterministic_metrics [list cpu-usec]
set empty_metrics [list network-bytes-out]

# Setup replication.
assert {[s -1 role] eq {slave}}
Expand All @@ -884,15 +910,16 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en
]
]
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $metrics_to_assert
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics

wait_for_condition 500 10 {
[string match {*calls=1,*} [cmdrstat set $replica]]
} else {
fail "Replica did not receive the command."
}
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
assert_empty_slot_stats $slot_stats_replica $empty_metrics
}
R 0 CONFIG RESETSTAT
R 1 CONFIG RESETSTAT
Expand All @@ -907,15 +934,16 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en
]
]
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $metrics_to_assert
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics

wait_for_condition 500 10 {
[string match {*calls=1,*} [cmdrstat set $replica]]
} else {
fail "Replica did not receive the command."
}
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
assert_empty_slot_stats $slot_stats_replica $empty_metrics
}
R 0 CONFIG RESETSTAT
R 1 CONFIG RESETSTAT
Expand All @@ -930,15 +958,16 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en
]
]
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $metrics_to_assert
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics

wait_for_condition 500 10 {
[string match {*calls=1,*} [cmdrstat del $replica]]
} else {
fail "Replica did not receive the command."
}
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
assert_empty_slot_stats $slot_stats_replica $empty_metrics
}
R 0 CONFIG RESETSTAT
R 1 CONFIG RESETSTAT
Expand Down
Loading