Skip to content

Commit

Permalink
Multithreading via C++ thread pool of clients (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
kentslaney authored Jun 5, 2024
1 parent d32820f commit fb03c5d
Show file tree
Hide file tree
Showing 16 changed files with 977 additions and 84 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set(CMAKE_MACOSX_RPATH 1)

set (MC_VERSION_MAJOR 1)
set (MC_VERSION_MINOR 4)
set (MC_VERSION_PATCH 1)
set (MC_VERSION_PATCH 4)

set (MC_VERSION ${MC_VERSION_MAJOR}.${MC_VERSION_MINOR})
set (MC_APIVERSION ${MC_VERSION}.${MC_VERSION_PATCH})
Expand All @@ -15,7 +15,7 @@ if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build, options are: Debug Release." FORCE)
endif (NOT CMAKE_BUILD_TYPE)

set(CMAKE_CXX_FLAGS_COMMON "-Wall -fno-rtti -fno-exceptions")
set(CMAKE_CXX_FLAGS_COMMON "-Wall -fno-rtti -fno-exceptions -std=c++17")
set(CMAKE_CXX_FLAGS_DEBUG "-DDEBUG -g2 ${CMAKE_CXX_FLAGS_COMMON}" CACHE STRING "CXX DEBUG FLAGS" FORCE)
set(CMAKE_CXX_FLAGS_RELEASE "-DNDEBUG -O3 ${CMAKE_CXX_FLAGS_COMMON}" CACHE STRING "CXX RELEASE FLAGS" FORCE)
set(CMAKE_INSTALL_INCLUDE include CACHE PATH "Output directory for header files")
Expand Down
106 changes: 106 additions & 0 deletions include/ClientPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#pragma once

#include <shared_mutex>
#include <iterator>
#include <array>

#include "Client.h"
#include "LockPool.h"

namespace douban {
namespace mc {

template <size_t N>
void duplicate_strings(const char* const * strs, const size_t n,
std::deque<std::array<char, N> >& out, std::vector<char*>& refs) {
out.resize(n);
refs.resize(n);
for (size_t i = 0; i < n; i++) {
if (strs == NULL || strs[i] == NULL) {
out[i][0] = '\0';
refs[i] = NULL;
continue;
}
std::snprintf(out[i].data(), N, "%s", strs[i]);
refs[i] = out[i].data();
}
}

class irange {
int i;

public:
using value_type = int;
using pointer = const int*;
using reference = const int&;
using difference_type = int;
using iterator_category = std::random_access_iterator_tag;

explicit irange(int i) : i(i) {}

reference operator*() const { return i; }
pointer operator->() const { return &i; }
value_type operator[](int n) const { return i + n; }
friend bool operator< (const irange& lhs, const irange& rhs) { return lhs.i < rhs.i; }
friend bool operator> (const irange& lhs, const irange& rhs) { return rhs < lhs; }
friend bool operator<=(const irange& lhs, const irange& rhs) { return !(lhs > rhs); }
friend bool operator>=(const irange& lhs, const irange& rhs) { return !(lhs < rhs); }
friend bool operator==(const irange& lhs, const irange& rhs) { return lhs.i == rhs.i; }
friend bool operator!=(const irange& lhs, const irange& rhs) { return !(lhs == rhs); }
irange& operator++() { ++i; return *this; }
irange& operator--() { --i; return *this; }
irange operator++(int) { irange tmp = *this; ++tmp; return tmp; }
irange operator--(int) { irange tmp = *this; --tmp; return tmp; }
irange& operator+=(difference_type n) { i += n; return *this; }
irange& operator-=(difference_type n) { i -= n; return *this; }
friend irange operator+(const irange& lhs, difference_type n) { irange tmp = lhs; tmp += n; return tmp; }
friend irange operator+(difference_type n, const irange& rhs) { return rhs + n; }
friend irange operator-(const irange& lhs, difference_type n) { irange tmp = lhs; tmp -= n; return tmp; }
friend difference_type operator-(const irange& lhs, const irange& rhs) { return lhs.i - rhs.i; }
};

typedef struct {
Client c;
int index;
} IndexedClient;

class ClientPool : LockPool {
public:
ClientPool();
~ClientPool();
void config(config_options_t opt, int val);
int init(const char* const * hosts, const uint32_t* ports,
const size_t n, const char* const * aliases = NULL);
int updateServers(const char* const * hosts, const uint32_t* ports,
const size_t n, const char* const * aliases = NULL);
IndexedClient* _acquire();
void _release(const IndexedClient* idx);
Client* acquire();
void release(const Client* ref);

private:
int growPool(size_t by);
int setup(Client* c);
inline bool shouldGrowUnsafe();
int autoGrow();

bool m_opt_changed[CLIENT_CONFIG_OPTION_COUNT];
int m_opt_value[CLIENT_CONFIG_OPTION_COUNT];
std::deque<IndexedClient> m_clients;
size_t m_initial_clients;
size_t m_max_clients;
size_t m_max_growth;

std::deque<std::array<char, MC_NI_MAXHOST> > m_hosts_data;
std::deque<std::array<char, MC_NI_MAXHOST + 1 + MC_NI_MAXSERV> > m_aliases_data;
std::vector<uint32_t> m_ports;

std::vector<char*> m_hosts;
std::vector<char*> m_aliases;

std::mutex m_pool_lock;
mutable std::shared_mutex m_acquiring_growth;
};

} // namespace mc
} // namespace douban
14 changes: 12 additions & 2 deletions include/Export.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@


typedef enum {
CFG_POLL_TIMEOUT,
// Client config options
CFG_POLL_TIMEOUT = 0,
CFG_CONNECT_TIMEOUT,
CFG_RETRY_TIMEOUT,
CFG_HASH_FUNCTION,
CFG_MAX_RETRIES
CFG_MAX_RETRIES,
CFG_SET_FAILOVER,

// type separator to track number of Client config options to save
CLIENT_CONFIG_OPTION_COUNT,

// ClientPool config options
CFG_INITIAL_CLIENTS,
CFG_MAX_CLIENTS,
CFG_MAX_GROWTH
} config_options_t;


Expand Down
100 changes: 100 additions & 0 deletions include/LockPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#pragma once

#include <mutex>
#include <condition_variable>
#include <queue>
#include <deque>
#include <vector>
#include <atomic>

namespace douban {
namespace mc {

// https://stackoverflow.com/a/14792685/3476782
class OrderedLock {
std::queue<std::condition_variable*> m_fifo_locks;
protected:
std::mutex m_fifo_access;
std::atomic<bool> m_locked;

protected:
OrderedLock() : m_locked(true) {};
std::unique_lock<std::mutex> lock() {
std::unique_lock<std::mutex> acquire(m_fifo_access);
if (m_locked) {
std::condition_variable signal;
m_fifo_locks.emplace(&signal);
signal.wait(acquire);
m_fifo_locks.pop();
} else {
m_locked = true;
}
return acquire;
}

void unlock() {
if (m_fifo_locks.empty()) {
m_locked = false;
} else {
m_fifo_locks.front()->notify_all();
}
}
};

class LockPool : public OrderedLock {
std::deque<size_t> m_available;
std::list<std::mutex*> m_muxes;
std::list<std::mutex*> m_mux_mallocs;

protected:
std::deque<std::mutex*> m_thread_workers;

LockPool() {}
~LockPool() {
std::lock_guard<std::mutex> freeing(m_fifo_access);
for (auto worker : m_thread_workers) {
std::lock_guard<std::mutex> freeing_worker(*worker);
}
for (auto mem : m_muxes) {
mem->std::mutex::~mutex();
}
for (auto mem : m_mux_mallocs) {
delete[] mem;
}
}

void addWorkers(size_t n) {
std::unique_lock<std::mutex> growing_pool(m_fifo_access);
const auto from = m_thread_workers.size();
const auto muxes = new std::mutex[n];
m_mux_mallocs.push_back(muxes);
for (size_t i = 0; i < n; i++) {
m_available.push_back(from + i);
m_muxes.push_back(&muxes[i]);
}
// static_cast needed for some versions of C++
std::transform(
muxes, muxes + n, std::back_inserter(m_thread_workers),
static_cast<std::mutex*(*)(std::mutex&)>(std::addressof<std::mutex>));
unlock();
}

int acquireWorker() {
auto fifo_lock = lock();
const auto res = m_available.front();
m_available.pop_front();
if (!m_available.empty()) {
unlock();
}
return res;
}

void releaseWorker(int worker) {
std::unique_lock<std::mutex> growing_pool(m_fifo_access);
m_available.push_front(worker);
unlock();
}
};

} // namespace mc
} // namespace douban
41 changes: 37 additions & 4 deletions libmc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import os
import functools
from ._client import (
PyClient, ThreadUnsafe,
encode_value,
decode_value,
PyClientPool, PyClientUnsafe as ClientUnsafe,

MC_DEFAULT_EXPTIME,
MC_POLL_TIMEOUT,
MC_CONNECT_TIMEOUT,
MC_RETRY_TIMEOUT,
MC_SET_FAILOVER,
MC_INITIAL_CLIENTS,
MC_MAX_CLIENTS,
MC_MAX_GROWTH,

MC_HASH_MD5,
MC_HASH_FNV1_32,
Expand All @@ -27,25 +33,52 @@
__file__ as _libmc_so_file
)

__VERSION__ = "1.4.3"
__version__ = "v1.4.3"
__VERSION__ = "1.4.4"
__version__ = "v1.4.4"
__author__ = "mckelvin"
__email__ = "[email protected]"
__date__ = "Fri Dec 1 07:43:12 2023 +0800"
__date__ = "Sat Jun 1 05:10:05 2024 +0800"


class Client(PyClient):
pass

class ClientPool(PyClientPool):
pass

class ThreadedClient:
def __init__(self, *args, **kwargs):
self._client_pool = ClientPool(*args, **kwargs)

def update_servers(self, servers):
return self._client_pool.update_servers(servers)

def config(self, opt, val):
self._client_pool.config(opt, val)

def __getattr__(self, key):
if not hasattr(Client, key):
raise AttributeError
result = getattr(Client, key)
if callable(result):
@functools.wraps(result)
def wrapper(*args, **kwargs):
with self._client_pool.client() as mc:
return getattr(mc, key)(*args, **kwargs)
return wrapper
return result


DYNAMIC_LIBRARIES = [os.path.abspath(_libmc_so_file)]


__all__ = [
'Client', 'ThreadUnsafe', '__VERSION__', 'encode_value', 'decode_value',
'ClientUnsafe', 'ClientPool', 'ThreadedClient',

'MC_DEFAULT_EXPTIME', 'MC_POLL_TIMEOUT', 'MC_CONNECT_TIMEOUT',
'MC_RETRY_TIMEOUT',
'MC_RETRY_TIMEOUT', 'MC_SET_FAILOVER', 'MC_INITIAL_CLIENTS',
'MC_MAX_CLIENTS', 'MC_MAX_GROWTH',

'MC_HASH_MD5', 'MC_HASH_FNV1_32', 'MC_HASH_FNV1A_32', 'MC_HASH_CRC_32',

Expand Down
Loading

0 comments on commit fb03c5d

Please sign in to comment.