Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into epoch_timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Sep 14, 2024
2 parents 0270e71 + 09def3c commit 0d13f5d
Show file tree
Hide file tree
Showing 20 changed files with 355 additions and 178 deletions.
14 changes: 14 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This is a file that can be used by git-blame to ignore some revisions.
# (git 2.23+, released in August 2019)
#
# Can be configured as follows:
#
# $ git config blame.ignoreRevsFile .git-blame-ignore-revs
#
# For more information you can look at git-blame(1) man page.

# Applied clang-format (#323)
c41dd77a3e93e02be3c4bc75d8c76b7b4169a4ce

# Removed terms `master` and `slave` from the source code (#591)
54c97479356ecf41b4b63733494a1be2ab919e17
1 change: 1 addition & 0 deletions src/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ SortIncludes: false
AllowAllParametersOfDeclarationOnNextLine: false
BinPackParameters: false
AlignAfterOpenBracket: Align
InsertNewlineAtEOF: true
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void clusterInitLast(void);
void clusterCron(void);
void clusterBeforeSleep(void);
int verifyClusterConfigWithData(void);
void clusterHandleServerShutdown(void);

int clusterSendModuleMessageToTarget(const char *target,
uint64_t module_id,
Expand Down Expand Up @@ -83,7 +84,6 @@ int getNodeDefaultClientPort(clusterNode *n);
clusterNode *getMyClusterNode(void);
int getClusterSize(void);
int getMyShardSlotCount(void);
int handleDebugClusterCommand(client *c);
int clusterNodePending(clusterNode *node);
int clusterNodeIsPrimary(clusterNode *n);
char **getClusterNodesList(size_t *numnodes);
Expand Down
49 changes: 44 additions & 5 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,28 @@ void clusterInitLast(void) {
}
}

/* Hook invoked when a node running in cluster mode receives SHUTDOWN. */
void clusterHandleServerShutdown(void) {
    serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
    /* On save failure the save routine itself emits the error logs. */
    clusterSaveConfig(1);

#if !defined(__sun)
    /* Explicitly drop the advisory lock held on the cluster config file
     * (see clusterLockConfig) instead of leaving it for the OS to reclaim.
     *
     * Tearing down a very large server process can take the OS a while,
     * and during that window a freshly restarted process may fail to
     * acquire the lock on the cluster configuration file and refuse to
     * start. Releasing the lock here, rather than relying on the OS to do
     * it, is the cleaner and safer way to hand back acquired resources. */
    int lock_fd = server.cluster_config_file_lock_fd;
    if (lock_fd != -1) flock(lock_fd, LOCK_UN | LOCK_NB);
#endif /* __sun */
}

/* Reset a node performing a soft or hard reset:
*
* 1) All other nodes are forgotten.
Expand Down Expand Up @@ -3170,14 +3192,21 @@ int clusterProcessPacket(clusterLink *link) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. */
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
Expand Down Expand Up @@ -6209,6 +6238,9 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
return 0;
}

/* If 'myself' is a replica, 'c' must be the primary client. */
serverAssert(!nodeIsReplica(myself) || c == server.primary);

if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0;

if (!strcasecmp(c->argv[3]->ptr, "migrating") && c->argc >= 5) {
Expand Down Expand Up @@ -6397,20 +6429,27 @@ void clusterCommandSetSlot(client *c) {
server.cluster->migrating_slots_to[slot] = NULL;
}

int slot_was_mine = server.cluster->slots[slot] == myself;
clusterNode *my_primary = clusterNodeGetPrimary(myself);
int slot_was_mine = server.cluster->slots[slot] == my_primary;
clusterDelSlot(slot);
clusterAddSlot(n, slot);

/* If we are a primary left without slots, we should turn into a
* replica of the new primary. */
if (slot_was_mine && n != myself && myself->numslots == 0 && server.cluster_allow_replica_migration) {
/* If replica migration is allowed, check if the primary of this shard
* loses its last slot and the shard becomes empty. In this case, we
* should turn into a replica of the new primary. */
if (server.cluster_allow_replica_migration && slot_was_mine && my_primary->numslots == 0) {
serverAssert(n != my_primary);
serverLog(LL_NOTICE,
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
/* `c` is the primary client if `myself` is a replica, prevent it
* from being freed by clusterSetPrimary. */
if (nodeIsReplica(myself)) protectClient(c);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(n, 1, 1);
if (nodeIsReplica(myself)) unprotectClient(c);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down
62 changes: 32 additions & 30 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef enum {
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags);
int keyIsExpired(serverDb *db, robj *key);
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
static int getKVStoreIndexForKey(sds key);

/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
Expand Down Expand Up @@ -125,7 +126,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) {
if (!canUseSharedObject() && val->refcount == OBJ_SHARED_REFCOUNT) {
val = dupStringObject(val);
kvstoreDictSetVal(db->keys, getKeySlot(key->ptr), de, val);
kvstoreDictSetVal(db->keys, getKVStoreIndexForKey(key->ptr), de, val);
}
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
Expand Down Expand Up @@ -202,15 +203,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
* if the key already exists, otherwise, it can fall back to dbOverwrite. */
/* Shared implementation behind dbAdd()/dbOverwrite-style insertion: store
 * key -> val in db->keys. When update_if_existing is set, an already present
 * key is overwritten via dbSetValue(); otherwise the key is asserted to be
 * new. On a fresh insertion the key is signaled as ready and a NOTIFY_NEW
 * keyspace event is fired. */
static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_existing) {
    dictEntry *prev = NULL;
    int didx = getKVStoreIndexForKey(key->ptr);
    dictEntry *entry = kvstoreDictAddRaw(db->keys, didx, key->ptr, &prev);
    if (prev != NULL && update_if_existing) {
        dbSetValue(db, key, val, 1, prev);
        return;
    }
    /* Caller promised a fresh key: the raw add must have produced an entry. */
    serverAssertWithInfo(NULL, key, entry != NULL);
    initObjectLRUOrLFU(val);
    kvstoreDictSetVal(db->keys, didx, entry, val);
    signalKeyAsReady(db, key, val->type);
    notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
}
Expand All @@ -219,32 +220,33 @@ void dbAdd(serverDb *db, robj *key, robj *val) {
dbAddInternal(db, key, val, 0);
}

/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled.
* The only difference between this function and getKeySlot, is that it's not using cached key slot from the
* current_client and always calculates CRC hash. This is useful when slot needs to be calculated for a key that user
* didn't request for, such as in case of eviction. */
int calculateKeySlot(sds key) {
return server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0;
/* Map a key to the kvstore dict index it belongs to: the key's hash slot
 * when cluster mode is enabled, dict 0 otherwise. */
static int getKVStoreIndexForKey(sds key) {
    if (!server.cluster_enabled) return 0;
    return getKeySlot(key);
}

/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
/* Returns the cluster hash slot for a given key, trying to use the cached slot that
* stored on the server.current_client first. If there is no cached value, it will compute the hash slot
* and then cache the value.*/
int getKeySlot(sds key) {
serverAssert(server.cluster_enabled);
/* This is performance optimization that uses pre-set slot id from the current command,
* in order to avoid calculation of the key hash.
*
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
* the key slot would fallback to keyHashSlot.
*
* Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots,
* so we must always recompute the slot for commands coming from the primary.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
debugServerAssertWithInfo(server.current_client, NULL,
(int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot);
return server.current_client->slot;
}
int slot = calculateKeySlot(key);
int slot = keyHashSlot(key, (int)sdslen(key));
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
Expand All @@ -267,11 +269,11 @@ int getKeySlot(sds key) {
* In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed.
*/
/* RDB-load insertion path: add key -> val without firing keyspace
 * notifications. Returns 1 on success, 0 if the key already exists.
 * A copy of `key` is stored in the kvstore, so the caller retains
 * ownership of `key` and must free it. */
int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
    int didx = 0;
    if (server.cluster_enabled) didx = getKeySlot(key);
    dictEntry *entry = kvstoreDictAddRaw(db->keys, didx, key, NULL);
    if (!entry) return 0;
    initObjectLRUOrLFU(val);
    kvstoreDictSetVal(db->keys, didx, entry, val);
    return 1;
}
}

Expand All @@ -288,8 +290,8 @@ int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
*
* The program is aborted if the key was not already present. */
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
int slot = getKeySlot(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr);
int dict_index = getKVStoreIndexForKey(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, dict_index, key->ptr);
serverAssertWithInfo(NULL, key, de != NULL);
robj *old = dictGetVal(de);

Expand All @@ -309,7 +311,7 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
/* OK */
Expand Down Expand Up @@ -404,8 +406,8 @@ robj *dbRandomKey(serverDb *db) {
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table);
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
Expand All @@ -421,13 +423,13 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
if (async) {
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
kvstoreDictSetVal(db->keys, slot, de, NULL);
kvstoreDictSetVal(db->keys, dict_index, de, NULL);
}
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
kvstoreDictDelete(db->expires, slot, key->ptr);
kvstoreDictDelete(db->expires, dict_index, key->ptr);

kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table);
kvstoreDictTwoPhaseUnlinkFree(db->keys, dict_index, de, plink, table);
return 1;
} else {
return 0;
Expand Down Expand Up @@ -1664,7 +1666,7 @@ void swapdbCommand(client *c) {
*----------------------------------------------------------------------------*/

/* Drop the expire entry for `key` from db->expires, if there is one.
 * Returns 1 when an expire was removed, 0 otherwise. */
int removeExpire(serverDb *db, robj *key) {
    int didx = getKVStoreIndexForKey(key->ptr);
    return kvstoreDictDelete(db->expires, didx, key->ptr) == DICT_OK;
}

/* Set an expire to the specified key. If the expire is set in the context
Expand All @@ -1675,10 +1677,10 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) {
dictEntry *kde, *de, *existing;

/* Reuse the sds from the main dict in the expire dict */
int slot = getKeySlot(key->ptr);
kde = kvstoreDictFind(db->keys, slot, key->ptr);
int dict_index = getKVStoreIndexForKey(key->ptr);
kde = kvstoreDictFind(db->keys, dict_index, key->ptr);
serverAssertWithInfo(NULL, key, kde != NULL);
de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
de = kvstoreDictAddRaw(db->expires, dict_index, dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
Expand Down Expand Up @@ -1896,7 +1898,7 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) {
}

/* Look up `key` in the given kvstore (works for both db->keys and
 * db->expires), resolving the dict index from the key's hash slot in
 * cluster mode and using dict 0 otherwise. */
static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
    int didx = server.cluster_enabled ? getKeySlot(key) : 0;
    return kvstoreDictFind(kvs, didx, key);
}

dictEntry *dbFind(serverDb *db, void *key) {
Expand Down
Loading

0 comments on commit 0d13f5d

Please sign in to comment.