Skip to content

Commit

Permalink
FEATURE: Add mset API.
Browse files Browse the repository at this point in the history
  • Loading branch information
uhm0311 committed Aug 12, 2024
1 parent 5151f4a commit 5aa0549
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,6 @@ tests/memrm
tests/memslap
tests/memstat
tests/sasl
tests/storage
unittests/unittests
devtools/*
25 changes: 25 additions & 0 deletions libmemcached/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@

#include <libmemcached/common.h>

memcached_return_t memcached_fetch_storage_result(memcached_st *ptr,
const char *key, const size_t key_length)
{
unlikely (not ptr or not key)
{
return MEMCACHED_INVALID_ARGUMENTS;
}
unlikely (ptr->flags.use_udp)
{
return MEMCACHED_NOT_SUPPORTED;
}

uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length);
memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);

char result[MEMCACHED_DEFAULT_COMMAND_SIZE];
memcached_return_t rc= memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);

if (rc == MEMCACHED_STORED)
{
return MEMCACHED_SUCCESS;
}
return rc;
}

char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length,
size_t *value_length,
uint32_t *flags,
Expand Down
6 changes: 5 additions & 1 deletion libmemcached/libmemcached_probes.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -111,6 +111,10 @@
#define LIBMEMCACHED_MEMCACHED_SET_END_ENABLED() (0)
#define LIBMEMCACHED_MEMCACHED_SET_START()
#define LIBMEMCACHED_MEMCACHED_SET_START_ENABLED() (0)
#define LIBMEMCACHED_MEMCACHED_MSET_END()
#define LIBMEMCACHED_MEMCACHED_MSET_END_ENABLED() (0)
#define LIBMEMCACHED_MEMCACHED_MSET_START()
#define LIBMEMCACHED_MEMCACHED_MSET_START_ENABLED() (0)

#endif

Expand Down
163 changes: 157 additions & 6 deletions libmemcached/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ enum memcached_storage_action_t {
CAS_OP
};

enum memcached_storage_send_type_t {
SINGLE,
MULTI
};

/* Inline this */
static inline const char *storage_op_string(memcached_storage_action_t verb)
{
Expand Down Expand Up @@ -121,6 +126,67 @@ static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply
return ret;
}

static memcached_return_t before_storage_query(memcached_st *ptr,
const char *group_key, size_t group_key_length,
const memcached_storage_query_st* queries,
const size_t number_of_queries)
{
memcached_return_t rc= initialize_query(ptr);
if (memcached_failed(rc))
{
return rc;
}

if (ptr->flags.use_udp)
{
return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
}

if (queries == NULL)
{
return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT,
memcached_literal_param("queries were null"));
}
if (number_of_queries == 0)
{
return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT,
memcached_literal_param("number_of_queries were zero"));
}

if (group_key and group_key_length)
{
if (memcached_failed(memcached_key_test(*ptr, (const char * const *)&group_key, &group_key_length, 1)))
{
return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT,
memcached_literal_param("A bad group key was provided."));
}
}

/*
Here is where we pay for the non-block API. We need to remove any data sitting
in the queue before we start our store operations.
It might be optimum to bounce the connection if count > some number.
*/
for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
{
memcached_server_write_instance_st instance=
memcached_server_instance_fetch(ptr, x);

if (memcached_server_response_count(instance))
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];

if (ptr->flags.no_block)
(void)memcached_io_write(instance, NULL, 0, true);

while(memcached_server_response_count(instance))
(void)memcached_response(instance, buffer, sizeof(buffer), &ptr->result);
}
}
return MEMCACHED_SUCCESS;
}

static memcached_return_t memcached_send_binary(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
Expand All @@ -131,7 +197,8 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
time_t expiration,
uint32_t flags,
uint64_t cas,
memcached_storage_action_t verb)
memcached_storage_action_t verb,
memcached_storage_send_type_t send_type)
{
bool flush;
protocol_binary_request_set request= {};
Expand Down Expand Up @@ -227,7 +294,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr,
return MEMCACHED_BUFFERED;
}

if (noreply)
if (send_type == MULTI || noreply)
{
return MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -257,7 +324,8 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,
const time_t expiration,
const uint32_t flags,
const uint64_t cas,
memcached_storage_action_t verb)
memcached_storage_action_t verb,
memcached_storage_send_type_t send_type)
{
char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags);
Expand Down Expand Up @@ -349,7 +417,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr,

if (rc == MEMCACHED_SUCCESS)
{
if (ptr->flags.no_reply)
if (send_type == MULTI || ptr->flags.no_reply)
{
rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
}
Expand Down Expand Up @@ -415,19 +483,80 @@ static inline memcached_return_t memcached_send(memcached_st *ptr,
rc= memcached_send_binary(ptr, group_key, group_key_length,
key, key_length,
value, value_length, expiration,
flags, cas, verb);
flags, cas, verb, SINGLE);
}
else
{
rc= memcached_send_ascii(ptr, group_key, group_key_length,
key, key_length,
value, value_length, expiration,
flags, cas, verb);
flags, cas, verb, SINGLE);
}

return rc;
}

static memcached_return_t memcached_multi_send(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
const memcached_storage_query_st* queries,
const size_t number_of_queries,
memcached_return_t *results,
memcached_storage_action_t verb)
{
arcus_server_check_for_update(ptr);

memcached_return_t rc;
if (memcached_failed(rc= before_storage_query(ptr, group_key, group_key_length, queries, number_of_queries)))
{
return rc;
}

if (memcached_server_count(ptr) > MAX_SERVERS_FOR_MULTI_KEY_OPERATION)
{
return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
memcached_literal_param("memcached instances should be <= MAX_SERVERS_FOR_MULTI_KEY_OPERATION"));
}

bool is_group_key_set= group_key and group_key_length;

for (size_t i= 0; i < number_of_queries; i++)
{
if (memcached_failed(rc= memcached_validate_key_length(queries[i].key_length, ptr->flags.binary_protocol)))
{
results[i]= rc;
continue;
}
if (memcached_failed(rc= memcached_key_test(*ptr, &(queries[i].key), &(queries[i].key_length), 1)))
{
results[i]= rc;
continue;
}

char *safe_group_key= (is_group_key_set ? (char *)group_key : queries[i].key);
size_t safe_group_key_length= (is_group_key_set ? group_key_length : queries[i].key_length);
uint64_t safe_cas = verb == CAS_OP ? queries[i].cas : 0;

if (ptr->flags.binary_protocol)
{
results[i]= memcached_send_binary(ptr, safe_group_key, safe_group_key_length,
queries[i].key, queries[i].key_length,
queries[i].value, queries[i].value_length,
queries[i].expiration, queries[i].flags,
safe_cas, verb, MULTI);
}
else
{
results[i]= memcached_send_ascii(ptr, safe_group_key, safe_group_key_length,
queries[i].key, queries[i].key_length,
queries[i].value, queries[i].value_length,
queries[i].expiration, queries[i].flags,
safe_cas, verb, MULTI);
}
}

return MEMCACHED_SUCCESS;
}

memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
const char *value, size_t value_length,
Expand Down Expand Up @@ -605,3 +734,25 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
return rc;
}

memcached_return_t memcached_mset(memcached_st *ptr,
const memcached_storage_query_st* queries,
const size_t number_of_queries,
memcached_return_t *results)
{
return memcached_mset_by_key(ptr, NULL, 0, queries, number_of_queries, results);
}

memcached_return_t memcached_mset_by_key(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
const memcached_storage_query_st* queries,
const size_t number_of_queries,
memcached_return_t *results)
{
memcached_return_t rc;
LIBMEMCACHED_MEMCACHED_MSET_START();
rc= memcached_multi_send(ptr, group_key, group_key_length,
queries, number_of_queries, results, SET_OP);
LIBMEMCACHED_MEMCACHED_MSET_END();
return rc;
}
30 changes: 29 additions & 1 deletion libmemcached/storage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -40,6 +40,16 @@

#include "libmemcached/memcached.h"

struct memcached_storage_query_st {
char *key;
size_t key_length;
char *value;
size_t value_length;
time_t expiration;
uint32_t flags;
uint64_t cas;
};

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -129,6 +139,24 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr,
uint32_t flags,
uint64_t cas);

LIBMEMCACHED_API
memcached_return_t memcached_mset(memcached_st *ptr,
const memcached_storage_query_st* queries,
const size_t number_of_queries,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_mset_by_key(memcached_st *ptr,
const char *group_key,
size_t group_key_length,
const memcached_storage_query_st* queries,
const size_t number_of_queries,
memcached_return_t *results);

LIBMEMCACHED_API
memcached_return_t memcached_fetch_storage_result(memcached_st *ptr,
const char *key, const size_t key_length);

#ifdef __cplusplus
}
#endif
Expand Down
3 changes: 2 additions & 1 deletion libmemcached/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
*
*
* Libmemcached library
*
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
Expand Down Expand Up @@ -60,6 +60,7 @@ typedef struct memcached_stat_st memcached_stat_st;
typedef struct memcached_analysis_st memcached_analysis_st;
typedef struct memcached_result_st memcached_result_st;
typedef struct memcached_array_st memcached_array_st;
typedef struct memcached_storage_query_st memcached_storage_query_st;
typedef struct memcached_error_t memcached_error_t;

// All of the flavors of memcache_server_st
Expand Down
12 changes: 11 additions & 1 deletion tests/include.am
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ tests_internals_LDADD+= libtest/libtest.la
check_PROGRAMS+= tests/internals
noinst_PROGRAMS+= tests/internals

# Test storage
tests_storage_SOURCES= tests/storage.cc
tests_storage_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS}
tests_storage_DEPENDENCIES= libmemcachedinternal/libmemcachedinternal.la libtest/libtest.la libmemcachedinternal/libmemcachedutilinternal.la
tests_storage_LDADD= libmemcachedinternal/libmemcachedinternal.la
tests_storage_LDADD+= ${PTHREAD_LIBS}
tests_storage_LDADD+= libmemcachedinternal/libmemcachedutilinternal.la
tests_storage_LDADD+= libtest/libtest.la
check_PROGRAMS+= tests/storage
noinst_PROGRAMS+= tests/storage

tests_testapp_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS}
tests_testapp_CFLAGS= $(AM_CFLAGS) $(NO_CONVERSION) $(NO_STRICT_ALIASING)
Expand Down Expand Up @@ -239,7 +249,7 @@ tests_memdump_DEPENDENCIES= libtest/libtest.la $(TESTS_LDADDS)
tests_memdump_LDADD= $(tests_memdump_DEPENDENCIES)
check_PROGRAMS+= tests/memdump
noinst_PROGRAMS+= tests/memdump

# Test linking with C application
tests_c_test_SOURCES= tests/c_test.c
tests_c_test_CFLAGS= ${PTHREAD_CFLAGS}
Expand Down
Loading

0 comments on commit 5aa0549

Please sign in to comment.