Skip to content

Commit

Permalink
fixup: cleanup how replicas are kept while parsing
Browse files Browse the repository at this point in the history
Change old way of storing replicas while parsing cluster nodes.
Simply keep lists of cluster nodes in the replica dict,
which then can be moved to the nodes dict.

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Oct 28, 2024
1 parent 463611f commit 9b9245f
Showing 1 changed file with 45 additions and 53 deletions.
98 changes: 45 additions & 53 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ void dictClusterNodeDestructor(void *privdata, void *val) {
freeValkeyClusterNode(val);
}

/* Destructor function for clusterNodeListDictType. */
void dictClusterNodeListDestructor(void *privdata, void *val) {
DICT_NOTUSED(privdata);
listRelease(val);
}

/* Cluster node hash table
* maps node address (1.2.3.4:6379) to a valkeyClusterNode
* Has ownership of valkeyClusterNode memory
Expand All @@ -145,6 +151,16 @@ dictType clusterNodesDictType = {
dictClusterNodeDestructor /* val destructor */
};

/* Hash table dictType to map node address to a list of valkeyClusterNodes. */
dictType clusterNodeListDictType = {
dictSdsHash, /* hashFunction */
NULL, /* keyDup */
NULL, /* valDup */
dictSdsKeyCompare, /* keyCompare */
dictSdsDestructor, /* keyDestructor */
dictClusterNodeListDestructor /* valDestructor */
};

void listCommandFree(void *command) {
struct cmd *cmd = command;
command_destroy(cmd);
Expand Down Expand Up @@ -676,40 +692,42 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
return NULL;
}

/* Store a parsed replica node in given dict using the primary_id as key.
* Additional replicas for a primary are stored within the first replica. */
static int store_replica_node(dict *replicas, char *primary_id, valkeyClusterNode *node) {
/* Keep lists of parsed replica nodes in a dict using the primary_id as key. */
static int retain_replica_node(dict *replicas, char *primary_id, valkeyClusterNode *node) {
sds key = sdsnew(primary_id);
if (key == NULL)
return VALKEY_ERR;

struct hilist *replicaList;

dictEntry *de = dictFind(replicas, key);
if (de == NULL) {
if (dictAdd(replicas, key, node) != DICT_OK) {
/* Create list to hold replicas for a primary. */
replicaList = listCreate();
if (replicaList == NULL) {
sdsfree(key);
return VALKEY_ERR;
}
return VALKEY_OK;
}

/* Store replica node in the existing replica node. */
sdsfree(key);
valkeyClusterNode *n = dictGetEntryVal(de);
if (n->slaves == NULL) {
n->slaves = listCreate();
if (n->slaves == NULL)
replicaList->free = listClusterNodeDestructor;
if (dictAdd(replicas, key, replicaList) != DICT_OK) {
sdsfree(key);
listRelease(replicaList);
return VALKEY_ERR;
n->slaves->free = listClusterNodeDestructor;
}
} else {
sdsfree(key);
replicaList = dictGetEntryVal(de);
}
if (listAddNodeTail(n->slaves, node) == NULL)

if (listAddNodeTail(replicaList, node) == NULL)
return VALKEY_ERR;

return VALKEY_OK;
}

/* Move parsed replica nodes to its primary node, which holds a list of replica
* nodes. The `replicas` dict shall contain nodes with primary_id as key. */
static int move_replica_nodes(dict *replicas, dict *nodes) {
/* Store parsed replica nodes in the primary nodes, which holds a list of replica
* nodes. The `replicas` dict shall contain lists of nodes with primary_id as key. */
static int store_replica_nodes(dict *nodes, dict *replicas) {
if (replicas == NULL)
return VALKEY_OK;

Expand All @@ -719,39 +737,12 @@ static int move_replica_nodes(dict *replicas, dict *nodes) {
while ((de = dictNext(&di))) {
valkeyClusterNode *primary = dictGetEntryVal(de);

/* Move all replica nodes related to this primary. */
/* Move replica nodes related to this primary. */
dictEntry *der = dictFind(replicas, primary->name);
if (der != NULL) {
if (primary->slaves == NULL) {
primary->slaves = listCreate();
if (primary->slaves == NULL) {
return VALKEY_ERR;
}
primary->slaves->free = listClusterNodeDestructor;
}

/* Move all replicas stored in the first parsed replica. */
valkeyClusterNode *replica = dictGetEntryVal(der);
if (replica->slaves != NULL) {
while (listLength(replica->slaves) > 0) {
listNode *node = listFirst(replica->slaves);
if (listAddNodeTail(primary->slaves, node->value) == NULL) {
return VALKEY_ERR;
}
/* Delete element without freeing the moved cluster node. */
replica->slaves->free = NULL;
listDelNode(replica->slaves, node);
replica->slaves->free = listClusterNodeDestructor;
}
listRelease(replica->slaves);
replica->slaves = NULL;
}
/* Move replica that was parsed first. */
if (listAddNodeHead(primary->slaves, replica) == NULL) {
return VALKEY_ERR;
}
/* All replicas for this primary moved, set dict value
* to NULL avoiding freeing the moved memory. */
assert(primary->slaves == NULL);
/* Move replica list from replicas dict to nodes dict. */
primary->slaves = dictGetEntryVal(der);
dictSetHashVal(replicas, der, NULL);
}
}
Expand Down Expand Up @@ -973,12 +964,13 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
} else {
assert(node->role == VALKEY_ROLE_SLAVE);
if (replicas == NULL) {
if ((replicas = dictCreate(&clusterNodesDictType, NULL)) == NULL) {
if ((replicas = dictCreate(&clusterNodeListDictType, NULL)) == NULL) {
freeValkeyClusterNode(node);
goto oom;
}
}
if (store_replica_node(replicas, primary_id, node) != VALKEY_OK) {
/* Retain parsed replica nodes until all primaries are parsed. */
if (retain_replica_node(replicas, primary_id, node) != VALKEY_OK) {
freeValkeyClusterNode(node);
goto oom;
}
Expand All @@ -990,8 +982,8 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
goto error;
}

/* Move parsed replica nodes to related primary nodes. */
if (move_replica_nodes(replicas, nodes) != VALKEY_OK) {
/* Store the retained replica nodes in primary nodes. */
if (store_replica_nodes(nodes, replicas) != VALKEY_OK) {
goto oom;
}
dictRelease(replicas);
Expand Down

0 comments on commit 9b9245f

Please sign in to comment.