diff --git a/README.md b/README.md index 8974193..120a7f1 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,52 @@ Hiredis-vip fully contained and based on [Hiredis](https://github.com/redis/hire ### CLUSTER API: +```c +redisClusterContext *redisClusterContextInit(void); +void redisClusterFree(redisClusterContext *cc); + +int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr); +int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs); +int redisClusterSetOptionConnectBlock(redisClusterContext *cc); +int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc); +int redisClusterSetOptionParseSlaves(redisClusterContext *cc); +int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc); +int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc); +int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv); +int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv); +int redisClusterSetOptionMaxRedirect(redisClusterContext *cc, int max_redirect_count); + +int redisClusterConnect2(redisClusterContext *cc); + +void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len); +void *redisClustervCommand(redisClusterContext *cc, const char *format, va_list ap); +void *redisClusterCommand(redisClusterContext *cc, const char *format, ...); +void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen); +int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd, int len); +int redisClustervAppendCommand(redisClusterContext *cc, const char *format, va_list ap); +int redisClusterAppendCommand(redisClusterContext *cc, const char *format, ...); +int redisClusterAppendCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen); +int redisClusterGetReply(redisClusterContext *cc, void **reply); +void redisClusterReset(redisClusterContext *cc); + +redisContext *ctx_get_by_node(redisClusterContext *cc, struct cluster_node *node); + +redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs, int flags); +int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc, redisConnectCallback *fn); +int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc, redisDisconnectCallback *fn); +int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, char *cmd, int len); +int redisClustervAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, va_list ap); +int redisClusterAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, ...); +int redisClusterAsyncCommandArgv(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); + +void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc); +void redisClusterAsyncFree(redisClusterAsyncContext *acc); + +redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, cluster_node *node); +``` + +### CLUSTER API (old api, version <= 0.3.0): + ```c redisClusterContext *redisClusterConnect(const char *addrs, int flags); redisClusterContext *redisClusterConnectWithTimeout(const char *addrs, const struct timeval tv, int flags); @@ -65,23 +111,31 @@ https://github.com/vipshop/hiredis-vip/wiki To consume the synchronous API, there are only a few function calls that need to be introduced: ```c -redisClusterContext *redisClusterConnect(const char *addrs, int flags); -void redisClusterSetMaxRedirect(redisClusterContext *cc, int max_redirect_count); +redisClusterContext *redisClusterContextInit(void); +int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs); +int redisClusterSetOptionMaxRedirect(redisClusterContext *cc, int max_redirect_count); +int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv); +int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv); +int redisClusterConnect2(redisClusterContext *cc); void *redisClusterCommand(redisClusterContext *cc, const char *format, ...); void redisClusterFree(redisClusterContext *cc); ``` ### Cluster connecting -The function `redisClusterConnect` is used to create a so-called `redisClusterContext`. The -context is where Hiredis-vip Cluster holds state for connections. The `redisClusterContext` +The function `redisClusterContextInit` is used to create a so-called `redisClusterContext`. +The function `redisClusterSetOptionAddNodes` is used to add the redis cluster address. +The function `redisClusterConnect2` is used to connect to the redis cluser. +The context is where Hiredis-vip Cluster holds state for connections. The `redisClusterContext` struct has an integer `err` field that is non-zero when the connection is in an error state. The field `errstr` will contain a string with a description of the error. After trying to connect to Redis using `redisClusterContext` you should check the `err` field to see if establishing the connection was successful: ```c -redisClusterContext *cc = redisClusterConnect("127.0.0.1:6379", HIRCLUSTER_FLAG_NULL); +redisClusterContext *cc = redisClusterContextInit(); +redisClusterSetOptionAddNodes(cc, "127.0.0.1:6379,127.0.0.1:6380"); +redisClusterConnect2(cc); if (cc != NULL && cc->err) { printf("Error: %s\n", cc->errstr); // handle error @@ -250,6 +304,9 @@ See the `adapters/` directory for bindings to *ae* and *libevent*. ## AUTHORS Hiredis-vip was maintained and used at vipshop(https://github.com/vipshop). + The redis client library part in hiredis-vip is same as hiredis(https://github.com/redis/hiredis). + The redis cluster client library part in hiredis-vip is written by deep(https://github.com/deep011). + Hiredis-vip is released under the BSD license. diff --git a/hircluster.c b/hircluster.c index edf9cb2..b051936 100644 --- a/hircluster.c +++ b/hircluster.c @@ -1279,8 +1279,8 @@ cluster_update_route_by_addr(redisClusterContext *cc, goto error; } - if(cc->timeout){ - c = redisConnectWithTimeout(ip, port, *cc->timeout); + if(cc->connect_timeout){ + c = redisConnectWithTimeout(ip, port, *cc->connect_timeout); }else{ c = redisConnect(ip, port); } @@ -1294,11 +1294,20 @@ cluster_update_route_by_addr(redisClusterContext *cc, goto error; } + if (cc->timeout) { + redisSetTimeout(c, *cc->timeout); + } + if(cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS){ reply = redisCommand(c, REDIS_COMMAND_CLUSTER_SLOTS); if(reply == NULL){ - __redisClusterSetError(cc,REDIS_ERR_OTHER, - "Command(cluster slots) reply error(NULL)."); + if (c->err == REDIS_ERR_TIMEOUT) { + __redisClusterSetError(cc,c->err, + "Command(cluster slots) reply error(socket timeout)"); + } else { + __redisClusterSetError(cc,REDIS_ERR_OTHER, + "Command(cluster slots) reply error(NULL)."); + } goto error; }else if(reply->type != REDIS_REPLY_ARRAY){ if(reply->type == REDIS_REPLY_ERROR){ @@ -1313,11 +1322,16 @@ cluster_update_route_by_addr(redisClusterContext *cc, } nodes = parse_cluster_slots(cc, reply, cc->flags); - }else{ + } else { reply = redisCommand(c, REDIS_COMMAND_CLUSTER_NODES); if(reply == NULL){ - __redisClusterSetError(cc,REDIS_ERR_OTHER, - "Command(cluster nodes) reply error(NULL)."); + if (c->err == REDIS_ERR_TIMEOUT) { + __redisClusterSetError(cc,c->err, + "Command(cluster nodes) reply error(socket timeout)"); + } else { + __redisClusterSetError(cc,REDIS_ERR_OTHER, + "Command(cluster nodes) reply error(NULL)."); + } goto error; }else if(reply->type != REDIS_REPLY_STRING){ if(reply->type == REDIS_REPLY_ERROR){ @@ -1512,9 +1526,9 @@ cluster_update_route_with_nodes_old(redisClusterContext *cc, goto error; } - if(cc->timeout) + if(cc->connect_timeout) { - c = redisConnectWithTimeout(ip, port, *cc->timeout); + c = redisConnectWithTimeout(ip, port, *cc->connect_timeout); } else { @@ -1992,7 +2006,7 @@ int test_cluster_update_route(redisClusterContext *cc) return ret; } -static redisClusterContext *redisClusterContextInit(void) { +redisClusterContext *redisClusterContextInit(void) { redisClusterContext *cc; cc = calloc(1,sizeof(redisClusterContext)); @@ -2004,6 +2018,7 @@ static redisClusterContext *redisClusterContextInit(void) { cc->ip = NULL; cc->port = 0; cc->flags = 0; + cc->connect_timeout = NULL; cc->timeout = NULL; cc->nodes = NULL; cc->slots = NULL; @@ -2016,6 +2031,8 @@ static redisClusterContext *redisClusterContextInit(void) { cc->route_version = 0LL; memset(cc->table, 0, REDIS_CLUSTER_SLOTS*sizeof(cluster_node *)); + + cc->flags |= REDIS_BLOCK; return cc; } @@ -2031,6 +2048,11 @@ void redisClusterFree(redisClusterContext *cc) { cc->ip = NULL; } + if (cc->connect_timeout) + { + free(cc->connect_timeout); + } + if (cc->timeout) { free(cc->timeout); @@ -2058,7 +2080,108 @@ void redisClusterFree(redisClusterContext *cc) { free(cc); } -static int redisClusterAddNode(redisClusterContext *cc, const char *addr) +/* Connect to a Redis cluster. On error the field error in the returned + * context will be set to the return value of the error function. + * When no set of reply functions is given, the default set will be used. */ +static int _redisClusterConnect2(redisClusterContext *cc) +{ + + if (cc->nodes == NULL || dictSize(cc->nodes) == 0) + { + __redisClusterSetError(cc,REDIS_ERR_OTHER,"servers address does not set up"); + return REDIS_ERR; + } + + return cluster_update_route(cc); +} + +/* Connect to a Redis cluster. On error the field error in the returned + * context will be set to the return value of the error function. + * When no set of reply functions is given, the default set will be used. */ +static redisClusterContext *_redisClusterConnect(redisClusterContext *cc, const char *addrs) { + + int ret; + + ret = redisClusterSetOptionAddNodes(cc, addrs); + if (ret != REDIS_OK) + { + return cc; + } + + cluster_update_route(cc); + + return cc; +} + +redisClusterContext *redisClusterConnect(const char *addrs, int flags) +{ + redisClusterContext *cc; + + cc = redisClusterContextInit(); + + if(cc == NULL) + { + return NULL; + } + + cc->flags |= REDIS_BLOCK; + if(flags) + { + cc->flags |= flags; + } + + return _redisClusterConnect(cc, addrs); +} + +redisClusterContext *redisClusterConnectWithTimeout( + const char *addrs, const struct timeval tv, int flags) +{ + redisClusterContext *cc; + + cc = redisClusterContextInit(); + + if(cc == NULL) + { + return NULL; + } + + cc->flags |= REDIS_BLOCK; + if(flags) + { + cc->flags |= flags; + } + + if (cc->connect_timeout == NULL) + { + cc->connect_timeout = malloc(sizeof(struct timeval)); + } + + memcpy(cc->connect_timeout, &tv, sizeof(struct timeval)); + + return _redisClusterConnect(cc, addrs); +} + +redisClusterContext *redisClusterConnectNonBlock(const char *addrs, int flags) { + + redisClusterContext *cc; + + cc = redisClusterContextInit(); + + if(cc == NULL) + { + return NULL; + } + + cc->flags &= ~REDIS_BLOCK; + if(flags) + { + cc->flags |= flags; + } + + return _redisClusterConnect(cc, addrs); +} + +int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr) { dictEntry *node_entry; cluster_node *node; @@ -2066,6 +2189,7 @@ static int redisClusterAddNode(redisClusterContext *cc, const char *addr) int ip_port_count = 0; sds ip; int port; + sds addr_sds = NULL; if(cc == NULL) { @@ -2081,7 +2205,9 @@ static int redisClusterAddNode(redisClusterContext *cc, const char *addr) } } - node_entry = dictFind(cc->nodes, addr); + addr_sds = sdsnew(addr); + node_entry = dictFind(cc->nodes, addr_sds); + sdsfree(addr_sds); if(node_entry == NULL) { ip_port = sdssplitlen(addr, strlen(addr), @@ -2139,12 +2265,8 @@ static int redisClusterAddNode(redisClusterContext *cc, const char *addr) return REDIS_OK; } - -/* Connect to a Redis cluster. On error the field error in the returned - * context will be set to the return value of the error function. - * When no set of reply functions is given, the default set will be used. */ -static redisClusterContext *_redisClusterConnect(redisClusterContext *cc, const char *addrs) { - +int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs) +{ int ret; sds *address = NULL; int address_count = 0; @@ -2152,105 +2274,199 @@ static redisClusterContext *_redisClusterConnect(redisClusterContext *cc, const if(cc == NULL) { - return NULL; + return REDIS_ERR; } - address = sdssplitlen(addrs, strlen(addrs), CLUSTER_ADDRESS_SEPARATOR, strlen(CLUSTER_ADDRESS_SEPARATOR), &address_count); if(address == NULL || address_count <= 0) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"servers address is error(correct is like: 127.0.0.1:1234,127.0.0.2:5678)"); - return cc; + return REDIS_ERR; } for(i = 0; i < address_count; i ++) { - ret = redisClusterAddNode(cc, address[i]); + ret = redisClusterSetOptionAddNode(cc, address[i]); if(ret != REDIS_OK) { sdsfreesplitres(address, address_count); - return cc; + return REDIS_ERR; } } sdsfreesplitres(address, address_count); - - cluster_update_route(cc); - return cc; + return REDIS_OK; } -redisClusterContext *redisClusterConnect(const char *addrs, int flags) +int redisClusterSetOptionConnectBlock(redisClusterContext *cc) { - redisClusterContext *cc; - - cc = redisClusterContextInit(); if(cc == NULL) { - return NULL; + return REDIS_ERR; } cc->flags |= REDIS_BLOCK; - if(flags) + + return REDIS_OK; +} + +int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc) +{ + + if(cc == NULL) { - cc->flags |= flags; + return REDIS_ERR; } - - return _redisClusterConnect(cc, addrs); + + cc->flags &= ~REDIS_BLOCK; + + return REDIS_OK; } -redisClusterContext *redisClusterConnectWithTimeout( - const char *addrs, const struct timeval tv, int flags) +int redisClusterSetOptionParseSlaves(redisClusterContext *cc) { - redisClusterContext *cc; - cc = redisClusterContextInit(); + if(cc == NULL) + { + return REDIS_ERR; + } + + cc->flags |= HIRCLUSTER_FLAG_ADD_SLAVE; + + return REDIS_OK; +} + +int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc) +{ if(cc == NULL) { - return NULL; + return REDIS_ERR; } - cc->flags |= REDIS_BLOCK; - if(flags) + cc->flags |= HIRCLUSTER_FLAG_ADD_OPENSLOT; + + return REDIS_OK; +} + +int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc) +{ + + if(cc == NULL) { - cc->flags |= flags; + return REDIS_ERR; + } + + cc->flags |= HIRCLUSTER_FLAG_ROUTE_USE_SLOTS; + + return REDIS_OK; +} + +int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv) +{ + + if(cc == NULL) + { + return REDIS_ERR; + } + + if (cc->connect_timeout == NULL) + { + cc->connect_timeout = malloc(sizeof(struct timeval)); } + memcpy(cc->connect_timeout, &tv, sizeof(struct timeval)); + + return REDIS_OK; +} + +int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv) +{ + + if(cc == NULL) + { + return REDIS_ERR; + } + if (cc->timeout == NULL) { cc->timeout = malloc(sizeof(struct timeval)); + memcpy(cc->timeout, &tv, sizeof(struct timeval)); } - - memcpy(cc->timeout, &tv, sizeof(struct timeval)); - - return _redisClusterConnect(cc, addrs); -} + else if (cc->timeout->tv_sec != tv.tv_sec || cc->timeout->tv_usec != tv.tv_usec) + { + memcpy(cc->timeout, &tv, sizeof(struct timeval)); -redisClusterContext *redisClusterConnectNonBlock(const char *addrs, int flags) { + if (cc->nodes && dictSize(cc->nodes) > 0) + { + dictEntry *de; + dictIterator *di; + cluster_node *node; - redisClusterContext *cc; + di = dictGetIterator(cc->nodes); - cc = redisClusterContextInit(); + while (de=dictNext(di)) + { + node = dictGetEntryVal(de); + if (node->con && node->con->flags&REDIS_CONNECTED && node->con->err == 0) + { + redisSetTimeout(node->con, tv); + } - if(cc == NULL) + if (node->slaves && listLength(node->slaves) > 0) + { + cluster_node *slave; + listIter *li; + listNode *ln; + + li = listGetIterator(node->slaves, AL_START_HEAD); + while (ln = listNext(li)) + { + slave = listNodeValue(ln); + if (slave->con && slave->con->flags&REDIS_CONNECTED && slave->con->err == 0) + { + redisSetTimeout(slave->con, tv); + } + } + + listReleaseIterator(li); + } + } + + dictReleaseIterator(di); + } + } + + return REDIS_OK; +} + +int redisClusterSetOptionMaxRedirect(redisClusterContext *cc, int max_redirect_count) +{ + if(cc == NULL || max_redirect_count <= 0) { - return NULL; + return REDIS_ERR; } - cc->flags &= ~REDIS_BLOCK; - if(flags) + cc->max_redirect_count = max_redirect_count; + + return REDIS_OK; +} + +int redisClusterConnect2(redisClusterContext *cc) +{ + + if(cc == NULL) { - cc->flags |= flags; + return REDIS_ERR; } - return _redisClusterConnect(cc, addrs); + return _redisClusterConnect2(cc); } -redisContext *ctx_get_by_node(cluster_node *node, - const struct timeval *timeout, int flags) +redisContext *ctx_get_by_node(redisClusterContext *cc, cluster_node *node) { redisContext *c = NULL; if(node == NULL) @@ -2264,6 +2480,10 @@ redisContext *ctx_get_by_node(cluster_node *node, if(c->err) { redisReconnect(c); + + if (cc->timeout && c->err == 0) { + redisSetTimeout(c, *cc->timeout); + } } return c; @@ -2274,20 +2494,17 @@ redisContext *ctx_get_by_node(cluster_node *node, return NULL; } - if(flags & REDIS_BLOCK) + if(cc->connect_timeout) { - if(timeout) - { - c = redisConnectWithTimeout(node->host, node->port, *timeout); - } - else - { - c = redisConnect(node->host, node->port); - } + c = redisConnectWithTimeout(node->host, node->port, *cc->connect_timeout); } else { - c = redisConnectNonBlock(node->host, node->port); + c = redisConnect(node->host, node->port); + } + + if (cc->timeout && c != NULL && c->err == 0) { + redisSetTimeout(c, *cc->timeout); } node->con = c; @@ -2402,7 +2619,7 @@ static cluster_node *node_get_witch_connected(redisClusterContext *cc) continue; } - c = ctx_get_by_node(node, cc->timeout, REDIS_BLOCK); + c = ctx_get_by_node(cc, node); if(c == NULL || c->err) { continue; @@ -2518,7 +2735,7 @@ static char * cluster_config_get(redisClusterContext *cc, goto error; } - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); reply = redisCommand(c, "config get %s", config_name); if(reply == NULL) @@ -2610,7 +2827,7 @@ static int __redisClusterAppendCommand(redisClusterContext *cc, return REDIS_ERR; } - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); if(c == NULL) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "ctx get by node is null"); @@ -2650,7 +2867,7 @@ static int __redisClusterGetReply(redisClusterContext *cc, int slot_num, void ** return REDIS_ERR; } - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); if(c == NULL) { __redisClusterSetError(cc,REDIS_ERR_OOM,"Out of memory"); @@ -2796,7 +3013,7 @@ static void *redis_cluster_command_execute(redisClusterContext *cc, return NULL; } - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); if(c == NULL) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "ctx get by node is null"); @@ -2819,7 +3036,7 @@ static void *redis_cluster_command_execute(redisClusterContext *cc, return NULL; } - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); if(c == NULL) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "ctx get by node error"); @@ -2886,7 +3103,7 @@ static void *redis_cluster_command_execute(redisClusterContext *cc, freeReplyObject(reply); reply = NULL; - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); if(c == NULL) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "ctx get by node error"); @@ -3790,7 +4007,7 @@ static int redisCLusterSendAll(redisClusterContext *cc) continue; } - c = ctx_get_by_node(node, cc->timeout, cc->flags); + c = ctx_get_by_node(cc, node); if(c == NULL) { continue; diff --git a/hircluster.h b/hircluster.h index 5b9c5a3..95585c9 100644 --- a/hircluster.h +++ b/hircluster.h @@ -5,8 +5,8 @@ #include "hiredis.h" #include "async.h" -#define HIREDIS_VIP_MAJOR 0 -#define HIREDIS_VIP_MINOR 3 +#define HIREDIS_VIP_MAJOR 1 +#define HIREDIS_VIP_MINOR 0 #define HIREDIS_VIP_PATCH 0 #define REDIS_CLUSTER_SLOTS 16384 @@ -80,7 +80,9 @@ typedef struct redisClusterContext { int flags; enum redisConnectionType connection_type; - struct timeval *timeout; + struct timeval *connect_timeout; + + struct timeval *timeout; /* receive and send timeout. */ struct hiarray *slots; @@ -103,8 +105,22 @@ redisClusterContext *redisClusterConnectWithTimeout(const char *addrs, const struct timeval tv, int flags); redisClusterContext *redisClusterConnectNonBlock(const char *addrs, int flags); +redisClusterContext *redisClusterContextInit(void); void redisClusterFree(redisClusterContext *cc); +int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr); +int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs); +int redisClusterSetOptionConnectBlock(redisClusterContext *cc); +int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc); +int redisClusterSetOptionParseSlaves(redisClusterContext *cc); +int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc); +int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc); +int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv); +int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv); +int redisClusterSetOptionMaxRedirect(redisClusterContext *cc, int max_redirect_count); + +int redisClusterConnect2(redisClusterContext *cc); + void redisClusterSetMaxRedirect(redisClusterContext *cc, int max_redirect_count); void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len); @@ -112,7 +128,7 @@ void *redisClustervCommand(redisClusterContext *cc, const char *format, va_list void *redisClusterCommand(redisClusterContext *cc, const char *format, ...); void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen); -redisContext *ctx_get_by_node(struct cluster_node *node, const struct timeval *timeout, int flags); +redisContext *ctx_get_by_node(redisClusterContext *cc, struct cluster_node *node); int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd, int len); int redisClustervAppendCommand(redisClusterContext *cc, const char *format, va_list ap); diff --git a/hiredis.c b/hiredis.c index 73d0251..1c38877 100644 --- a/hiredis.c +++ b/hiredis.c @@ -804,7 +804,15 @@ int redisBufferRead(redisContext *c) { if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { /* Try again later */ } else { +#if 1 //shenzheng 2017-5-22 redis cluster + if (errno == EWOULDBLOCK && (c->flags & REDIS_BLOCK)) { + __redisSetError(c,REDIS_ERR_TIMEOUT,"Socket timeout"); + } else { +#endif //shenzheng 2017-5-22 redis cluster __redisSetError(c,REDIS_ERR_IO,NULL); +#if 1 //shenzheng 2017-5-22 redis cluster + } +#endif //shenzheng 2017-5-22 redis cluster return REDIS_ERR; } } else if (nread == 0) { @@ -841,7 +849,15 @@ int redisBufferWrite(redisContext *c, int *done) { if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { /* Try again later */ } else { +#if 1 //shenzheng 2017-5-22 redis cluster + if (errno == EWOULDBLOCK && (c->flags & REDIS_BLOCK)) { + __redisSetError(c,REDIS_ERR_TIMEOUT,"Socket timeout"); + } else { +#endif //shenzheng 2017-5-22 redis cluster __redisSetError(c,REDIS_ERR_IO,NULL); +#if 1 //shenzheng 2017-5-22 redis cluster + } +#endif //shenzheng 2017-5-22 redis cluster return REDIS_ERR; } } else if (nwritten > 0) { diff --git a/read.h b/read.h index 088c979..d578c13 100644 --- a/read.h +++ b/read.h @@ -49,6 +49,10 @@ #if 1 //shenzheng 2015-8-10 redis cluster #define REDIS_ERR_CLUSTER_TOO_MANY_REDIRECT 6 #endif //shenzheng 2015-8-10 redis cluster +#if 1 //shenzheng 2017-5-22 redis cluster +#define REDIS_ERR_TIMEOUT 7 +#endif //shenzheng 2017-5-22 redis cluster + #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2