Skip to content

Commit

Permalink
Rename APIs to get a standalone context from the cluster context
Browse files Browse the repository at this point in the history
These APIs are needed to support PUB/SUB, rename them for
public consumption:

  ctx_get_by_node  -> valkeyClusterGetValkeyContext
  actx_get_by_node -> valkeyClusterGetValkeyAsyncContext

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Sep 2, 2024
1 parent 60080f8 commit 8b5936b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
14 changes: 8 additions & 6 deletions include/valkey/valkeycluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ void valkeyClusterReset(valkeyClusterContext *cc);
/* Update the slotmap by querying any node. */
int valkeyClusterUpdateSlotmap(valkeyClusterContext *cc);

/* Internal functions */
valkeyContext *ctx_get_by_node(valkeyClusterContext *cc,
valkeyClusterNode *node);
/* Get the valkeyContext used for communication with a given node.
* Connects or reconnects to the node if necessary. */
valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
valkeyClusterNode *node);

/*
* Asynchronous API
Expand Down Expand Up @@ -316,9 +317,10 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc,
void *privdata, char *cmd,
int len);

/* Internal functions */
valkeyAsyncContext *actx_get_by_node(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node);
/* Get the valkeyAsyncContext used for communication with a given node.
* Connects or reconnects to the node if necessary. */
valkeyAsyncContext *valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node);

/* Cluster node iterator functions */
void valkeyClusterInitNodeIterator(valkeyClusterNodeIterator *iter,
Expand Down
35 changes: 18 additions & 17 deletions src/valkeycluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1788,8 +1788,8 @@ int valkeyClusterConnect2(valkeyClusterContext *cc) {
return valkeyClusterUpdateSlotmap(cc);
}

valkeyContext *ctx_get_by_node(valkeyClusterContext *cc,
valkeyClusterNode *node) {
valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
valkeyClusterNode *node) {
valkeyContext *c = NULL;
if (node == NULL) {
return NULL;
Expand Down Expand Up @@ -1901,7 +1901,7 @@ static int valkeyClusterAppendCommandInternal(valkeyClusterContext *cc,
return VALKEY_ERR;
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
return VALKEY_ERR;
} else if (c->err) {
Expand Down Expand Up @@ -2066,7 +2066,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
}
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL || c->err) {
/* Failed to connect. Maybe there was a failover and this node is gone.
* Update slotmap to find out. */
Expand All @@ -2078,7 +2078,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
if (node == NULL) {
goto error;
}
c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
Expand Down Expand Up @@ -2154,7 +2154,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
}
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
Expand All @@ -2174,7 +2174,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
freeReplyObject(reply);
reply = NULL;

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
Expand Down Expand Up @@ -2371,7 +2371,7 @@ void *valkeyClustervCommandToNode(valkeyClusterContext *cc,
void *reply;
int updating_slotmap = 0;

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
return NULL;
} else if (c->err) {
Expand Down Expand Up @@ -2550,7 +2550,7 @@ int valkeyClustervAppendCommandToNode(valkeyClusterContext *cc,
cc->requests->free = listCommandFree;
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
return VALKEY_ERR;
} else if (c->err) {
Expand Down Expand Up @@ -2653,7 +2653,7 @@ static int valkeyClusterSendAll(valkeyClusterContext *cc) {
continue;
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
continue;
}
Expand Down Expand Up @@ -2880,8 +2880,9 @@ static void unlinkAsyncContextAndNode(void *data) {
}
}

valkeyAsyncContext *actx_get_by_node(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node) {
valkeyAsyncContext *
valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node) {
valkeyAsyncContext *ac;
int ret;

Expand Down Expand Up @@ -3168,7 +3169,7 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc,
}

/* Get libvalkey context, connect if needed */
ac = actx_get_by_node(acc, node);
ac = valkeyClusterGetValkeyAsyncContext(acc, node);
}
if (ac == NULL)
goto error; /* Specific error already set */
Expand Down Expand Up @@ -3277,7 +3278,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,
if (slot >= 0) {
cc->table[slot] = node;
}
ac_retry = actx_get_by_node(acc, node);
ac_retry = valkeyClusterGetValkeyAsyncContext(acc, node);

break;
case CLUSTER_ERR_ASK:
Expand All @@ -3287,7 +3288,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,
goto done;
}

ac_retry = actx_get_by_node(acc, node);
ac_retry = valkeyClusterGetValkeyAsyncContext(acc, node);
if (ac_retry == NULL) {
/* Specific error already set */
goto done;
Expand Down Expand Up @@ -3411,7 +3412,7 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc,
goto error;
}

ac = actx_get_by_node(acc, node);
ac = valkeyClusterGetValkeyAsyncContext(acc, node);
if (ac == NULL) {
/* Specific error already set */
goto error;
Expand Down Expand Up @@ -3463,7 +3464,7 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc,
return VALKEY_ERR;
}

ac = actx_get_by_node(acc, node);
ac = valkeyClusterGetValkeyAsyncContext(acc, node);
if (ac == NULL) {
/* Specific error already set */
return VALKEY_ERR;
Expand Down

0 comments on commit 8b5936b

Please sign in to comment.