Skip to content

Commit

Permalink
FEATURE: Add mset API.
Browse files Browse the repository at this point in the history
  • Loading branch information
uhm0311 committed Aug 12, 2024
1 parent 5151f4a commit 9fa7443
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,6 @@ tests/memrm
tests/memslap
tests/memstat
tests/sasl
tests/storage
unittests/unittests
devtools/*
6 changes: 5 additions & 1 deletion libmemcached/libmemcached_probes.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -111,6 +111,10 @@
#define LIBMEMCACHED_MEMCACHED_SET_END_ENABLED() (0)
#define LIBMEMCACHED_MEMCACHED_SET_START()
#define LIBMEMCACHED_MEMCACHED_SET_START_ENABLED() (0)
#define LIBMEMCACHED_MEMCACHED_MSET_END()
#define LIBMEMCACHED_MEMCACHED_MSET_END_ENABLED() (0)
#define LIBMEMCACHED_MEMCACHED_MSET_START()
#define LIBMEMCACHED_MEMCACHED_MSET_START_ENABLED() (0)

#endif

Expand Down
1 change: 1 addition & 0 deletions libmemcached/memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ struct memcached_st {
bool no_block:1; // Don't block
bool no_reply:1;
bool piped:1;
bool multi_send:1;
bool randomize_replica_read:1;
bool support_cas:1;
bool tcp_nodelay:1;
Expand Down
225 changes: 223 additions & 2 deletions libmemcached/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,67 @@ static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply
return ret;
}

static memcached_return_t before_storage_query(memcached_st *ptr,
const char *group_key, size_t group_key_length,
const memcached_storage_request_st *req,
const size_t number_of_req)
{
memcached_return_t rc= initialize_query(ptr);
if (memcached_failed(rc))
{
return rc;
}

if (ptr->flags.use_udp)
{
return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
}

if (req == NULL)
{
return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT,
memcached_literal_param("req were null"));
}
if (number_of_req == 0)
{
return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT,
memcached_literal_param("number_of_req were zero"));
}

if (group_key and group_key_length)
{
if (memcached_failed(memcached_key_test(*ptr, (const char * const *)&group_key, &group_key_length, 1)))
{
return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT,
memcached_literal_param("A bad group key was provided."));
}
}

/*
Here is where we pay for the non-block API. We need to remove any data sitting
in the queue before we start our store operations.
It might be optimum to bounce the connection if count > some number.
*/
for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
{
memcached_server_write_instance_st instance=
memcached_server_instance_fetch(ptr, x);

if (memcached_server_response_count(instance))
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];

if (ptr->flags.no_block)
(void)memcached_io_write(instance, NULL, 0, true);

while(memcached_server_response_count(instance))
(void)memcached_response(instance, buffer, sizeof(buffer), &ptr->result);
}
}
return MEMCACHED_SUCCESS;
}

static memcached_return_t memcached_send_binary(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
Expand Down Expand Up @@ -227,7 +288,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
return MEMCACHED_BUFFERED;
}

if (noreply)
if (ptr->flags.multi_send || noreply)
{
return MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -349,7 +410,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,

if (rc == MEMCACHED_SUCCESS)
{
if (ptr->flags.no_reply)
if (ptr->flags.multi_send || ptr->flags.no_reply)
{
rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -428,6 +489,144 @@ static inline memcached_return_t memcached_send(memcached_st *ptr,
return rc;
}

static memcached_return_t memcached_fetch_storage_result(memcached_st *ptr,
const char *key,
const size_t key_length)
{
unlikely (not ptr or not key)
{
return MEMCACHED_INVALID_ARGUMENTS;
}
unlikely (ptr->flags.use_udp)
{
return MEMCACHED_NOT_SUPPORTED;
}

uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length);
memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);

char result[MEMCACHED_DEFAULT_COMMAND_SIZE];
memcached_return_t rc= memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);

if (rc == MEMCACHED_STORED)
{
return MEMCACHED_SUCCESS;
}
return rc;
}

static memcached_return_t memcached_multi_send(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
const memcached_storage_request_st *req,
const size_t number_of_req,
memcached_storage_action_t verb,
memcached_return_t *results)
{
arcus_server_check_for_update(ptr);

memcached_return_t rc;
if (memcached_failed(rc= before_storage_query(ptr, group_key, group_key_length, req, number_of_req)))
{
return rc;
}

bool is_group_key_set= group_key and group_key_length;
ptr->flags.multi_send= true;

#ifdef ENABLE_REPLICATION
memcached_storage_request_st **resend_req_ptr= (memcached_storage_request_st **) malloc(sizeof(memcached_storage_request_st *) * number_of_req);
size_t *resend_indices= (size_t*) malloc(sizeof(size_t) * number_of_req);
size_t number_of_resend_req= 0;
#endif
for (size_t i= 0; i < number_of_req; i++)
{
if (memcached_failed(rc= memcached_validate_key_length(req[i].key_length, ptr->flags.binary_protocol)))
{
results[i]= rc;
continue;
}
if (memcached_failed(rc= memcached_key_test(*ptr, &(req[i].key), &(req[i].key_length), 1)))
{
results[i]= rc;
continue;
}

char *safe_group_key= (is_group_key_set ? (char *)group_key : req[i].key);
size_t safe_group_key_length= (is_group_key_set ? group_key_length : req[i].key_length);
uint64_t safe_cas = verb == CAS_OP ? req[i].cas : 0;

if (ptr->flags.binary_protocol)
{
results[i]= memcached_send_binary(ptr, safe_group_key, safe_group_key_length,
req[i].key, req[i].key_length,
req[i].value, req[i].value_length,
req[i].expiration, req[i].flags,
safe_cas, verb);
}
else
{
results[i]= memcached_send_ascii(ptr, safe_group_key, safe_group_key_length,
req[i].key, req[i].key_length,
req[i].value, req[i].value_length,
req[i].expiration, req[i].flags,
safe_cas, verb);
}
}

for (size_t i= 0; i < number_of_req; i++)
{
if (memcached_failed(results[i]))
{
continue;
}

char *safe_group_key= (is_group_key_set ? (char *)group_key : req[i].key);
size_t safe_group_key_length= (is_group_key_set ? group_key_length : req[i].key_length);

results[i]= memcached_fetch_storage_result(ptr, safe_group_key, safe_group_key_length);
#ifdef ENABLE_REPLICATION
if (results[i] == MEMCACHED_SWITCHOVER or results[i] == MEMCACHED_REPL_SLAVE)
{
resend_req_ptr[number_of_resend_req]= (memcached_storage_request_st *) &req[i];
resend_indices[number_of_resend_req]= i;
number_of_resend_req++;
}
#endif
}

#ifdef ENABLE_REPLICATION
if (number_of_resend_req == number_of_req)
{
rc= memcached_multi_send(ptr, group_key, group_key_length, req, number_of_req, verb, results);
}
else if (number_of_resend_req > 0)
{
memcached_storage_request_st *resend_req= (memcached_storage_request_st *) malloc(sizeof(memcached_storage_request_st) * number_of_resend_req);
for (size_t i= 0; i < number_of_resend_req; i++)
{
resend_req[i]= *(resend_req_ptr[i]);
}

memcached_return_t *resend_results= (memcached_return_t *) malloc(sizeof(memcached_return_t) * number_of_resend_req);
rc= memcached_multi_send(ptr, group_key, group_key_length, resend_req, number_of_resend_req, verb, resend_results);

for (size_t i= 0; i < number_of_resend_req; i++)
{
results[resend_indices[i]]= resend_results[i];
}

free(resend_results);
free(resend_req);
}

free(resend_indices);
free(resend_req_ptr);
#endif

ptr->flags.multi_send= false;
return MEMCACHED_SUCCESS;
}

memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
const char *value, size_t value_length,
Expand Down Expand Up @@ -605,3 +804,25 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
return rc;
}

memcached_return_t memcached_mset(memcached_st *ptr,
const memcached_storage_request_st *req,
const size_t number_of_req,
memcached_return_t *results)
{
return memcached_mset_by_key(ptr, NULL, 0, req, number_of_req, results);
}

memcached_return_t memcached_mset_by_key(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
const memcached_storage_request_st *req,
const size_t number_of_req,
memcached_return_t *results)
{
memcached_return_t rc;
LIBMEMCACHED_MEMCACHED_MSET_START();
rc= memcached_multi_send(ptr, group_key, group_key_length,
req, number_of_req, SET_OP, results);
LIBMEMCACHED_MEMCACHED_MSET_END();
return rc;
}
26 changes: 25 additions & 1 deletion libmemcached/storage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -40,6 +40,16 @@

#include "libmemcached/memcached.h"

struct memcached_storage_request_st {
char *key;
size_t key_length;
char *value;
size_t value_length;
time_t expiration;
uint32_t flags;
uint64_t cas;
};

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -129,6 +139,20 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
uint32_t flags,
uint64_t cas);

LIBMEMCACHED_API
memcached_return_t memcached_mset(memcached_st *ptr,
const memcached_storage_request_st *req,
const size_t number_of_req,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_mset_by_key(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
const memcached_storage_request_st *req,
const size_t number_of_req,
memcached_return_t *results);

#ifdef __cplusplus
}
#endif
Expand Down
3 changes: 2 additions & 1 deletion libmemcached/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -60,6 +60,7 @@ typedef struct memcached_stat_st memcached_stat_st;
typedef struct memcached_analysis_st memcached_analysis_st;
typedef struct memcached_result_st memcached_result_st;
typedef struct memcached_array_st memcached_array_st;
typedef struct memcached_storage_request_st memcached_storage_request_st;
typedef struct memcached_error_t memcached_error_t;

// All of the flavors of memcache_server_st
Expand Down
12 changes: 11 additions & 1 deletion tests/include.am
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ tests_internals_LDADD+= libtest/libtest.la
check_PROGRAMS+= tests/internals
noinst_PROGRAMS+= tests/internals

# Test storage
tests_storage_SOURCES= tests/storage.cc
tests_storage_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS}
tests_storage_DEPENDENCIES= libmemcachedinternal/libmemcachedinternal.la libtest/libtest.la libmemcachedinternal/libmemcachedutilinternal.la
tests_storage_LDADD= libmemcachedinternal/libmemcachedinternal.la
tests_storage_LDADD+= ${PTHREAD_LIBS}
tests_storage_LDADD+= libmemcachedinternal/libmemcachedutilinternal.la
tests_storage_LDADD+= libtest/libtest.la
check_PROGRAMS+= tests/storage
noinst_PROGRAMS+= tests/storage

tests_testapp_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS}
tests_testapp_CFLAGS= $(AM_CFLAGS) $(NO_CONVERSION) $(NO_STRICT_ALIASING)
Expand Down Expand Up @@ -239,7 +249,7 @@ tests_memdump_DEPENDENCIES= libtest/libtest.la $(TESTS_LDADDS)
tests_memdump_LDADD= $(tests_memdump_DEPENDENCIES)
check_PROGRAMS+= tests/memdump
noinst_PROGRAMS+= tests/memdump

# Test linking with C application
tests_c_test_SOURCES= tests/c_test.c
tests_c_test_CFLAGS= ${PTHREAD_CFLAGS}
Expand Down
Loading

0 comments on commit 9fa7443

Please sign in to comment.