From 9fa74431b8be13bba592a82d6f929ec059b3e12e Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Thu, 8 Aug 2024 19:04:23 +0900 Subject: [PATCH] FEATURE: Add mset API. --- .gitignore | 1 + libmemcached/libmemcached_probes.h | 6 +- libmemcached/memcached.h | 1 + libmemcached/storage.cc | 225 ++++++++++++++++++++++++++++- libmemcached/storage.h | 26 +++- libmemcached/types.h | 3 +- tests/include.am | 12 +- tests/storage.cc | 187 ++++++++++++++++++++++++ tests/storage.h | 15 ++ 9 files changed, 470 insertions(+), 6 deletions(-) create mode 100644 tests/storage.cc create mode 100644 tests/storage.h diff --git a/.gitignore b/.gitignore index 4b19f764..01d3d302 100644 --- a/.gitignore +++ b/.gitignore @@ -102,5 +102,6 @@ tests/memrm tests/memslap tests/memstat tests/sasl +tests/storage unittests/unittests devtools/* diff --git a/libmemcached/libmemcached_probes.h b/libmemcached/libmemcached_probes.h index b819f78a..fcca3a44 100644 --- a/libmemcached/libmemcached_probes.h +++ b/libmemcached/libmemcached_probes.h @@ -1,5 +1,5 @@ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -111,6 +111,10 @@ #define LIBMEMCACHED_MEMCACHED_SET_END_ENABLED() (0) #define LIBMEMCACHED_MEMCACHED_SET_START() #define LIBMEMCACHED_MEMCACHED_SET_START_ENABLED() (0) +#define LIBMEMCACHED_MEMCACHED_MSET_END() +#define LIBMEMCACHED_MEMCACHED_MSET_END_ENABLED() (0) +#define LIBMEMCACHED_MEMCACHED_MSET_START() +#define LIBMEMCACHED_MEMCACHED_MSET_START_ENABLED() (0) #endif diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 0cbcef17..299d8bbd 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -136,6 +136,7 @@ struct memcached_st { bool no_block:1; // Don't block bool no_reply:1; bool piped:1; + bool multi_send:1; bool randomize_replica_read:1; bool support_cas:1; bool tcp_nodelay:1; diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index bce3425e..261c2a9f 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -121,6 +121,67 @@ static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply return ret; } +static memcached_return_t before_storage_query(memcached_st *ptr, + const char *group_key, size_t group_key_length, + const memcached_storage_request_st *req, + const size_t number_of_req) +{ + memcached_return_t rc= initialize_query(ptr); + if (memcached_failed(rc)) + { + return rc; + } + + if (ptr->flags.use_udp) + { + return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT); + } + + if (req == NULL) + { + return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, + memcached_literal_param("req were null")); + } + if (number_of_req == 0) + { + return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, + memcached_literal_param("number_of_req were zero")); + } + + if (group_key and group_key_length) + { + if (memcached_failed(memcached_key_test(*ptr, (const char * const *)&group_key, &group_key_length, 1))) + { + return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT, + memcached_literal_param("A bad group key was provided.")); + } + } + + /* + Here is where we pay for the non-block API. We need to remove any data sitting + in the queue before we start our store operations. + + It might be optimum to bounce the connection if count > some number. + */ + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) + { + memcached_server_write_instance_st instance= + memcached_server_instance_fetch(ptr, x); + + if (memcached_server_response_count(instance)) + { + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + + if (ptr->flags.no_block) + (void)memcached_io_write(instance, NULL, 0, true); + + while(memcached_server_response_count(instance)) + (void)memcached_response(instance, buffer, sizeof(buffer), &ptr->result); + } + } + return MEMCACHED_SUCCESS; +} + static memcached_return_t memcached_send_binary(memcached_st *ptr, const char *group_key, size_t group_key_length, @@ -227,7 +288,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return MEMCACHED_BUFFERED; } - if (noreply) + if (ptr->flags.multi_send || noreply) { return MEMCACHED_SUCCESS; } @@ -349,7 +410,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (rc == MEMCACHED_SUCCESS) { - if (ptr->flags.no_reply) + if (ptr->flags.multi_send || ptr->flags.no_reply) { rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; } @@ -428,6 +489,144 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, return rc; } +static memcached_return_t memcached_fetch_storage_result(memcached_st *ptr, + const char *key, + const size_t key_length) +{ + unlikely (not ptr or not key) + { + return MEMCACHED_INVALID_ARGUMENTS; + } + unlikely (ptr->flags.use_udp) + { + return MEMCACHED_NOT_SUPPORTED; + } + + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_return_t rc= memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_STORED) + { + return MEMCACHED_SUCCESS; + } + return rc; +} + +static memcached_return_t memcached_multi_send(memcached_st *ptr, + const char *group_key, + size_t group_key_length, + const memcached_storage_request_st *req, + const size_t number_of_req, + memcached_storage_action_t verb, + memcached_return_t *results) +{ + arcus_server_check_for_update(ptr); + + memcached_return_t rc; + if (memcached_failed(rc= before_storage_query(ptr, group_key, group_key_length, req, number_of_req))) + { + return rc; + } + + bool is_group_key_set= group_key and group_key_length; + ptr->flags.multi_send= true; + +#ifdef ENABLE_REPLICATION + memcached_storage_request_st **resend_req_ptr= (memcached_storage_request_st **) malloc(sizeof(memcached_storage_request_st *) * number_of_req); + size_t *resend_indices= (size_t*) malloc(sizeof(size_t) * number_of_req); + size_t number_of_resend_req= 0; +#endif + for (size_t i= 0; i < number_of_req; i++) + { + if (memcached_failed(rc= memcached_validate_key_length(req[i].key_length, ptr->flags.binary_protocol))) + { + results[i]= rc; + continue; + } + if (memcached_failed(rc= memcached_key_test(*ptr, &(req[i].key), &(req[i].key_length), 1))) + { + results[i]= rc; + continue; + } + + char *safe_group_key= (is_group_key_set ? (char *)group_key : req[i].key); + size_t safe_group_key_length= (is_group_key_set ? group_key_length : req[i].key_length); + uint64_t safe_cas = verb == CAS_OP ? req[i].cas : 0; + + if (ptr->flags.binary_protocol) + { + results[i]= memcached_send_binary(ptr, safe_group_key, safe_group_key_length, + req[i].key, req[i].key_length, + req[i].value, req[i].value_length, + req[i].expiration, req[i].flags, + safe_cas, verb); + } + else + { + results[i]= memcached_send_ascii(ptr, safe_group_key, safe_group_key_length, + req[i].key, req[i].key_length, + req[i].value, req[i].value_length, + req[i].expiration, req[i].flags, + safe_cas, verb); + } + } + + for (size_t i= 0; i < number_of_req; i++) + { + if (memcached_failed(results[i])) + { + continue; + } + + char *safe_group_key= (is_group_key_set ? (char *)group_key : req[i].key); + size_t safe_group_key_length= (is_group_key_set ? group_key_length : req[i].key_length); + + results[i]= memcached_fetch_storage_result(ptr, safe_group_key, safe_group_key_length); +#ifdef ENABLE_REPLICATION + if (results[i] == MEMCACHED_SWITCHOVER or results[i] == MEMCACHED_REPL_SLAVE) + { + resend_req_ptr[number_of_resend_req]= (memcached_storage_request_st *) &req[i]; + resend_indices[number_of_resend_req]= i; + number_of_resend_req++; + } +#endif + } + +#ifdef ENABLE_REPLICATION + if (number_of_resend_req == number_of_req) + { + rc= memcached_multi_send(ptr, group_key, group_key_length, req, number_of_req, verb, results); + } + else if (number_of_resend_req > 0) + { + memcached_storage_request_st *resend_req= (memcached_storage_request_st *) malloc(sizeof(memcached_storage_request_st) * number_of_resend_req); + for (size_t i= 0; i < number_of_resend_req; i++) + { + resend_req[i]= *(resend_req_ptr[i]); + } + + memcached_return_t *resend_results= (memcached_return_t *) malloc(sizeof(memcached_return_t) * number_of_resend_req); + rc= memcached_multi_send(ptr, group_key, group_key_length, resend_req, number_of_resend_req, verb, resend_results); + + for (size_t i= 0; i < number_of_resend_req; i++) + { + results[resend_indices[i]]= resend_results[i]; + } + + free(resend_results); + free(resend_req); + } + + free(resend_indices); + free(resend_req_ptr); +#endif + + ptr->flags.multi_send= false; + return MEMCACHED_SUCCESS; +} memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length, const char *value, size_t value_length, @@ -605,3 +804,25 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, return rc; } +memcached_return_t memcached_mset(memcached_st *ptr, + const memcached_storage_request_st *req, + const size_t number_of_req, + memcached_return_t *results) +{ + return memcached_mset_by_key(ptr, NULL, 0, req, number_of_req, results); +} + +memcached_return_t memcached_mset_by_key(memcached_st *ptr, + const char *group_key, + size_t group_key_length, + const memcached_storage_request_st *req, + const size_t number_of_req, + memcached_return_t *results) +{ + memcached_return_t rc; + LIBMEMCACHED_MEMCACHED_MSET_START(); + rc= memcached_multi_send(ptr, group_key, group_key_length, + req, number_of_req, SET_OP, results); + LIBMEMCACHED_MEMCACHED_MSET_END(); + return rc; +} diff --git a/libmemcached/storage.h b/libmemcached/storage.h index 215f4ebb..480734a2 100644 --- a/libmemcached/storage.h +++ b/libmemcached/storage.h @@ -1,5 +1,5 @@ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -40,6 +40,16 @@ #include "libmemcached/memcached.h" +struct memcached_storage_request_st { + char *key; + size_t key_length; + char *value; + size_t value_length; + time_t expiration; + uint32_t flags; + uint64_t cas; +}; + #ifdef __cplusplus extern "C" { #endif @@ -129,6 +139,20 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, uint32_t flags, uint64_t cas); +LIBMEMCACHED_API +memcached_return_t memcached_mset(memcached_st *ptr, + const memcached_storage_request_st *req, + const size_t number_of_req, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_mset_by_key(memcached_st *ptr, + const char *group_key, + size_t group_key_length, + const memcached_storage_request_st *req, + const size_t number_of_req, + memcached_return_t *results); + #ifdef __cplusplus } #endif diff --git a/libmemcached/types.h b/libmemcached/types.h index cc8cf828..ea1f7593 100644 --- a/libmemcached/types.h +++ b/libmemcached/types.h @@ -15,7 +15,7 @@ * limitations under the License. */ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -60,6 +60,7 @@ typedef struct memcached_stat_st memcached_stat_st; typedef struct memcached_analysis_st memcached_analysis_st; typedef struct memcached_result_st memcached_result_st; typedef struct memcached_array_st memcached_array_st; +typedef struct memcached_storage_request_st memcached_storage_request_st; typedef struct memcached_error_t memcached_error_t; // All of the flavors of memcache_server_st diff --git a/tests/include.am b/tests/include.am index 29b21027..a53df3da 100644 --- a/tests/include.am +++ b/tests/include.am @@ -65,6 +65,16 @@ tests_internals_LDADD+= libtest/libtest.la check_PROGRAMS+= tests/internals noinst_PROGRAMS+= tests/internals +# Test storage +tests_storage_SOURCES= tests/storage.cc +tests_storage_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS} +tests_storage_DEPENDENCIES= libmemcachedinternal/libmemcachedinternal.la libtest/libtest.la libmemcachedinternal/libmemcachedutilinternal.la +tests_storage_LDADD= libmemcachedinternal/libmemcachedinternal.la +tests_storage_LDADD+= ${PTHREAD_LIBS} +tests_storage_LDADD+= libmemcachedinternal/libmemcachedutilinternal.la +tests_storage_LDADD+= libtest/libtest.la +check_PROGRAMS+= tests/storage +noinst_PROGRAMS+= tests/storage tests_testapp_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS} tests_testapp_CFLAGS= $(AM_CFLAGS) $(NO_CONVERSION) $(NO_STRICT_ALIASING) @@ -239,7 +249,7 @@ tests_memdump_DEPENDENCIES= libtest/libtest.la $(TESTS_LDADDS) tests_memdump_LDADD= $(tests_memdump_DEPENDENCIES) check_PROGRAMS+= tests/memdump noinst_PROGRAMS+= tests/memdump - + # Test linking with C application tests_c_test_SOURCES= tests/c_test.c tests_c_test_CFLAGS= ${PTHREAD_CFLAGS} diff --git a/tests/storage.cc b/tests/storage.cc new file mode 100644 index 00000000..c1396cd6 --- /dev/null +++ b/tests/storage.cc @@ -0,0 +1,187 @@ +#include +#include + +using namespace libtest; + +#include "tests/storage.h" + +#define MSET_COUNT 5 +#define BUFFER_SIZE 64 +#define EXPIRE_TIME 60 + +static void safe_free(char *c) +{ + if (c != NULL) + { + free(c); + } +} + +static void safe_free_req(memcached_storage_request_st req[MSET_COUNT]) +{ + for (int i= 0; i < MSET_COUNT; i++) + { + safe_free(req[i].key); + safe_free(req[i].value); + } +} + +test_return_t mset_and_get_test(memcached_st *mc) +{ + memcached_storage_request_st req[MSET_COUNT]; + srand(time(NULL)); + + for (int i= 0; i < MSET_COUNT; i++) + { + char *key= (char *)malloc(BUFFER_SIZE); + if (not key) + { + printf("key cannot be allocated...\n"); + return TEST_FAILURE; + } + + char *value= (char *)malloc(BUFFER_SIZE); + if (not value) + { + printf("value cannot be allocated...\n"); + safe_free(key); + return TEST_FAILURE; + } + + memset(key, 0, BUFFER_SIZE); + sprintf(key, "MSET:mset-key-%d", i); + + memset(value, 0, BUFFER_SIZE); + sprintf(value, "mset-value-%d", i); + + req[i].key= key; + req[i].key_length= strlen(key); + + req[i].value= value; + req[i].value_length= strlen(value); + + req[i].expiration= EXPIRE_TIME; + req[i].flags= (uint32_t) rand(); + } + + memcached_return_t results[MSET_COUNT]; + memcached_return_t rc= memcached_mset(mc, req, MSET_COUNT, results); + printf("memcached_mset: rc is %s\n", memcached_strerror(mc, rc)); + + if (memcached_failed(rc)) + { + safe_free_req(req); + return TEST_FAILURE; + } + + for (int i= 0; i < MSET_COUNT; i++) + { + printf("memcached_mset: rc[%d] is %s\n", i, memcached_strerror(mc, results[i])); + + if (memcached_failed(results[i])) + { + safe_free_req(req); + return TEST_FAILURE; + } + } + + /*for (int i= 0; i < MSET_COUNT; i++) + { + rc= memcached_fetch_storage_result(mc, req[i].key, req[i].key_length); + printf("memcached_storage_fetch: rc[%d] is %s\n", i, memcached_strerror(mc, rc)); + + if (memcached_failed(rc)) + { + safe_free_req(req); + return TEST_FAILURE; + } + }*/ + + for (int i= 0; i < MSET_COUNT; i++) + { + size_t value_length= -1; + uint32_t flags= -1; + + char *value= memcached_get(mc, req[i].key, req[i].key_length, &value_length, &flags, &rc); + printf("memcached_get: rc[%d] is %s\n", i, memcached_strerror(mc, rc)); + + if (value == NULL || memcached_failed(rc)) + { + safe_free(value); + safe_free_req(req); + return TEST_FAILURE; + } + + printf("memcached_get: flags[%d] is %u\n", i, flags); + printf("memcached_get: value[%d] is %s\n", i, value); + + if (req[i].value_length != value_length) + { + printf("memcached_get: value_length[%d] is not equal... stored %ld but got %ld\n", i, req[i].value_length, value_length); + + safe_free(value); + safe_free_req(req); + return TEST_FAILURE; + } + if (strcmp(req[i].value, value)) + { + printf("memcached_get: value[%d] is not equal... stored %s but got %s\n", i, req[i].value, value); + + safe_free(value); + safe_free_req(req); + return TEST_FAILURE; + } + if (req[i].flags != flags) + { + printf("memcached_get: flags[%d] is not equal... stored %u but got %u\n", i, req[i].flags, flags); + + safe_free(value); + safe_free_req(req); + return TEST_FAILURE; + } + + safe_free(value); + } + + safe_free_req(req); + + return TEST_SUCCESS; +} + +/* + Test cases +*/ +test_st mset_tests[] ={ + {"mset_and_get_test", true, (test_callback_fn*)mset_and_get_test }, + {0, 0, 0} +}; + +collection_st collection[] ={ + {"mset_tests", 0, 0, mset_tests}, + {0, 0, 0, 0} +}; + +#define TEST_PORT_BASE MEMCACHED_DEFAULT_PORT +10 + +#include "tests/libmemcached_world.h" + +void get_world(Framework *world) +{ + world->collections= collection; + + world->_create= (test_callback_create_fn*)world_create; + world->_destroy= (test_callback_destroy_fn*)world_destroy; + + world->item._startup= (test_callback_fn*)world_test_startup; + world->item.set_pre((test_callback_fn*)world_pre_run); + world->item.set_flush((test_callback_fn*)world_flush); + world->item.set_post((test_callback_fn*)world_post_run); + world->_on_error= (test_callback_error_fn*)world_on_error; + + world->collection_startup= (test_callback_fn*)world_container_startup; + world->collection_shutdown= (test_callback_fn*)world_container_shutdown; + + world->set_runner(&default_libmemcached_runner); + + world->set_socket(); +} diff --git a/tests/storage.h b/tests/storage.h new file mode 100644 index 00000000..bdf7d74a --- /dev/null +++ b/tests/storage.h @@ -0,0 +1,15 @@ +#ifndef __TESTS_STORAGE_H__ +#define __TESTS_STORAGE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +LIBTEST_LOCAL +test_return_t mset_and_get_test(memcached_st *mc); + +#ifdef __cplusplus +} +#endif + +#endif /* __TESTS_STORAGE_H__ */