Skip to content

Commit

Permalink
Rename ctx_get_by_node() to valkeyClusterGetValkeyContext()
Browse files Browse the repository at this point in the history
This API is needed to support PUB/SUB, lets rename it for public consumption.
  • Loading branch information
bjosv committed Jun 13, 2024
1 parent 2772db7 commit 1ea2ab1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
35 changes: 18 additions & 17 deletions libvalkeycluster/valkeycluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1596,8 +1596,8 @@ int valkeyClusterConnect2(valkeyClusterContext *cc) {
return valkeyClusterUpdateSlotmap(cc);
}

valkeyContext *ctx_get_by_node(valkeyClusterContext *cc,
valkeyClusterNode *node) {
valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
valkeyClusterNode *node) {
valkeyContext *c = NULL;
if (node == NULL) {
return NULL;
Expand Down Expand Up @@ -1709,7 +1709,7 @@ static int valkeyClusterAppendCommandInternal(valkeyClusterContext *cc,
return VALKEY_ERR;
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
return VALKEY_ERR;
} else if (c->err) {
Expand Down Expand Up @@ -1874,7 +1874,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
}
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL || c->err) {
/* Failed to connect. Maybe there was a failover and this node is gone.
* Update slotmap to find out. */
Expand All @@ -1886,7 +1886,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
if (node == NULL) {
goto error;
}
c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
Expand Down Expand Up @@ -1962,7 +1962,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
}
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
Expand All @@ -1982,7 +1982,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
freeReplyObject(reply);
reply = NULL;

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
Expand Down Expand Up @@ -2181,7 +2181,7 @@ void *valkeyClusterCommandToNode(valkeyClusterContext *cc,
void *reply;
int updating_slotmap = 0;

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
return NULL;
} else if (c->err) {
Expand Down Expand Up @@ -2350,7 +2350,7 @@ int valkeyClusterAppendCommandToNode(valkeyClusterContext *cc,
cc->requests->free = listCommandFree;
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
return VALKEY_ERR;
} else if (c->err) {
Expand Down Expand Up @@ -2438,7 +2438,7 @@ static int valkeyClusterSendAll(valkeyClusterContext *cc) {
continue;
}

c = ctx_get_by_node(cc, node);
c = valkeyClusterGetValkeyContext(cc, node);
if (c == NULL) {
continue;
}
Expand Down Expand Up @@ -2669,8 +2669,9 @@ static void unlinkAsyncContextAndNode(void *data) {
}
}

valkeyAsyncContext *actx_get_by_node(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node) {
valkeyAsyncContext *
valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node) {
valkeyAsyncContext *ac;
int ret;

Expand Down Expand Up @@ -2953,7 +2954,7 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc,
}

/* Get libvalkey context, connect if needed */
ac = actx_get_by_node(acc, node);
ac = valkeyClusterGetValkeyAsyncContext(acc, node);
}
if (ac == NULL)
goto error; /* Specific error already set */
Expand Down Expand Up @@ -3061,7 +3062,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,
if (slot >= 0) {
cc->table[slot] = node;
}
ac_retry = actx_get_by_node(acc, node);
ac_retry = valkeyClusterGetValkeyAsyncContext(acc, node);

break;
case CLUSTER_ERR_ASK:
Expand All @@ -3071,7 +3072,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,
goto done;
}

ac_retry = actx_get_by_node(acc, node);
ac_retry = valkeyClusterGetValkeyAsyncContext(acc, node);
if (ac_retry == NULL) {
/* Specific error already set */
goto done;
Expand Down Expand Up @@ -3189,7 +3190,7 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc,
goto error;
}

ac = actx_get_by_node(acc, node);
ac = valkeyClusterGetValkeyAsyncContext(acc, node);
if (ac == NULL) {
/* Specific error already set */
goto error;
Expand Down Expand Up @@ -3232,7 +3233,7 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc,
cluster_async_data *cad = NULL;
struct cmd *command = NULL;

ac = actx_get_by_node(acc, node);
ac = valkeyClusterGetValkeyAsyncContext(acc, node);
if (ac == NULL) {
/* Specific error already set */
return VALKEY_ERR;
Expand Down
15 changes: 9 additions & 6 deletions libvalkeycluster/valkeycluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,10 @@ void valkeyClusterReset(valkeyClusterContext *cc);
/* Update the slotmap by querying any node. */
int valkeyClusterUpdateSlotmap(valkeyClusterContext *cc);

/* Internal functions */
valkeyContext *ctx_get_by_node(valkeyClusterContext *cc,
valkeyClusterNode *node);
/* Returns the valkeyContext used for communication with a given node.
* Connects or reconnects to the node if necessary. */
valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
valkeyClusterNode *node);

/*
* Asynchronous API
Expand Down Expand Up @@ -304,9 +305,11 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc,
void *privdata, char *cmd,
int len);

/* Internal functions */
valkeyAsyncContext *actx_get_by_node(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node);
/* Returns the valkeyAsyncContext used for communication with a given node.
* Connects or reconnects to the node if necessary. */
valkeyAsyncContext *
valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
valkeyClusterNode *node);

/* Cluster node iterator functions */
void valkeyClusterInitNodeIterator(valkeyClusterNodeIterator *iter,
Expand Down

0 comments on commit 1ea2ab1

Please sign in to comment.