Skip to content

Commit

Permalink
opt: don't discard dispatched but not executed commands of slave client…
Browse files Browse the repository at this point in the history
…, so that if another replica PSYNCs with a larger offset it won't result in a full resync.
  • Loading branch information
patpatbear committed Oct 26, 2023
1 parent 5186f66 commit 589dab6
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 63 deletions.
25 changes: 15 additions & 10 deletions src/ctrip_swap.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,28 @@ void clientReleaseLocks(client *c, swapCtx *ctx) {
/* Turn swap rewind mode on.
 *
 * Stores rewind_type in server.swap_rewind_type, asserts that the requested
 * type is not SWAP_REWIND_OFF, and logs the start of the rewind at
 * LL_WARNING level.
 *
 * NOTE(review): the two serverLog() calls below look like the before/after
 * versions of one line in a diff view; confirm that only the second one
 * (which also reports server.master_repl_offset) is meant to remain. */
void startSwapRewind(swap_rewind_type rewind_type) {
server.swap_rewind_type = rewind_type;
/* Checked after the assignment above: OFF is not a valid type to start. */
serverAssert(rewind_type != SWAP_REWIND_OFF);
serverLog(LL_WARNING,"Start swap rewind(%d)", rewind_type);
serverLog(LL_WARNING,"Start swap rewind(%d), current master_repl_offset:%lld", rewind_type, server.master_repl_offset);
}

/* Turn swap rewind mode off.
 *
 * Resets server.swap_rewind_type to SWAP_REWIND_OFF, joins
 * server.swap_torewind_clients into server.swap_rewinding_clients via
 * listJoin() (presumably moving every node of the second list onto the tail
 * of the first -- confirm against adlist), and logs the end of the rewind.
 *
 * NOTE(review): a second definition of endSwapRewind() appears further down
 * in this text; this copy looks like the pre-change side of a diff. */
void endSwapRewind() {
server.swap_rewind_type = SWAP_REWIND_OFF;
listJoin(server.swap_rewinding_clients,server.swap_torewind_clients);
serverLog(LL_WARNING,"End swap rewind");
}

void processSwapRewindingClients(void) {
/* Drain server.swap_rewinding_clients.
 *
 * Each client on the list is unlinked from it, has its CLIENT_SWAPPING and
 * CLIENT_SWAP_REWINDING flags cleared, and is passed to
 * queueClientForReprocessing(). The list is empty when this returns. */
static void processSwapRewindingClients(void) {
listNode *ln;
/* Always consume the head node; the loop ends once the list length is 0. */
while (listLength(server.swap_rewinding_clients)) {
ln = listFirst(server.swap_rewinding_clients);
serverAssert(ln != NULL);
client *c = listNodeValue(ln);
/* Unlink before touching the client so it is never processed twice. */
listDelNode(server.swap_rewinding_clients,ln);
/* NOTE(review): these two flag clears may be the old/new pair of a single
 * diff line -- confirm whether CLIENT_SWAPPING is still meant to be
 * cleared here. */
c->flags &= ~CLIENT_SWAPPING;
c->flags &= ~CLIENT_SWAP_REWINDING;
queueClientForReprocessing(c);
}
}

/* Turn swap rewind mode off and resume the clients parked while it was on.
 *
 * Steps, in order:
 *  1. reset server.swap_rewind_type to SWAP_REWIND_OFF;
 *  2. join server.swap_torewind_clients into server.swap_rewinding_clients;
 *  3. drain the rewinding list through processSwapRewindingClients(), which
 *     clears the clients' rewind flags and queues them for reprocessing;
 *  4. log the end of the rewind together with server.master_repl_offset.
 *
 * The parameter list is spelled (void) so the definition is a real prototype,
 * matching the other zero-argument functions in this code. It stays
 * compatible with an existing `void endSwapRewind();` declaration. */
void endSwapRewind(void) {
    server.swap_rewind_type = SWAP_REWIND_OFF;
    listJoin(server.swap_rewinding_clients,server.swap_torewind_clients);
    processSwapRewindingClients();
    serverLog(LL_WARNING,"End swap rewind, current master_repl_offset:%lld", server.master_repl_offset);
}

static void startSwapRewindIfNeeded(client *c) {
if (c->cmd && (c->cmd->proc == failoverCommand ||
c->cmd->proc == replicaofCommand)) {
Expand All @@ -138,6 +139,7 @@ static void startSwapRewindIfNeeded(client *c) {

/* Park client c until the ongoing swap rewind ends.
 *
 * The client must already have a resolved command (c->cmd is asserted).
 * It is appended to server.swap_torewind_clients and tagged with
 * CLIENT_SWAP_REWINDING. */
static void registerSwapToRewindClient(client *c) {
    serverAssert(c->cmd);

    /* Queue the client first, then flag it; the two steps are independent. */
    listAddNodeTail(server.swap_torewind_clients, c);
    c->flags = c->flags | CLIENT_SWAP_REWINDING;
}

Expand Down Expand Up @@ -562,7 +564,10 @@ int dbSwap(client *c) {

swapRateLimitPause(rlctx,c);

if (keyrequests_submit > 0) {
if (c->flags & CLIENT_SWAP_REWINDING) {
/* Rewinding command parsed but not processed, See below */
return C_ERR;
} else if (keyrequests_submit > 0) {
/* Swapping command parsed but not processed, return C_ERR so that:
* 1. repl stream will not propagate to sub-slaves
* 2. client will not reset
Expand Down
2 changes: 1 addition & 1 deletion src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,7 @@ int dbSwap(client *c);
int clientSwap(client *c);
void continueProcessCommand(client *c);
int replClientSwap(client *c);
int replClientDiscardDispatchedCommands(client *c);
void replicationCacheSwapDrainingMaster(client *c);
void replClientDiscardSwappingState(client *c);
void submitDeferredClientKeyRequests(client *c, getKeyRequestsResult *result, clientKeyRequestFinished cb, void* ctx_pd);
void submitClientKeyRequests(client *c, getKeyRequestsResult *result, clientKeyRequestFinished cb, void* ctx_pd);
Expand Down
76 changes: 39 additions & 37 deletions src/ctrip_swap_repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,47 @@

#include "ctrip_swap.h"

/* ----------------------------- repl swap ------------------------------ */
int replClientDiscardDispatchedCommands(client *c) {
int discarded = 0, scanned = 0;
listIter li;
listNode *ln;

serverAssert(c);

listRewind(server.repl_worker_clients_used,&li);
while ((ln = listNext(&li))) {
client *wc = listNodeValue(ln);
if (wc->repl_client == c) {
wc->CLIENT_REPL_CMD_DISCARDED = 1;
discarded++;
serverLog(LL_NOTICE, "discarded: cmd_reploff(%lld)", wc->cmd_reploff);
}
scanned++;
/* See replicationCacheMaster for more details */
void replicationCacheSwapDrainingMaster(client *c) {
serverAssert(server.swap_draining_master != NULL && server.cached_master == NULL);
serverLog(LL_NOTICE,"Caching the disconnected swap draining master state.");

/* Unlink the client from the server structures. */
unlinkClient(c);

/* Reset the master client so that's ready to accept new commands:
* we want to discard te non processed query buffers and non processed
* offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */
sdsclear(server.swap_draining_master->querybuf);
sdsclear(server.swap_draining_master->pending_querybuf);
server.swap_draining_master->read_reploff = server.swap_draining_master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);

/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.swap_draining_master;

/* Invalidate the Peer ID cache. */
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}

if (discarded) {
serverLog(LL_NOTICE,
"discard (%d/%d) dispatched but not executed commands for repl client(reploff:%lld, read_reploff:%lld)",
discarded, scanned, c->reploff, c->read_reploff);
/* Invalidate the Sock Name cache. */
if (c->sockname) {
sdsfree(c->sockname);
c->sockname = NULL;
}

return discarded;
/* Caching the master happens instead of the actual freeClient() call,
* so make sure to adjust the replication state. This function will
* also set server.master to NULL. */
replicationHandleMasterDisconnection();
}

void replClientDiscardSwappingState(client *c) {
Expand Down Expand Up @@ -146,20 +161,7 @@ static void processFinishedReplCommands() {
listDelNode(server.repl_worker_clients_used, ln);
listAddNodeTail(server.repl_worker_clients_free, wc);

/* Discard dispatched but not executed commands like we never reveived, if
* - repl client is closing: client close defered untill all swapping
* dispatched cmds finished, those cmds will be discarded.
* - repl client is cached: client cached but read_reploff will shirnk
* back and dispatched cmd will be discared. */
if (wc->CLIENT_REPL_CMD_DISCARDED) {
commandProcessed(wc);
serverAssert(wc->client_hold_mode == CLIENT_HOLD_MODE_REPL);
clientReleaseLocks(wc,NULL/*ctx unused*/);
wc->CLIENT_REPL_CMD_DISCARDED = 0;
continue;
} else {
serverAssert(c->flags&CLIENT_MASTER);
}
serverAssert(c->flags&CLIENT_MASTER);

backup_cmd = c->cmd;
c->cmd = wc->cmd;
Expand Down
41 changes: 29 additions & 12 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ client *createClient(connection *conn) {
c->client_hold_mode = CLIENT_HOLD_MODE_CMD;
c->CLIENT_DEFERED_CLOSING = 0;
c->CLIENT_REPL_SWAPPING = 0;
c->CLIENT_REPL_CMD_DISCARDED = 0;
c->swap_locks = listCreate();
c->swap_metas = NULL;
c->swap_errcode = 0;
Expand Down Expand Up @@ -1386,9 +1385,6 @@ void freeClientsInDeferedQueue(void) {
void freeClient(client *c) {
listNode *ln;

/* Discard dispatched commands if client is (or was) a repl client. */
replClientDiscardDispatchedCommands(c);

/* Unlinked repl client from server.repl_swapping_clients. */
replClientDiscardSwappingState(c);

Expand Down Expand Up @@ -1419,6 +1415,32 @@ void freeClient(client *c) {
listDelNode(server.clients_to_close,ln);
}

serverAssert(!(server.swap_draining_master && server.master));

if (c->keyrequests_count) {
if (server.master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING, "Connection with master lost (defer start with %d key requests).", c->keyrequests_count);
server.swap_draining_master = server.master;
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
}
deferFreeClient(c);
return;
}

if (server.swap_draining_master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING, "Connection with master lost (defer done, discard cache=%s).",
(c->flags & CLIENT_SWAP_DISCARD_CACHED_MASTER) ? "yes" : "no");
if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED|CLIENT_SWAP_DISCARD_CACHED_MASTER))) {
c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
replicationCacheSwapDrainingMaster(c);
server.swap_draining_master = NULL;
return;
} else {
server.swap_draining_master = NULL;
}
}

/* If it is our master that's being disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
Expand All @@ -1433,11 +1455,6 @@ void freeClient(client *c) {
}
}

if (c->keyrequests_count) {
deferFreeClient(c);
return;
}

/* Log link disconnection with slave */
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverLog(LL_WARNING,"Connection with replica %s lost.",
Expand Down Expand Up @@ -2103,7 +2120,7 @@ void commandProcessed(client *c) {
* The client will be reset in unblockClient().
* 2. Don't update replication offset or propagate commands to replicas,
* since we have not applied the command. */
if (!(c->flags & CLIENT_BLOCKED) && !(c->flags & CLIENT_SWAPPING)) {
if (!(c->flags & CLIENT_BLOCKED) && !(c->flags & CLIENT_SWAPPING) && !(c->flags & CLIENT_SWAP_REWINDING)) {
resetClient(c);
}

Expand Down Expand Up @@ -2191,7 +2208,7 @@ void processInputBuffer(client *c) {
if (c->flags & CLIENT_BLOCKED) break;

/* Also abort if the client is swapping. */
if (c->flags&CLIENT_SWAPPING) break;
if (c->flags&CLIENT_SWAPPING || c->flags&CLIENT_SWAP_REWINDING) break;

/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
Expand Down Expand Up @@ -2277,7 +2294,7 @@ void readQueryFromClient(connection *conn) {
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;

if (c->flags&CLIENT_SWAPPING) return;
if (c->flags&CLIENT_SWAPPING || c->flags&CLIENT_SWAP_REWINDING) return;

/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
Expand Down
46 changes: 44 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2590,7 +2590,7 @@ void syncWithMaster(connection *conn) {
goto error;
}

int connectWithMaster(void) {
int doConnectWithMaster(void) {
server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
Expand All @@ -2608,6 +2608,42 @@ int connectWithMaster(void) {
return C_OK;
}

/* Polling period, in milliseconds, of the timer that waits for the swap
 * draining master to go away. */
#define SWAP_WAIT_DRAINING_MASTER_INTERVAL_MS 100

int connectWithMaster(void);

/* Non-zero while a waitSwapDrainingMaster() time event is registered.
 *
 * connectWithMaster() does not change server.repl_state while it is only
 * waiting for the drain, so callers that retry on REPL_STATE_CONNECT
 * (presumably the replication cron -- confirm) would otherwise register one
 * more polling timer on every call. This flag keeps it to a single timer. */
static int swap_wait_draining_timer_armed = 0;

/* Time event callback: poll until server.swap_draining_master is released.
 *
 * While the draining master still exists, the callback returns the polling
 * interval so the event loop fires it again, and logs the client description
 * at most about once per second. Once the draining master is gone it
 * (re)starts the connection to the master if the replication state is still
 * REPL_STATE_CONNECT, and returns AE_NOMORE so the timer is not rescheduled.
 *
 * All three parameters are unused. */
int waitSwapDrainingMaster(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    static mstime_t logged_time = 0;

    UNUSED(eventLoop), UNUSED(id), UNUSED(clientData);

    if (server.swap_draining_master == NULL) {
        /* This timer ends here; disarm before calling connectWithMaster()
         * so the guard always reflects the real timer state. */
        swap_wait_draining_timer_armed = 0;
        serverLog(LL_WARNING, "Wait swap master drain done");
        if (server.repl_state == REPL_STATE_CONNECT) connectWithMaster();
        return AE_NOMORE;
    }

    /* Still draining: rate-limit the progress log to one line per second. */
    if (server.mstime - logged_time > 1000) {
        logged_time = server.mstime;
        sds client_desc = catClientInfoString(sdsempty(), server.swap_draining_master);
        serverLog(LL_WARNING, "Wait swap master drainning: %s.", client_desc);
        sdsfree(client_desc);
    }
    return SWAP_WAIT_DRAINING_MASTER_INTERVAL_MS;
}

/* Start connecting to the master, or defer the attempt while a previous
 * master client is still draining.
 *
 * Returns C_OK when the connection attempt was started or the wait timer is
 * (already) registered, and C_ERR when doConnectWithMaster() fails or the
 * wait timer cannot be created. */
int connectWithMaster(void) {
    if (server.swap_draining_master == NULL) return doConnectWithMaster();

    /* A polling timer is already pending; it will call back into this
     * function once the drain has finished. */
    if (swap_wait_draining_timer_armed) return C_OK;

    if (aeCreateTimeEvent(server.el,SWAP_WAIT_DRAINING_MASTER_INTERVAL_MS,
                          waitSwapDrainingMaster,NULL,NULL) == AE_ERR)
        return C_ERR;

    swap_wait_draining_timer_armed = 1;
    return C_OK;
}

/* This function can be called when a non blocking connection is currently
* in progress to undo it.
* Never call this function directly, use cancelReplicationHandshake() instead.
Expand Down Expand Up @@ -2861,7 +2897,9 @@ void replicaofCommand(client *c) {
addReply(c,shared.ok);

endrewind:
endSwapRewind();
if (server.swap_mode != SWAP_MODE_MEMORY) {
endSwapRewind();
}
}

/* ROLE command: provide information about the role of the instance
Expand Down Expand Up @@ -3041,6 +3079,10 @@ void replicationDiscardCachedMaster(void) {
server.cached_master->flags &= ~CLIENT_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;

if (server.swap_draining_master) {
server.swap_draining_master->flags |= CLIENT_SWAP_DISCARD_CACHED_MASTER;
}
}

/* Turn the cached master into the current master, using the file descriptor
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3668,6 +3668,7 @@ void InitServerLast() {
server.swap_rewind_type = SWAP_REWIND_OFF;
server.swap_torewind_clients = listCreate();
server.swap_rewinding_clients = listCreate();
server.swap_draining_master = NULL;
rocksInit();
server.util_task_manager = createRocksdbUtilTaskManager();
asyncCompleteQueueInit();
Expand Down
5 changes: 4 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_SWAPPING (1ULL<<43) /* The client is waiting swap. */
#define CLIENT_SWAP_UNLOCKING (1ULL<<44) /* Client is releasing swap lock. */
#define CLIENT_CTRIP_MONITOR (1ULL<<45) /* Client for ctrip monitor. */
#define CLIENT_SWAP_REWINDING (1ULL<<46) /* The client is waiting rewind. */
#define CLIENT_SWAP_DISCARD_CACHED_MASTER (1ULL<<47) /* The client will not be saved as cached_master. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down Expand Up @@ -1017,7 +1019,6 @@ typedef struct client {
int client_hold_mode; /* indicates how client should hold key */
int CLIENT_DEFERED_CLOSING;
int CLIENT_REPL_SWAPPING;
int CLIENT_REPL_CMD_DISCARDED;
long long cmd_reploff; /* Command replication offset when dispatch if this is a repl worker */
struct client *repl_client; /* Master or peer client if this is a repl worker */
list *swap_locks; /* swap locks */
Expand Down Expand Up @@ -1929,6 +1930,8 @@ struct redisServer {
/* swap meta flush */
int swap_flush_meta_deletes_percentage;
unsigned long long swap_flush_meta_deletes_num;

client *swap_draining_master;
};

#define MAX_KEYS_BUFFER 256
Expand Down

0 comments on commit 589dab6

Please sign in to comment.