Skip to content

Commit

Permalink
Add test cases for deferred close of the master client.
Browse files Browse the repository at this point in the history
  • Loading branch information
patpatbear committed Feb 6, 2024
1 parent 2f33ba5 commit 99d08ec
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,7 @@ int clientSwap(client *c);
void continueProcessCommand(client *c);
int replClientSwap(client *c);
void replicationCacheSwapDrainingMaster(client *c);
void replicationHandleMasterDisconnectionWithoutReconnect(void);
void replClientDiscardSwappingState(client *c);
void submitDeferredClientKeyRequests(client *c, getKeyRequestsResult *result, clientKeyRequestFinished cb, void* ctx_pd);
void submitClientKeyRequests(client *c, getKeyRequestsResult *result, clientKeyRequestFinished cb, void* ctx_pd);
Expand Down
11 changes: 11 additions & 0 deletions src/ctrip_swap_repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@

#include "ctrip_swap.h"

/* Master-disconnection bookkeeping for the case where no reconnect must be
 * started from here. See replicationHandleMasterDisconnection for more
 * details.
 *
 * Fires the MASTER_LINK_DOWN module event when the link was in
 * REPL_STATE_CONNECTED, clears server.master and stamps
 * server.repl_down_since with server.unixtime. server.repl_state is left
 * untouched and no connection attempt is made by this function. */
void replicationHandleMasterDisconnectionWithoutReconnect(void) {
    int link_was_up = (server.repl_state == REPL_STATE_CONNECTED);

    /* Tell modules the master link went down, but only if it was up. */
    if (link_was_up) {
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);
    }

    server.master = NULL;
    server.repl_down_since = server.unixtime;
}

/* See replicationCacheMaster for more details */
void replicationCacheSwapDrainingMaster(client *c) {
serverAssert(server.swap_draining_master != NULL && server.cached_master == NULL);
Expand Down
19 changes: 15 additions & 4 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,13 @@ void disconnectSlaves(void) {
while((ln = listNext(&li))) {
/* Flush pending output to slaves before disconnect so it will be
* more likely to psync when failover. */
writeToClient((client*)ln->value,0);
client *slave = (client *)ln->value;
writeToClient(slave,0);
if (clientHasPendingReplies(slave)) {
sds client_desc = catClientInfoString(sdsempty(), slave);
serverLog(LL_NOTICE, "Slave still have pending replies when disconnect: %s", client_desc);
sdsfree(client_desc);
}
freeClient((client*)ln->value);
}
}
Expand Down Expand Up @@ -1424,7 +1430,6 @@ void freeClient(client *c) {
serverLog(LL_WARNING, "Connection with master lost (defer start with %d key requests).", c->keyrequests_count);
server.swap_draining_master = server.master;
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
}
deferFreeClient(c);
return;
Expand All @@ -1436,7 +1441,8 @@ void freeClient(client *c) {

if (c->flags & CLIENT_SWAP_SHIFT_REPL_ID) {
c->flags &= ~CLIENT_SWAP_SHIFT_REPL_ID;
serverLog(LL_WARNING, "Replication id shift defer done.");
serverLog(LL_NOTICE, "Replication id shift defer done(replid=%s, master_repl_offset=%lld).",
server.replid, server.master_repl_offset);
shiftReplicationId();
}

Expand Down Expand Up @@ -1540,7 +1546,12 @@ void freeClient(client *c) {

/* Master/slave cleanup Case 2:
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
if (c->flags & CLIENT_MASTER) {
if (c->flags & CLIENT_SWAP_DONT_RECONNECT_MASTER)
replicationHandleMasterDisconnectionWithoutReconnect();
else
replicationHandleMasterDisconnection();
}

/* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
Expand Down
30 changes: 20 additions & 10 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2643,26 +2643,28 @@ int waitSwapDrainingMaster(struct aeEventLoop *eventLoop, long long id, void *cl
UNUSED(eventLoop), UNUSED(id), UNUSED(clientData);

if (server.swap_draining_master == NULL) {
serverLog(LL_WARNING, "Wait swap master drain done");
serverLog(LL_NOTICE, "Wait master client drain done");
if (server.repl_state == REPL_STATE_CONNECT) connectWithMaster();
isWaitSwapDrainingMasterRunning = 0;
return AE_NOMORE;
} else {
if (server.mstime - logged_time > 1000) {
logged_time = server.mstime;
sds client_desc = catClientInfoString(sdsempty(), server.swap_draining_master);
serverLog(LL_WARNING, "Wait swap master drainning: %s.", client_desc);
serverLog(LL_NOTICE, "Wait master client drain before connect: %s.", client_desc);
sdsfree(client_desc);
}
return SWAP_WAIT_DRAINING_MASTER_INTERVAL_MS;
}
}

int connectWithMaster(void) {
if (server.swap_draining_master != NULL && !isWaitSwapDrainingMasterRunning) {
isWaitSwapDrainingMasterRunning = 1;
aeCreateTimeEvent(server.el,SWAP_WAIT_DRAINING_MASTER_INTERVAL_MS,
if (server.swap_draining_master != NULL) {
if (!isWaitSwapDrainingMasterRunning) {
isWaitSwapDrainingMasterRunning = 1;
aeCreateTimeEvent(server.el,SWAP_WAIT_DRAINING_MASTER_INTERVAL_MS,
waitSwapDrainingMaster,NULL,NULL);
}
return C_OK;
} else {
return doConnectWithMaster();
Expand Down Expand Up @@ -2755,6 +2757,12 @@ void replicationSetMaster(char *ip, int port) {
if (was_master) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();

/* dont trigger connect master when drain, master connection is
* started right below. */
if (server.swap_draining_master) {
server.swap_draining_master->flags |= CLIENT_SWAP_DONT_RECONNECT_MASTER;
}
}

/* Fire the role change modules event. */
Expand Down Expand Up @@ -2802,7 +2810,9 @@ void replicationUnsetMaster(void) {
* offset trimming the final PINGs. See Github issue #7320. */
if (server.swap_draining_master) {
server.swap_draining_master->flags |= CLIENT_SWAP_SHIFT_REPL_ID;
serverLog(LL_WARNING, "Replication id shift defer start (wait untill master swap drain).");
server.swap_draining_master->flags |= CLIENT_SWAP_DONT_RECONNECT_MASTER;
serverLog(LL_NOTICE, "Replication id shift defer start(replid=%s, master_repl_offset=%lld).",
server.replid, server.master_repl_offset);
} else {
shiftReplicationId();
}
Expand Down Expand Up @@ -3103,16 +3113,15 @@ void replicationCacheMasterUsingMyself(void) {
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
if (server.swap_draining_master)
server.swap_draining_master->flags |= CLIENT_SWAP_DISCARD_CACHED_MASTER;

if (server.cached_master == NULL) return;

serverLog(LL_NOTICE,"Discarding previously cached master state.");
server.cached_master->flags &= ~CLIENT_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;

if (server.swap_draining_master) {
server.swap_draining_master->flags |= CLIENT_SWAP_DISCARD_CACHED_MASTER;
}
}

/* Turn the cached master into the current master, using the file descriptor
Expand Down Expand Up @@ -3446,6 +3455,7 @@ void replicationCron(void) {

/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
server.master &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
Expand Down
13 changes: 12 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4505,6 +4505,17 @@ int processCommand(client *c) {
return C_OK;
}

/* Don't accept write commands if this is a master with previous
* master client draining: replid shift defered and write command
* would mix replication log from prev and current replid. */
if (server.masterhost == NULL && server.swap_draining_master &&
server.swap_draining_master->flags & CLIENT_SWAP_SHIFT_REPL_ID &&
!(c->flags & CLIENT_MASTER) && is_write_command)
{
rejectCommandFormat(c, "Previous master draining.");
return C_OK;
}

/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits. */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
Expand Down Expand Up @@ -5484,7 +5495,7 @@ sds genRedisInfoString(const char *section) {
"slave_repl_offset:%lld\r\n"
,server.masterhost,
server.masterport,
(server.repl_state == REPL_STATE_CONNECTED) ?
(server.repl_state == REPL_STATE_CONNECTED && server.master) ?
"up" : "down",
server.master ?
((int)(server.unixtime-server.master->lastinteraction)) : -1,
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_SWAP_REWINDING (1ULL<<46) /* The client is waiting rewind. */
#define CLIENT_SWAP_DISCARD_CACHED_MASTER (1ULL<<47) /* The client will not be saved as cached_master. */
#define CLIENT_SWAP_SHIFT_REPL_ID (1ULL<<48) /* shift repl id when this client (drainning master) drained. */
#define CLIENT_SWAP_DONT_RECONNECT_MASTER (1ULL<<49) /* When this client (draining master) is freed, handle the master disconnection without reconnecting. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
145 changes: 145 additions & 0 deletions tests/swap/integration/replication.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Return 1 when the entire content of the file named by `log` matches the
# glob-style `pattern` (as interpreted by `string match`), 0 otherwise.
# The file is opened read-only and closed before returning.
proc log_file_matches {log pattern} {
    set chan [open $log r]
    set text [read $chan]
    close $chan
    return [string match $pattern $text]
}

# Integration tests for the deferred close of the master client.
# Three servers are started: srv -2 is used as master2, srv -1 as master and
# srv 0 as slave. The tests below run in order and share these handles.
start_server {tags {"swap replication"} overrides {}} {
start_server {} {
start_server {} {
# Second master, only used by the last test as the new replication target.
set master2 [srv -2 client]
set master2_host [srv -2 host]
set master2_port [srv -2 port]
set master2_log [srv -2 stdout]

# Primary master of the slave for most of the tests.
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
set master_log [srv -1 stdout]

# The server under test.
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
set slave_log [srv 0 stdout]

# Number of keys written to the master by every test.
set keycount 250

test {Replication setup and init cold key} {
# Attach the slave to the master and give the command 500 ms to take effect.
$slave slaveof $master_host $master_port
after 500
assert_equal [s 0 role] slave

for {set i 0} {$i < $keycount} {incr i} {
$master set key_$i val_$i
}
# Wait on both sides via wait_keyspace_cold before the next tests start.
wait_keyspace_cold $master
wait_keyspace_cold $slave
}

test {slave reconnect master will defer without discarding replicated commands} {
# NOTE(review): presumably this debug config slows down swap IO on the
# slave so that the master client still has work pending when the link
# is cut - confirm against the config definition.
$slave config set swap-debug-rio-delay-micro 1000

for {set i 0} {$i < $keycount} {incr i} {
$master set key_$i val_$i
}

# Drop the replica connection from the master side.
$master client kill type replica

# Offset sampled on the master after the replica link was killed.
set master_repl_offset [status $master master_repl_offset]

assert_equal [status $slave master_link_status] {down}

# At this point the slave must still be behind the master.
assert {[status $master master_repl_offset] > [status $slave master_repl_offset]}

# After resync the master log must show a backlog transfer of 0 bytes that
# starts right after the sampled master offset.
wait_for_sync $slave
assert {[log_file_matches $master_log "*Sending 0 bytes of backlog starting from offset [expr $master_repl_offset+1]*"]}
}

test {shift replid will be defered untill previous master client drain} {
$slave config set swap-debug-rio-delay-micro 10000

# Deferring clients: both role changes are sent before any reply is read.
set master_rd [redis_deferring_client -1]
set slave_rd [redis_deferring_client 0]

for {set i 0} {$i < $keycount} {incr i} {
$master set key_$i val_$i
}

set master_repl_offset [status $master master_repl_offset]

# Swap roles: the slave is promoted and the old master replicates from it.
$slave_rd slaveof no one
$master_rd slaveof $slave_host $slave_port

$slave_rd read
$master_rd read

# Poll up to 50 times, 1000 ms apart, for the promoted server to log a
# backlog transfer of 0 bytes starting right after the sampled offset.
wait_for_condition 50 1000 {
[log_file_matches $slave_log "*Sending 0 bytes of backlog starting from offset [expr $master_repl_offset+1]*"]
} else {
fail "Drainging slaveof no one results in fullresync! "
}

# The deferred replid shift must be logged with the sampled offset, and the
# old master must have logged the NOMASTERLINK rejection text.
assert {[log_file_matches $slave_log "*Replication id shift defer done*master_repl_offset=$master_repl_offset*"]}
assert {[log_file_matches $master_log "*NOMASTERLINK Can't SYNC while replid shift in progress*"]}

# restore master slave replication
$slave_rd slaveof $master_host $master_port
$master_rd slaveof no one
$slave_rd read
$master_rd read
wait_for_sync $slave
}

test {slave => master(defering) => slave: fullresync because replid not match} {
$slave config set swap-debug-rio-delay-micro 1000

set master_rd [redis_deferring_client -1]
set slave_rd [redis_deferring_client 0]

for {set i 0} {$i < $keycount} {incr i} {
$master set key_$i val_$i
}

# Snapshot of the replication ids and offsets before the role flip.
# slave_repl_offset is recorded but not used by the assertions below.
set master_replid [status $master master_replid]
set master_repl_offset [status $master master_repl_offset]
set slave_repl_offset [status $slave master_repl_offset]

# Promote and immediately demote again, both commands sent back to back.
$slave_rd slaveof no one
$slave_rd slaveof $master_host $master_port

$slave_rd read
$slave_rd read

wait_for_sync $slave

# psync with different repid
assert {[log_file_matches $slave_log "*Setting secondary replication ID to $master_replid, valid up to offset: [expr $master_repl_offset+1]*"]}
assert {[log_file_matches $slave_log "*Full resync from master: $master_replid:$master_repl_offset*"]}
}

test {slave slaveof another master will defer without descarding replicated commands} {
$slave config set swap-debug-rio-delay-micro 1000

set slave_rd [redis_deferring_client 0]

for {set i 0} {$i < $keycount} {incr i} {
$master set key_$i val_$i
}

# Repoint the slave to master2; the reply is read further below.
$slave_rd slaveof $master2_host $master2_port

set master_repl_offset [status $master master_repl_offset]

# The slave must still be behind the old master at this point.
assert {[status $master master_repl_offset] > [status $slave master_repl_offset]}

$slave_rd read
wait_for_sync $slave

# The partial resync attempt must be logged with an offset right after the
# old master's sampled offset.
assert {[log_file_matches $slave_log "*Trying a partial resynchronization *:[expr $master_repl_offset+1]*"]}
}

}
}
}

0 comments on commit 99d08ec

Please sign in to comment.