From affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9 Mon Sep 17 00:00:00 2001 From: bentotten <59932872+bentotten@users.noreply.github.com> Date: Mon, 9 Sep 2024 20:46:02 -0700 Subject: [PATCH 01/17] For MEETs, save the extensions support flag immediately during MEET processing (#778) For backwards compatibility reasons, a node will wait until it receives a cluster message with the extensions flag before sending its own extensions. This leads to a delay in shard ID propagation that can corrupt nodes.conf with inaccurate shard IDs if a node is restarted before this can stabilize. This fixes much of that delay by immediately triggering the extensions-supported flag during the MEET processing and attaching the node to the link, allowing the PONG reply to contain OSS extensions. Partially fixes #774 --------- Signed-off-by: Ben Totten Co-authored-by: Ben Totten --- src/cluster_legacy.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index f3925f5695..12cd03e21c 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3156,7 +3156,10 @@ int clusterProcessPacket(clusterLink *link) { /* Add this node if it is new for us and the msg type is MEET. * In this stage we don't try to add the node with the right * flags, replicaof pointer, and so forth, as this details will be - * resolved when we'll receive PONGs from the node. */ + * resolved when we'll receive PONGs from the node. The exception + * to this is the flag that indicates extensions are supported, as + * we want to send extensions right away in the return PONG in order + * to reduce the amount of time needed to stabilize the shard ID. */ if (!sender && type == CLUSTERMSG_TYPE_MEET) { clusterNode *node; @@ -3164,6 +3167,10 @@ int clusterProcessPacket(clusterLink *link) { serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); node->cport = ntohs(hdr->cport); + if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { + node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; + } + setClusterNodeToInboundClusterLink(node, link); clusterAddNode(node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } From 9f0c80187e55517a8ee23f4ad31a65622e45fb84 Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:20:10 +0300 Subject: [PATCH 02/17] Fix crash in async IO threads with TLS (#1011) Fix for https://github.com/valkey-io/valkey/issues/997 Root Cause Analysis: 1. Two different jobs (READ and WRITE) may be sent to the same IO thread. 2. When processing the read job in `processIOThreadsReadDone`, the IO thread may find that the write job has also been completed. 3. In this case, the IO thread calls `processClientIOWriteDone` to first process the completed write job and free the COBs https://github.com/valkey-io/valkey/blob/affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9/src/networking.c#L4666 4. If there are pending writes (resulting from pipeline commands), a new async IO write job is sent before processing the completed read job https://github.com/valkey-io/valkey/blob/affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9/src/networking.c#L2417 When sending the write job, the `TLS_CONN_FLAG_POSTPONE_UPDATE_STATE` flag is set to prevent the IO thread from updating the event loop, which is not thread-safe. 5. 
Upon resuming the read job processing, the flag is cleared, https://github.com/valkey-io/valkey/blob/affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9/src/networking.c#L4685 causing the IO thread to update the event loop. Fix: Prevent sending an async write job for pending writes when a read job is about to be processed. Testing: The issue could not be reproduced due to its rare occurrence, which requires multiple specific conditions to align simultaneously. Signed-off-by: Uri Yagelnik --- src/networking.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/networking.c b/src/networking.c index 503a85d693..0c6716c504 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2384,8 +2384,9 @@ parseResult handleParseResults(client *c) { /* Process the completion of an IO write operation for a client. * This function handles various post-write tasks, including updating client state, + * allow_async_writes - A flag indicating whether I/O threads can handle pending writes for this client. * returns 1 if processing completed successfully, 0 if processing is skipped. */ -int processClientIOWriteDone(client *c) { +int processClientIOWriteDone(client *c, int allow_async_writes) { /* memory barrier acquire to get the latest client state */ atomic_thread_fence(memory_order_acquire); /* If a client is protected, don't proceed to check the write results as it may trigger conn close. */ @@ -2414,7 +2415,7 @@ int processClientIOWriteDone(client *c) { installClientWriteHandler(c); } else { /* If we can send the client to the I/O thread, let it handle the write. */ - if (trySendWriteToIOThreads(c) == C_OK) return 1; + if (allow_async_writes && trySendWriteToIOThreads(c) == C_OK) return 1; /* Try again in the next eventloop */ putClientInPendingWriteQueue(c); } @@ -2442,7 +2443,7 @@ int processIOThreadsWriteDone(void) { /* Client is still waiting for a pending I/O - skip it */ if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue; - processed += processClientIOWriteDone(c); + processed += processClientIOWriteDone(c, 1); } return processed; @@ -4663,7 +4664,8 @@ int processIOThreadsReadDone(void) { if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue; /* If the write job is done, process it ASAP to free the buffer and handle connection errors */ if (c->io_write_state == CLIENT_COMPLETED_IO) { - processClientIOWriteDone(c); + int allow_async_writes = 0; /* Don't send writes for the client to IO threads before processing the reads */ + processClientIOWriteDone(c, allow_async_writes); } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire); From 58fe9c0138af8a45dfcb906a3d5c631a6d6e9a30 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Wed, 11 Sep 2024 04:09:18 +0800 Subject: [PATCH 03/17] Use hashtable as the default type of temp set object during sunion/sdiff (#996) This patch sets the temp set object to the hash table type by default, and does a simple prediction of the temp set object's encoding when initializing `dstset`, to reduce unnecessary conversions. ## Issue Description According to the existing code logic, for operations like `sunion` and `sdiff` the temp set object could be an `intset`, `listpack`, or `hashtable`. For the `listpack` encoding, efficiency is low for operations like `find` and `compare`, since they need to traverse all elements.
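To make the cost gap concrete, here is a minimal, self-contained C sketch of the two membership strategies. This is an illustrative model, not Valkey's actual `lpFind`/dict code: the flat scan stands in for the listpack encoding, the chained table for the hashtable encoding.

```c
#include <stdio.h>
#include <string.h>

#define NBUCKETS 64

/* Listpack-style membership: a linear scan, one compare per element, O(n). */
static int flat_contains(const char **elems, int n, const char *s) {
    for (int i = 0; i < n; i++)
        if (strcmp(elems[i], s) == 0) return 1;
    return 0;
}

/* Hashtable-style membership: hash once, probe one chain, O(1) on average. */
static unsigned djb2(const char *s) {
    unsigned h = 5381;
    while (*s) h = h * 33u + (unsigned char)*s++;
    return h;
}

struct node { const char *key; struct node *next; };

static int ht_contains(struct node **b, const char *s) {
    for (struct node *n = b[djb2(s) % NBUCKETS]; n; n = n->next)
        if (strcmp(n->key, s) == 0) return 1;
    return 0;
}

static void ht_add(struct node **b, struct node *slot, const char *s) {
    unsigned i = djb2(s) % NBUCKETS;
    slot->key = s;
    slot->next = b[i];
    b[i] = slot;
}

int main(void) {
    const char *seta[] = {"x", "y", "z"}, *setb[] = {"y", "z", "w"};
    struct node *buckets[NBUCKETS] = {0}, pool[6];
    int used = 0;

    /* SUNION-style build: every insert needs one membership check. With the
     * hash table that check is O(1); with a flat encoding it would rescan the
     * whole destination set, making the union quadratic overall. */
    for (int i = 0; i < 3; i++)
        if (!ht_contains(buckets, seta[i])) ht_add(buckets, &pool[used++], seta[i]);
    for (int i = 0; i < 3; i++)
        if (!ht_contains(buckets, setb[i])) ht_add(buckets, &pool[used++], setb[i]);

    printf("union size: %d\n", used);                              /* 4: x y z w */
    printf("flat scan finds z: %d\n", flat_contains(seta, 3, "z")); /* 1 */
    return 0;
}
```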
When we explored the hotspots, we found that `lpFind` and `memcmp` had become the bottleneck when running workloads like the ones below: - [memtier_benchmark-2keys-set-10-100-elements-sunion.yml](https://github.com/redis/redis-benchmarks-specification/blob/main/redis_benchmarks_specification/test-suites/memtier_benchmark-2keys-set-10-100-elements-sunion.yml) - [memtier_benchmark-2keys-set-10-100-elements-sdiff.yml](https://github.com/redis/redis-benchmarks-specification/blob/main/redis_benchmarks_specification/test-suites/memtier_benchmark-2keys-set-10-100-elements-sdiff.yml) ![image](https://github.com/user-attachments/assets/71dfc70b-2ad5-4832-a338-712deefca20e) ## Optimization This patch sets the temp set object to the hash table type by default, and does a simple prediction of the temp set object's encoding when initializing `dstset`, to reduce unnecessary conversions. ### Test Environment - OPERATING SYSTEM: Ubuntu 22.04.4 LTS - Kernel: 5.15.0-116-generic - PROCESSOR: Intel Xeon Platinum 8380 - Server and Client in same socket. #### Server Configuration ``` taskset -c 0-3 ~/valkey/src/valkey-server /tmp/valkey.conf port 9001 bind * -::* daemonize no protected-mode no save "" ``` #### Performance Boost | Test Name| Perf Boost| |-|-| |[memtier_benchmark-2keys-set-10-100-elements-sunion.yml](https://github.com/redis/redis-benchmarks-specification/blob/main/redis_benchmarks_specification/test-suites/memtier_benchmark-2keys-set-10-100-elements-sunion.yml) |41%| |[memtier_benchmark-2keys-set-10-100-elements-sdiff.yml](https://github.com/redis/redis-benchmarks-specification/blob/main/redis_benchmarks_specification/test-suites/memtier_benchmark-2keys-set-10-100-elements-sdiff.yml) |27%| ### More Tests With the above test set, which has a total of 110 elements in the 2 given sets, we also did some benchmarking by adjusting the total number of elements in all given sets. We can still observe the performance boost. ![image](https://github.com/user-attachments/assets/b2ab420c-43e5-45de-9715-7d943df229cb) --------- Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo --- src/t_set.c | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/t_set.c b/src/t_set.c index b2aeec52e7..a540c3c49b 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1445,6 +1445,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke robj **sets = zmalloc(sizeof(robj *) * setnum); setTypeIterator *si; robj *dstset = NULL; + int dstset_encoding = OBJ_ENCODING_INTSET; char *str; size_t len; int64_t llval; @@ -1463,6 +1464,23 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke zfree(sets); return; } + /* For a SET's encoding, according to the factory method setTypeCreate(), currently have 3 types: + * 1. OBJ_ENCODING_INTSET + * 2. OBJ_ENCODING_LISTPACK + * 3. OBJ_ENCODING_HT + * 'dstset_encoding' is used to determine which kind of encoding to use when initialize 'dstset'. + * + * If all sets are all OBJ_ENCODING_INTSET encoding or 'dstkey' is not null, keep 'dstset' + * OBJ_ENCODING_INTSET encoding when initialize. Otherwise it is not efficient to create the 'dstset' + * from intset and then convert to listpack or hashtable. + * + * If one of the set is OBJ_ENCODING_LISTPACK, let's set 'dstset' to hashtable default encoding, + * the hashtable is more efficient when find and compare than the listpack. The corresponding + * time complexity are O(1) vs O(n).
*/ + if (!dstkey && dstset_encoding == OBJ_ENCODING_INTSET && + (setobj->encoding == OBJ_ENCODING_LISTPACK || setobj->encoding == OBJ_ENCODING_HT)) { + dstset_encoding = OBJ_ENCODING_HT; + } sets[j] = setobj; if (j > 0 && sets[0] == sets[j]) { sameset = 1; @@ -1504,7 +1522,11 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke /* We need a temp set object to store our union/diff. If the dstkey * is not NULL (that is, we are inside an SUNIONSTORE/SDIFFSTORE operation) then * this set object will be the resulting object to set into the target key*/ - dstset = createIntsetObject(); + if (dstset_encoding == OBJ_ENCODING_INTSET) { + dstset = createIntsetObject(); + } else { + dstset = createSetObject(); + } if (op == SET_OP_UNION) { /* Union is trivial, just add every element of every set to the From 1b241684508bd6fd63c083c25a9a4195a8e242d9 Mon Sep 17 00:00:00 2001 From: Amit Nagler <58042354+naglera@users.noreply.github.com> Date: Wed, 11 Sep 2024 03:26:28 +0300 Subject: [PATCH 04/17] Dual Channel Replication - Verify Replica Local Buffer Limit Configuration (#989) Prior to comparing the replica buffer against the configured limit, we need to ensure that the limit configuration is enabled. If the limit is set to zero, it indicates that there is no limit, and we should skip the buffer limit check. --------- Signed-off-by: naglera --- src/replication.c | 3 ++- tests/integration/replication-buffer.tcl | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index c1e4d5c2ce..a7040e2261 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2805,7 +2805,8 @@ void bufferReplData(connection *conn) { remaining_bytes = readIntoReplDataBlock(conn, tail, remaining_bytes); } if (readlen && remaining_bytes == 0) { - if (server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) { + if (server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes && + server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) { serverLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering."); /* Stop accumulating primary commands. */ connSetReadHandler(conn, NULL); diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index 18f8aa7eb5..66a8581dcc 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -132,8 +132,9 @@ start_server {} { # with master. $master config set repl-timeout 1000 $replica1 config set repl-timeout 1000 + $replica1 config set client-output-buffer-limit "replica 1024 0 0" $replica2 config set repl-timeout 1000 - $replica2 config set client-output-buffer-limit "replica 0 0 0" + $replica2 config set client-output-buffer-limit "replica 1024 0 0" $replica2 config set dual-channel-replication-enabled $dualchannel $replica1 replicaof $master_host $master_port From 4033c99ef522cd66150878dee8a97dc057b05a25 Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 11 Sep 2024 12:00:08 +0800 Subject: [PATCH 05/17] Fix module RdbLoad wrongly disable the AOF (#1001) In RdbLoad, we disable AOF before emptyData and rdbLoad to prevent copy-on-write issues. After rdbLoad completes, AOF should be re-enabled, but the code incorrectly checks server.aof_state, which has been reset to AOF_OFF in stopAppendOnly. This leads to AOF not being re-enabled after being disabled. 
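The shape of this bug generalizes: a runtime state flag that the teardown path itself mutates can no longer tell you what the user configured. Here is a minimal, self-contained C sketch of the pitfall and the fix; the names mirror the commit message, but this is an illustrative model, not the actual module.c code.

```c
#include <stdio.h>

enum aof_state_t { AOF_OFF, AOF_ON };

static struct {
    enum aof_state_t aof_state; /* runtime state, reset by stopAppendOnly() */
    int aof_enabled;            /* configured intent, untouched by teardown */
} srv = {AOF_ON, 1};

static void stop_append_only(void) { srv.aof_state = AOF_OFF; }
static void start_append_only(void) { srv.aof_state = AOF_ON; }

static void rdb_load_buggy(void) {
    if (srv.aof_state != AOF_OFF) stop_append_only();
    /* ... emptyData() and rdbLoad() would run here ... */
    /* BUG: aof_state was just reset to AOF_OFF above, so this never fires. */
    if (srv.aof_state != AOF_OFF) start_append_only();
}

static void rdb_load_fixed(void) {
    if (srv.aof_state != AOF_OFF) stop_append_only();
    /* ... emptyData() and rdbLoad() would run here ... */
    /* FIX: consult the configuration flag, which teardown does not mutate. */
    if (srv.aof_enabled) start_append_only();
}

int main(void) {
    rdb_load_buggy();
    printf("after buggy load: aof_state=%d\n", srv.aof_state); /* 0, stuck off */
    srv.aof_state = AOF_ON;
    rdb_load_fixed();
    printf("after fixed load: aof_state=%d\n", srv.aof_state); /* 1, restored */
    return 0;
}
```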
--------- Signed-off-by: Binbin --- src/module.c | 8 +++++++- tests/unit/moduleapi/rdbloadsave.tcl | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index e45b8f4181..24cc6a42e7 100644 --- a/src/module.c +++ b/src/module.c @@ -12965,6 +12965,9 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { disconnectReplicas(); freeReplicationBacklog(); + /* Stop and kill existing AOF rewriting fork as it is saving outdated data, + * we will re-enable it after the rdbLoad. Also killing it will prevent COW + * memory issue. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); /* Kill existing RDB fork as it is saving outdated data. Also killing it @@ -12983,7 +12986,10 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE); if (server.current_client) unprotectClient(server.current_client); - if (server.aof_state != AOF_OFF) startAppendOnly(); + + /* Here we need to decide whether to enable the AOF based on the aof_enabled, + * since the previous stopAppendOnly sets aof_state to AOF_OFF. */ + if (server.aof_enabled) startAppendOnly(); if (ret != RDB_OK) { errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO; diff --git a/tests/unit/moduleapi/rdbloadsave.tcl b/tests/unit/moduleapi/rdbloadsave.tcl index 37841aa9aa..6e2fadc601 100644 --- a/tests/unit/moduleapi/rdbloadsave.tcl +++ b/tests/unit/moduleapi/rdbloadsave.tcl @@ -80,6 +80,9 @@ start_server {tags {"modules"}} { fail "Can't find 'Killing AOF child' in recent log lines" } + # Verify that AOF is active. + assert_equal 1 [s aof_enabled] + # Verify the value in the loaded rdb assert_equal v1 [r get k] From c77e8f223ce18b51749fb7d1191ed305a64eda79 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 11 Sep 2024 08:50:35 +0300 Subject: [PATCH 06/17] Added .git-blame-ignore-revs (#1010) This file enables developers to ignore certain revisions in git-blame. This is quite handy considering there was a commit that reformatted a large amount of code in valkey. As a downside, one has to do a manual step for each clone of valkey to enable this feature. The instructions are available in the file itself. --------- Signed-off-by: Mikhail Koviazin --- .git-blame-ignore-revs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 .git-blame-ignore-revs diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000000..0cf98960a5 --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,14 @@ +# This is a file that can be used by git-blame to ignore some revisions. +# (git 2.23+, released in August 2019) +# +# Can be configured as follow: +# +# $ git config blame.ignoreRevsFile .git-blame-ignore-revs +# +# For more information you can look at git-blame(1) man page. # Applied clang-format (#323) c41dd77a3e93e02be3c4bc75d8c76b7b4169a4ce # Removed terms `master` and `slave` from the source code (#591) 54c97479356ecf41b4b63733494a1be2ab919e17 From 2b207ee1b3808c5eb5de6879651104044ca162b2 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Wed, 11 Sep 2024 09:52:34 -0700 Subject: [PATCH 07/17] Improve stability of hostnames test (#1016) Maybe partially resolves https://github.com/valkey-io/valkey/issues/952. The hostnames test relies on an assumption that node zero and node six don't communicate with each other to test a bunch of behavior in the handshake state.
This was previously done by dropping all meet packets; however, it seems there was some case where node zero was sending a single pong message to node 6, which was partially initializing the state. I couldn't track down why this happened, but I adjusted the test to simply pause node zero, which also correctly emulates the state we want to be in (since we're just testing state on node 6) and removes the chance of errant messages. The test was failing about 5% of the time locally, and I wasn't able to reproduce a failure with this new configuration. --------- Signed-off-by: Madelyn Olson --- tests/unit/cluster/hostnames.tcl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl index 232c6cf818..9a74fd0d56 100644 --- a/tests/unit/cluster/hostnames.tcl +++ b/tests/unit/cluster/hostnames.tcl @@ -132,11 +132,14 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne R $j config set cluster-announce-hostname "shard-$j.com" } + # Grab the ID so we have it later for validation + set primary_id [R 0 CLUSTER MYID] + # Prevent Node 0 and Node 6 from properly meeting, # they'll hang in the handshake phase. This allows us to # test the case where we "know" about it but haven't # successfully retrieved information about it yet. - R 0 DEBUG DROP-CLUSTER-PACKET-FILTER 0 + pause_process [srv 0 pid] R 6 DEBUG DROP-CLUSTER-PACKET-FILTER 0 # Have a replica meet the isolated node @@ -174,12 +177,11 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne # Also make sure we know about the isolated master, we # just can't reach it. - set master_id [R 0 CLUSTER MYID] - assert_match "*$master_id*" [R 6 CLUSTER NODES] + assert_match "*$primary_id*" [R 6 CLUSTER NODES] # Stop dropping cluster packets, and make sure everything # stabilizes - R 0 DEBUG DROP-CLUSTER-PACKET-FILTER -1 + resume_process [srv 0 pid] R 6 DEBUG DROP-CLUSTER-PACKET-FILTER -1 # This operation sometimes spikes to around 5 seconds to resolve the state, From fa348e2e59700244e60df2edeb1219f2989e0284 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Wed, 11 Sep 2024 09:53:42 -0700 Subject: [PATCH 08/17] Optimize the per slot dictionary by checking for cluster mode earlier (#995) While doing some profiling, I noticed that getKeySlot() was a fairly large part (~0.7%) of samples doing perf with high pipeline during standalone. I think this is because we do a very late check for server.cluster_enabled: we first call getKeySlot() and then call calculateKeySlot(). (calculateKeySlot was surprisingly not automatically inlined; we were doing a jump into it and then immediately returning zero.) We then also do useless work in the form of caching zero in client->slot, which will further mess with cache lines. So, this PR tries to accomplish a few things. 1) The usage of the `slot` name made a lot more sense before the introduction of the kvstore. Now with kvstore, we call this the database index, so all the references to slot in standalone are no longer really accurate. 2) Pull the cluster mode check all the way out of getKeySlot(), so it is hopefully a bit more performant. 3) Remove calculateKeySlot() as a function independent from getKeySlot(). calculateKeySlot used to have 3 call sites outside of db.c, which warranted its own function. It's now only called in two places, pubsub.c and networking.c.
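Before the diff, it may help to pin down what the hot path actually computes. The cluster slot mapping is CRC16 of the key, masked to 16384 slots, hashing only the `{hashtag}` portion when a non-empty tag is present. The sketch below is a self-contained model of that contract; the real code uses the table-driven CRC16 in crc16.c, while this bitwise XMODEM variant computes the same polynomial, and the helper name key_hash_slot here is illustrative.

```c
#include <stdio.h>
#include <stdint.h>
#include <string.h>

/* Bitwise CRC16-CCITT/XMODEM: poly 0x1021, init 0. Matches the polynomial the
 * cluster uses; crc16.c implements the same function with a lookup table. */
static uint16_t crc16(const char *buf, int len) {
    uint16_t crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i]) << 8;
        for (int j = 0; j < 8; j++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Hash only the {tag} if one is present and non-empty, else the whole key. */
static int key_hash_slot(const char *key, int keylen) {
    int s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16(key, keylen) & 16383;               /* no '{' */
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s + 1) return crc16(key, keylen) & 16383; /* no '}' or "{}" */
    return crc16(key + s + 1, e - s - 1) & 16383;
}

int main(void) {
    const char *k1 = "{user1000}.following", *k2 = "{user1000}.followers";
    printf("foo -> %d\n", key_hash_slot("foo", 3)); /* 12182 */
    printf("same slot: %d\n", key_hash_slot(k1, (int)strlen(k1)) ==
                                  key_hash_slot(k2, (int)strlen(k2))); /* 1 */
    return 0;
}
```

In standalone mode none of this is needed, since the dict index is always 0, which is exactly why hoisting the server.cluster_enabled check out of the per-key path pays off.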
I ran some profiling, and saw about ~0.3% improvement, but don't really trust it because you'll see a much higher (~2%) variance in test runs just by how the branch predictions will get changed with a new memory layout. Running perf again showed no samples in getKeySlot() and a reduction in samples in lookupKey(), so maybe this will help a little bit. --------- Signed-off-by: Madelyn Olson --- src/db.c | 62 +++++++++++++++++++++++++----------------------- src/networking.c | 2 +- src/pubsub.c | 3 ++- 3 files changed, 35 insertions(+), 32 deletions(-) diff --git a/src/db.c b/src/db.c index 312199ec6c..3493e2d863 100644 --- a/src/db.c +++ b/src/db.c @@ -55,6 +55,7 @@ typedef enum { keyStatus expireIfNeeded(serverDb *db, robj *key, int flags); int keyIsExpired(serverDb *db, robj *key); static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de); +static int getKVStoreIndexForKey(sds key); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -125,7 +126,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) { if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) { if (!canUseSharedObject() && val->refcount == OBJ_SHARED_REFCOUNT) { val = dupStringObject(val); - kvstoreDictSetVal(db->keys, getKeySlot(key->ptr), de, val); + kvstoreDictSetVal(db->keys, getKVStoreIndexForKey(key->ptr), de, val); } if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); @@ -202,15 +203,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { * if the key already exists, otherwise, it can fall back to dbOverwrite. */ static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_existing) { dictEntry *existing; - int slot = getKeySlot(key->ptr); - dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing); + int dict_index = getKVStoreIndexForKey(key->ptr); + dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key->ptr, &existing); if (update_if_existing && existing) { dbSetValue(db, key, val, 1, existing); return; } serverAssertWithInfo(NULL, key, de != NULL); initObjectLRUOrLFU(val); - kvstoreDictSetVal(db->keys, slot, de, val); + kvstoreDictSetVal(db->keys, dict_index, de, val); signalKeyAsReady(db, key, val->type); notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id); } @@ -219,32 +220,33 @@ void dbAdd(serverDb *db, robj *key, robj *val) { dbAddInternal(db, key, val, 0); } -/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled. - * The only difference between this function and getKeySlot, is that it's not using cached key slot from the - * current_client and always calculates CRC hash. This is useful when slot needs to be calculated for a key that user - * didn't request for, such as in case of eviction. */ -int calculateKeySlot(sds key) { - return server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0; +/* Returns which dict index should be used with kvstore for a given key. */ +static int getKVStoreIndexForKey(sds key) { + return server.cluster_enabled ? getKeySlot(key) : 0; } -/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/ +/* Returns the cluster hash slot for a given key, trying to use the cached slot that + * stored on the server.current_client first. 
If there is no cached value, it will compute the hash slot + * and then cache the value.*/ int getKeySlot(sds key) { + serverAssert(server.cluster_enabled); /* This is performance optimization that uses pre-set slot id from the current command, * in order to avoid calculation of the key hash. * * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set. * It only gets set during the execution of command under `call` method. Other flows requesting - * the key slot would fallback to calculateKeySlot. + * the key slot would fallback to keyHashSlot. * * Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots, * so we must always recompute the slot for commands coming from the primary. */ if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command && !server.current_client->flag.primary) { - debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot); + debugServerAssertWithInfo(server.current_client, NULL, + (int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot); return server.current_client->slot; } - int slot = calculateKeySlot(key); + int slot = keyHashSlot(key, (int)sdslen(key)); /* For the case of replicated commands from primary, getNodeByQuery() never gets called, * and thus c->slot never gets populated. That said, if this command ends up accessing a key, * we are able to backfill c->slot here, where the key's hash calculation is made. */ @@ -267,11 +269,11 @@ int getKeySlot(sds key) { * In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed. */ int dbAddRDBLoad(serverDb *db, sds key, robj *val) { - int slot = getKeySlot(key); - dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL); + int dict_index = server.cluster_enabled ? getKeySlot(key) : 0; + dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL); if (de == NULL) return 0; initObjectLRUOrLFU(val); - kvstoreDictSetVal(db->keys, slot, de, val); + kvstoreDictSetVal(db->keys, dict_index, de, val); return 1; } @@ -288,8 +290,8 @@ int dbAddRDBLoad(serverDb *db, sds key, robj *val) { * * The program is aborted if the key was not already present. */ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de) { - int slot = getKeySlot(key->ptr); - if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr); + int dict_index = getKVStoreIndexForKey(key->ptr); + if (!de) de = kvstoreDictFind(db->keys, dict_index, key->ptr); serverAssertWithInfo(NULL, key, de != NULL); robj *old = dictGetVal(de); @@ -309,7 +311,7 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn /* Because of RM_StringDMA, old may be changed, so we need get old again */ old = dictGetVal(de); } - kvstoreDictSetVal(db->keys, slot, de, val); + kvstoreDictSetVal(db->keys, dict_index, de, val); /* For efficiency, let the I/O thread that allocated an object also deallocate it. 
*/ if (tryOffloadFreeObjToIOThreads(old) == C_OK) { /* OK */ @@ -404,8 +406,8 @@ robj *dbRandomKey(serverDb *db) { int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { dictEntry **plink; int table; - int slot = getKeySlot(key->ptr); - dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table); + int dict_index = getKVStoreIndexForKey(key->ptr); + dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table); if (de) { robj *val = dictGetVal(de); /* RM_StringDMA may call dbUnshareStringValue which may free val, so we @@ -421,13 +423,13 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { if (async) { /* Because of dbUnshareStringValue, the val in de may change. */ freeObjAsync(key, dictGetVal(de), db->id); - kvstoreDictSetVal(db->keys, slot, de, NULL); + kvstoreDictSetVal(db->keys, dict_index, de, NULL); } /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - kvstoreDictDelete(db->expires, slot, key->ptr); + kvstoreDictDelete(db->expires, dict_index, key->ptr); - kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table); + kvstoreDictTwoPhaseUnlinkFree(db->keys, dict_index, de, plink, table); return 1; } else { return 0; @@ -1664,7 +1666,7 @@ void swapdbCommand(client *c) { *----------------------------------------------------------------------------*/ int removeExpire(serverDb *db, robj *key) { - return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK; + return kvstoreDictDelete(db->expires, getKVStoreIndexForKey(key->ptr), key->ptr) == DICT_OK; } /* Set an expire to the specified key. If the expire is set in the context @@ -1675,10 +1677,10 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) { dictEntry *kde, *de, *existing; /* Reuse the sds from the main dict in the expire dict */ - int slot = getKeySlot(key->ptr); - kde = kvstoreDictFind(db->keys, slot, key->ptr); + int dict_index = getKVStoreIndexForKey(key->ptr); + kde = kvstoreDictFind(db->keys, dict_index, key->ptr); serverAssertWithInfo(NULL, key, kde != NULL); - de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing); + de = kvstoreDictAddRaw(db->expires, dict_index, dictGetKey(kde), &existing); if (existing) { dictSetSignedIntegerVal(existing, when); } else { @@ -1896,7 +1898,7 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) { } static dictEntry *dbFindGeneric(kvstore *kvs, void *key) { - return kvstoreDictFind(kvs, getKeySlot(key), key); + return kvstoreDictFind(kvs, server.cluster_enabled ? 
getKeySlot(key) : 0, key); } dictEntry *dbFind(serverDb *db, void *key) { diff --git a/src/networking.c b/src/networking.c index 0c6716c504..24c68dc8a2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4827,7 +4827,7 @@ void ioThreadReadQueryFromClient(void *data) { int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); if (numkeys) { robj *first_key = c->argv[result.keys[0].pos]; - c->slot = calculateKeySlot(first_key->ptr); + c->slot = keyHashSlot(first_key->ptr, sdslen(first_key->ptr)); } getKeysFreeResult(&result); } diff --git a/src/pubsub.c b/src/pubsub.c index 047d408621..5b037b5721 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -667,7 +667,8 @@ void pubsubCommand(client *c) { int j; addReplyArrayLen(c, (c->argc - 2) * 2); for (j = 2; j < c->argc; j++) { - unsigned int slot = calculateKeySlot(c->argv[j]->ptr); + sds key = c->argv[j]->ptr; + unsigned int slot = server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0; dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]); addReplyBulk(c, c->argv[j]); From 8cca11ac541012e6bfbe995fb0367e6a058719b6 Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Thu, 12 Sep 2024 05:36:40 +0300 Subject: [PATCH 09/17] Fix wrong count for replica's tot-net-out (#1013) Fix duplicate calculation of replica's `net_output_bytes` - Remove redundant calculation leftover from previous refactor - Add test to prevent regression Signed-off-by: Uri Yagelnik Signed-off-by: Binbin Co-authored-by: Binbin --- src/networking.c | 9 +----- tests/unit/introspection.tcl | 54 ++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 24c68dc8a2..f9e725e16e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2129,11 +2129,6 @@ int _writeToClient(client *c) { return tot_written > 0 ? C_OK : C_ERR; } -static void postWriteToReplica(client *c) { - serverAssert(inMainThread()); - if (c->nwritten > 0) c->net_output_bytes += c->nwritten; -} - static void _postWriteToClient(client *c) { if (c->nwritten <= 0) return; @@ -2180,9 +2175,7 @@ int postWriteToClient(client *c) { c->io_last_bufpos = 0; /* Update total number of writes on server */ server.stat_total_writes_processed++; - if (getClientType(c) == CLIENT_TYPE_REPLICA) { - postWriteToReplica(c); - } else { + if (getClientType(c) != CLIENT_TYPE_REPLICA) { _postWriteToClient(c); } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 6d0e48e39c..d7253f3750 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -836,6 +836,60 @@ start_server {tags {"introspection"}} { assert_match {*'replicaof "--127.0.0.1"'*wrong number of arguments*} $err } {} {external:skip} + test {tot-net-out for replica client} { + start_server {} { + start_server {} { + set primary [srv -1 client] + set primary_host [srv -1 host] + set primary_port [srv -1 port] + set primary_pid [srv -1 pid] + set replica [srv 0 client] + set replica_pid [srv 0 pid] + + $replica replicaof $primary_host $primary_port + + # Wait for replica to be connected before proceeding. + wait_for_ofs_sync $primary $replica + + # Avoid PINGs to make sure tot-net-out is stable. + $primary config set repl-ping-replica-period 3600 + + # Increase repl timeout to avoid replica disconnecting + $primary config set repl-timeout 3600 + $replica config set repl-timeout 3600 + + # Wait for the replica to receive the command. 
+ wait_for_ofs_sync $primary $replica + + # Get the tot-net-out of the replica before sending the command. + set info_list [$primary client list] + foreach info [split $info_list "\r\n"] { + if {[string match "* flags=S *" $info]} { + set out_before [get_field_in_client_info $info "tot-net-out"] + break + } + } + + # Send a command to the primary. + set value_size 10000 + $primary set foo [string repeat "a" $value_size] + + # Get the tot-net-out of the replica after sending the command. + set info_list [$primary client list] + foreach info [split $info_list "\r\n"] { + if {[string match "* flags=S *" $info]} { + set out_after [get_field_in_client_info $info "tot-net-out"] + break + } + } + + assert_morethan $out_before 0 + assert_morethan $out_after 0 + assert_lessthan $out_after [expr $out_before + $value_size + 1000] ; # + 1000 to account for protocol overhead etc + } + } + } {} {external:skip} + test {valkey-server command line arguments - allow passing option name and option value in the same arg} { start_server {config "default.conf" args {"--maxmemory 700mb" "--maxmemory-policy volatile-lru"}} { assert_match [r config get maxmemory] {maxmemory 734003200} From 3513f220276c1905aeaadc362b59a7ed10cad168 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Thu, 12 Sep 2024 13:33:07 +0800 Subject: [PATCH 10/17] Make clang-format insert a newline at end of file if missing (#1023) clang generates a warning if there is no newline at the end of the source file. Update .clang-format to handle the missing newline at eof. Signed-off-by: haoqixu --- src/.clang-format | 1 + 1 file changed, 1 insertion(+) diff --git a/src/.clang-format b/src/.clang-format index dceaa4b029..2f0548a93d 100644 --- a/src/.clang-format +++ b/src/.clang-format @@ -30,3 +30,4 @@ SortIncludes: false AllowAllParametersOfDeclarationOnNextLine: false BinPackParameters: false AlignAfterOpenBracket: Align +InsertNewlineAtEOF: true From 76a59788e6b99deb33f3df7fa9946c063eb981f6 Mon Sep 17 00:00:00 2001 From: Ping Xie Date: Wed, 11 Sep 2024 23:19:32 -0700 Subject: [PATCH 11/17] Re-enable empty-shard slot migration tests (#1024) Related to #734 and #858 Signed-off-by: Ping Xie --- tests/unit/cluster/slot-migration.tcl | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/unit/cluster/slot-migration.tcl b/tests/unit/cluster/slot-migration.tcl index d798971968..c117e8304e 100644 --- a/tests/unit/cluster/slot-migration.tcl +++ b/tests/unit/cluster/slot-migration.tcl @@ -218,7 +218,6 @@ proc create_empty_shard {p r} { # Temporarily disable empty shard migration tests while we # work to reduce their flakiness. See https://github.com/valkey-io/valkey/issues/858. -if {0} { start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } { set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1] @@ -295,7 +294,6 @@ start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica wait_for_slot_state 7 "\[609-<-$R0_id\]" } } -} proc migrate_slot {from to slot} { set from_id [R $from CLUSTER MYID] From 38457b73208d9edadbdeb64dda8b18f57099475a Mon Sep 17 00:00:00 2001 From: Binbin Date: Thu, 12 Sep 2024 15:43:12 +0800 Subject: [PATCH 12/17] Trigger a save of the cluster configuration file before shutting down (#822) The cluster configuration file is the metadata "database" for the cluster. It is best to trigger a save when shutting down the server, to avoid inconsistent content that is not refreshed.
We save the nodes.conf whenever something that affects the nodes.conf has changed. But we are saving nodes.conf in clusterBeforeSleep, and some events may save it without an fsync, so there is a time gap. Having shutdown do its own save seems good; it doesn't need to care about the others. At the same time, a comment is added where we unlock nodes.conf to explain why we actively unlock on shutdown. Signed-off-by: Binbin --- src/cluster.h | 2 +- src/cluster_legacy.c | 22 ++++++++++++++++++++++ src/server.c | 9 ++------- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/cluster.h b/src/cluster.h index aefb3a7b82..9961710f9d 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -39,6 +39,7 @@ void clusterInitLast(void); void clusterCron(void); void clusterBeforeSleep(void); int verifyClusterConfigWithData(void); +void clusterHandleServerShutdown(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, @@ -83,7 +84,6 @@ int getNodeDefaultClientPort(clusterNode *n); clusterNode *getMyClusterNode(void); int getClusterSize(void); int getMyShardSlotCount(void); -int handleDebugClusterCommand(client *c); int clusterNodePending(clusterNode *node); int clusterNodeIsPrimary(clusterNode *n); char **getClusterNodesList(size_t *numnodes); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 12cd03e21c..65d8081978 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1191,6 +1191,28 @@ void clusterInitLast(void) { } } +/* Called when a cluster node receives SHUTDOWN. */ +void clusterHandleServerShutdown(void) { + /* The error logs have been logged in the save function if the save fails. */ + serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting."); + clusterSaveConfig(1); + +#if !defined(__sun) + /* Unlock the cluster config file before shutdown, see clusterLockConfig. + * + * This is needed if you shutdown a very large server process, it will take + * a while for the OS to release resources and unlock the cluster configuration + * file. Therefore, if we immediately try to restart the server process, it + * may not be able to acquire the lock on the cluster configuration file and + * fail to start. We explicitly releases the lock on the cluster configuration + * file on shutdown, rather than relying on the OS to release the lock, which + * is a cleaner and safer way to release acquired resources. */ + if (server.cluster_config_file_lock_fd != -1) { + flock(server.cluster_config_file_lock_fd, LOCK_UN | LOCK_NB); + } +#endif /* __sun */ +} + /* Reset a node performing a soft or hard reset: * * 1) All other nodes are forgotten. diff --git a/src/server.c b/src/server.c index 253e585e44..8970c43724 100644 --- a/src/server.c +++ b/src/server.c @@ -4422,13 +4422,8 @@ int finishShutdown(void) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); -#if !defined(__sun) - /* Unlock the cluster config file before shutdown */ - if (server.cluster_enabled && server.cluster_config_file_lock_fd != -1) { - flock(server.cluster_config_file_lock_fd, LOCK_UN | LOCK_NB); - } -#endif /* __sun */ - + /* Handle cluster-related matters when shutdown. */ + if (server.cluster_enabled) clusterHandleServerShutdown(); serverLog(LL_WARNING, "%s is now ready to exit, bye bye...", server.sentinel_mode ?
"Sentinel" : "Valkey"); return C_OK; From f7c5b401830616652fa9a97c916f40a45166ade2 Mon Sep 17 00:00:00 2001 From: Binbin Date: Fri, 13 Sep 2024 14:53:39 +0800 Subject: [PATCH 13/17] Avoid false positive in election tests (#984) The node may not be able to initiate an election in time due to problems with cluster communication. If an election is initiated, make sure its offset is 0. Closes #967. Signed-off-by: Binbin --- tests/support/cluster_util.tcl | 9 +++++++ tests/unit/cluster/replica-migration.tcl | 33 ++++++++++++++++++------ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index c19aea3c15..dd5cd84df2 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -227,6 +227,15 @@ proc cluster_setup {masters replicas node_count slot_allocator replica_allocator # Setup master/replica relationships $replica_allocator $masters $replicas + # A helper debug log that can print the server id in the server logs. + # This can help us locate the corresponding server in the log file. + for {set i 0} {$i < $masters} {incr i} { + R $i DEBUG LOG "========== I am primary $i ==========" + } + for {set i $i} {$i < [expr $masters+$replicas]} {incr i} { + R $i DEBUG LOG "========== I am replica $i ==========" + } + wait_for_cluster_propagation wait_for_cluster_state "ok" diff --git a/tests/unit/cluster/replica-migration.tcl b/tests/unit/cluster/replica-migration.tcl index 49c31128ba..8053859c69 100644 --- a/tests/unit/cluster/replica-migration.tcl +++ b/tests/unit/cluster/replica-migration.tcl @@ -75,9 +75,15 @@ proc test_migrated_replica {type} { fail "Failover does not happened" } - # Make sure the offset of server 3 / 7 is 0. - verify_log_message -3 "*Start of election*offset 0*" 0 - verify_log_message -7 "*Start of election*offset 0*" 0 + # The node may not be able to initiate an election in time due to + # problems with cluster communication. If an election is initiated, + # we make sure the offset of server 3 / 7 is 0. + if {[count_log_message -3 "Start of election"] != 0} { + verify_log_message -3 "*Start of election*offset 0*" 0 + } + if {[count_log_message -7 "Start of election"] != 0} { + verify_log_message -7 "*Start of election*offset 0*" 0 + } # Make sure the right replica gets the higher rank. verify_log_message -4 "*Start of election*rank #0*" 0 @@ -170,9 +176,14 @@ proc test_nonempty_replica {type} { fail "Failover does not happened" } - # Make sure server 7 gets the lower rank and it's offset is 0. verify_log_message -4 "*Start of election*rank #0*" 0 - verify_log_message -7 "*Start of election*offset 0*" 0 + + # The node may not be able to initiate an election in time due to + # problems with cluster communication. If an election is initiated, + # we make sure server 7 gets the lower rank and it's offset is 0. + if {[count_log_message -7 "Start of election"] != 0} { + verify_log_message -7 "*Start of election*offset 0*" 0 + } # Wait for the cluster to be ok. wait_for_condition 1000 50 { @@ -283,9 +294,15 @@ proc test_sub_replica {type} { fail "Failover does not happened" } - # Make sure the offset of server 3 / 7 is 0. - verify_log_message -3 "*Start of election*offset 0*" 0 - verify_log_message -7 "*Start of election*offset 0*" 0 + # The node may not be able to initiate an election in time due to + # problems with cluster communication. If an election is initiated, + # we make sure the offset of server 3 / 7 is 0. 
+ if {[count_log_message -3 "Start of election"] != 0} { + verify_log_message -3 "*Start of election*offset 0*" 0 + } + if {[count_log_message -7 "Start of election"] != 0} { + verify_log_message -7 "*Start of election*offset 0*" 0 + } # Wait for the cluster to be ok. wait_for_condition 1000 50 { From 3cc619f6378e46b005604e9e24c75f948a7322db Mon Sep 17 00:00:00 2001 From: Ping Xie Date: Fri, 13 Sep 2024 00:02:39 -0700 Subject: [PATCH 14/17] Disable flaky empty shard slot migration tests (#1027) Will continue my investigation offline Signed-off-by: Ping Xie --- tests/unit/cluster/slot-migration.tcl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/cluster/slot-migration.tcl b/tests/unit/cluster/slot-migration.tcl index c117e8304e..d798971968 100644 --- a/tests/unit/cluster/slot-migration.tcl +++ b/tests/unit/cluster/slot-migration.tcl @@ -218,6 +218,7 @@ proc create_empty_shard {p r} { # Temporarily disable empty shard migration tests while we # work to reduce their flakiness. See https://github.com/valkey-io/valkey/issues/858. +if {0} { start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } { set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1] @@ -294,6 +295,7 @@ start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica wait_for_slot_state 7 "\[609-<-$R0_id\]" } } +} proc migrate_slot {from to slot} { set from_id [R $from CLUSTER MYID] From d090fbefded1c1f67a4450100fe42d0dbdbad564 Mon Sep 17 00:00:00 2001 From: Wen Hui Date: Fri, 13 Sep 2024 12:22:21 -0400 Subject: [PATCH 15/17] Add the missing help output for new command: client capa redirect (#1025) Update client help output message for new command: client capa redirect --------- Signed-off-by: hwware Signed-off-by: Binbin Co-authored-by: Binbin Co-authored-by: Madelyn Olson --- src/networking.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/networking.c b/src/networking.c index f9e725e16e..8f8db33888 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3485,6 +3485,10 @@ void clientCommand(client *c) { const char *help[] = { "CACHING (YES|NO)", " Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.", +"CAPA