Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support timedlock of fast/hook pthread and bthread::Mutex #2760

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ inline bvar::Adder<int64_t>& butex_waiter_count() {
}
#endif

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microsecond is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2;

enum WaiterState {
WAITER_STATE_NONE,
WAITER_STATE_READY,
Expand Down
5 changes: 5 additions & 0 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

namespace bthread {

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microsecond is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2;

// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
Expand Down
154 changes: 124 additions & 30 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

// Date: Sun Aug 3 12:46:15 CST 2014

#include <sys/cdefs.h>
#include <pthread.h>
#include <dlfcn.h> // dlsym
#include <fcntl.h> // O_RDONLY
Expand Down Expand Up @@ -47,9 +48,9 @@
#include "bthread/processor.h"
#include "bthread/task_group.h"

extern "C" {
__BEGIN_DECLS
extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
}
__END_DECLS

namespace bthread {

Expand Down Expand Up @@ -391,6 +392,13 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex);
static MutexOp sys_pthread_mutex_lock = first_sys_pthread_mutex_lock;
static MutexOp sys_pthread_mutex_trylock = first_sys_pthread_mutex_trylock;
static MutexOp sys_pthread_mutex_unlock = first_sys_pthread_mutex_unlock;
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
typedef int (*TimedMutexOp)(pthread_mutex_t*, const struct timespec*);
int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
const struct timespec* __abstime);
static TimedMutexOp sys_pthread_mutex_timedlock = first_sys_pthread_mutex_timedlock;
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

static pthread_once_t init_sys_mutex_lock_once = PTHREAD_ONCE_INIT;

// dlsym may call malloc to allocate space for dlerror and causes contention
Expand Down Expand Up @@ -438,11 +446,18 @@ static void init_sys_mutex_lock() {
RTLD_NEXT, "pthread_mutex_unlock", (void*)init_sys_mutex_lock);
sys_pthread_mutex_trylock = (MutexOp)_dl_sym(
RTLD_NEXT, "pthread_mutex_trylock", (void*)init_sys_mutex_lock);
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
sys_pthread_mutex_timedlock = (TimedMutexOp)_dl_sym(
RTLD_NEXT, "pthread_mutex_timedlock", (void*)init_sys_mutex_lock);
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
} else {
// _dl_sym may be undefined reference in some system, fallback to dlsym
sys_pthread_mutex_lock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_lock");
sys_pthread_mutex_unlock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_unlock");
sys_pthread_mutex_trylock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_trylock");
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
sys_pthread_mutex_timedlock = (TimedMutexOp)dlsym(RTLD_NEXT, "pthread_mutex_timedlock");
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
}
#elif defined(OS_MACOSX)
// TODO: look workaround for dlsym on mac
Expand All @@ -465,6 +480,14 @@ int first_sys_pthread_mutex_trylock(pthread_mutex_t* mutex) {
return sys_pthread_mutex_trylock(mutex);
}

#if HAS_PTHREAD_MUTEX_TIMEDLOCK
int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
const struct timespec* abstime) {
pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
return sys_pthread_mutex_timedlock(mutex, abstime);
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
return sys_pthread_mutex_unlock(mutex);
Expand Down Expand Up @@ -499,7 +522,7 @@ void CheckBthreadScheSafety() {
true, butil::memory_order_relaxed))) {
butil::debug::StackTrace trace(true);
// It can only be checked once because the counter is messed up.
LOG(ERROR) << "bthread is suspended while holding"
LOG(ERROR) << "bthread is suspended while holding "
<< tls_pthread_lock_count << " pthread locks."
<< std::endl << trace.ToString();
}
Expand Down Expand Up @@ -610,10 +633,27 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {

namespace internal {
#ifndef NO_PTHREAD_MUTEX_HOOK
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
++bthread::tls_pthread_lock_count;
return sys_pthread_mutex_lock(mutex);
#if HAS_PTHREAD_MUTEX_TIMEDLOCK
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
const struct timespec* abstime) {
int rc = NULL == abstime ?
sys_pthread_mutex_lock(mutex) :
sys_pthread_mutex_timedlock(mutex, abstime);
if (0 == rc) {
++tls_pthread_lock_count;
}
return rc;
}
#else
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
const struct timespec*/* Not supported */) {
int rc = sys_pthread_mutex_lock(mutex);
if (0 == rc) {
++tls_pthread_lock_count;
}
return rc;
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
int rc = sys_pthread_mutex_trylock(mutex);
Expand All @@ -627,11 +667,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
--tls_pthread_lock_count;
return sys_pthread_mutex_unlock(mutex);
}
#endif
#endif // NO_PTHREAD_MUTEX_HOOK

BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
mutex->lock();
return 0;
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex,
const struct timespec* abstime) {
if (NULL == abstime) {
mutex->lock();
return 0;
} else {
return mutex->timed_lock(abstime) ? 0 : errno;
}
}

BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) {
Expand All @@ -644,13 +689,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
}

template <typename Mutex>
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex, const struct timespec* abstime) {
// Don't change behavior of lock when profiler is off.
if (!g_cp ||
// collecting code including backtrace() and submit() may call
// pthread_mutex_lock and cause deadlock. Don't sample.
tls_inside_lock) {
return pthread_mutex_lock_internal(mutex);
return pthread_mutex_lock_internal(mutex, abstime);
}
// Don't slow down non-contended locks.
int rc = pthread_mutex_trylock_internal(mutex);
Expand All @@ -673,16 +718,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
csite = &entry.csite;
if (!sampling_range) {
make_contention_site_invalid(&entry.csite);
return pthread_mutex_lock_internal(mutex);
return pthread_mutex_lock_internal(mutex, abstime);
}
}
#endif
if (!sampling_range) { // don't sample
return pthread_mutex_lock_internal(mutex);
return pthread_mutex_lock_internal(mutex, abstime);
}
// Lock and monitor the waiting time.
const int64_t start_ns = butil::cpuwide_time_ns();
rc = pthread_mutex_lock_internal(mutex);
rc = pthread_mutex_lock_internal(mutex, abstime);
if (!rc) { // Inside lock
if (!csite) {
csite = add_pthread_contention_site(mutex);
Expand Down Expand Up @@ -748,13 +793,20 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {

#ifndef NO_PTHREAD_MUTEX_HOOK
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_lock_impl(mutex);
return internal::pthread_mutex_lock_impl(mutex, NULL);
}

BUTIL_FORCE_INLINE int pthread_mutex_trylock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_trylock_impl(mutex);
}

#if HAS_PTHREAD_MUTEX_TIMEDLOCK
BUTIL_FORCE_INLINE int pthread_mutex_timedlock_impl(pthread_mutex_t* mutex,
const struct timespec* abstime) {
return internal::pthread_mutex_lock_impl(mutex, abstime);
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_unlock_impl(mutex);
}
Expand All @@ -779,8 +831,7 @@ BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),

const int MAX_SPIN_ITER = 4;

inline int mutex_lock_contended_impl(
bthread_mutex_t* m, const struct timespec* __restrict abstime) {
inline int mutex_lock_contended_impl(bthread_mutex_t* m, const struct timespec* abstime) {
// When a bthread first contends for a lock, active spinning makes sense.
// Spin only few times and only if local `rq' is empty.
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
Expand Down Expand Up @@ -819,22 +870,41 @@ inline int mutex_lock_contended_impl(
#ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
namespace internal {

int FastPthreadMutex::lock_contended() {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
int FastPthreadMutex::lock_contended(const struct timespec* abstime) {
int64_t abstime_us = 0;
if (NULL != abstime) {
abstime_us = butil::timespec_to_microseconds(*abstime);
}
auto whole = (butil::atomic<unsigned>*)&_futex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
&& errno != EWOULDBLOCK) {
timespec* ptimeout = NULL;
timespec timeout{};
if (NULL != abstime) {
timeout = butil::microseconds_to_timespec(
abstime_us - butil::gettimeofday_us());
ptimeout = &timeout;
}
if (NULL == abstime || abstime_us > MIN_SLEEP_US) {
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, ptimeout) < 0
&& errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// A mutex lock should ignore interruptions in general since
// user code is unlikely to check the return value.
return errno;
}
} else {
errno = ETIMEDOUT;
return errno;
}
}
return 0;
}

void FastPthreadMutex::lock() {
auto split = (bthread::MutexInternal*)&_futex;
if (split->locked.exchange(1, butil::memory_order_acquire)) {
(void)lock_contended();
if (try_lock()) {
return;
}

(void)lock_contended(NULL);
++tls_pthread_lock_count;
}

Expand All @@ -847,30 +917,47 @@ bool FastPthreadMutex::try_lock() {
return lock;
}

bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
if (try_lock()) {
return true;
}
int rc = lock_contended(abstime);
if (rc == 0) {
++tls_pthread_lock_count;
}
return rc == 0;
}

void FastPthreadMutex::unlock() {
--tls_pthread_lock_count;
auto whole = (butil::atomic<unsigned>*)&_futex;
const unsigned prev = whole->exchange(0, butil::memory_order_release);
// CAUTION: the mutex may be destroyed, check comments before butex_create
if (prev != BTHREAD_MUTEX_LOCKED) {
futex_wake_private(whole, 1);
}
--tls_pthread_lock_count;
}

} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX

void FastPthreadMutex::lock() {
internal::pthread_mutex_lock_impl(&_mutex);
internal::pthread_mutex_lock_impl(&_mutex, NULL);
}

void FastPthreadMutex::unlock() {
internal::pthread_mutex_unlock_impl(&_mutex);
}

#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
return internal::pthread_mutex_lock_impl(&_mutex, abstime) == 0;
}
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK

} // namespace bthread

extern "C" {
__BEGIN_DECLS

int bthread_mutex_init(bthread_mutex_t* __restrict m,
const bthread_mutexattr_t* __restrict) {
Expand Down Expand Up @@ -990,9 +1077,16 @@ int pthread_mutex_lock(pthread_mutex_t* __mutex) {
int pthread_mutex_trylock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_trylock_impl(__mutex);
}
#if defined(OS_LINUX) && defined(OS_POSIX) && defined(__USE_XOPEN2K)
int pthread_mutex_timedlock(pthread_mutex_t *__restrict __mutex,
const struct timespec *__restrict __abstime) {
return bthread::pthread_mutex_timedlock_impl(__mutex, __abstime);
}
#endif // OS_POSIX __USE_XOPEN2K
int pthread_mutex_unlock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_unlock_impl(__mutex);
}
#endif
#endif // NO_PTHREAD_MUTEX_HOOK


} // extern "C"
__END_DECLS
10 changes: 9 additions & 1 deletion src/bthread/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class Mutex {
}
void unlock() { bthread_mutex_unlock(&_mutex); }
bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
bool timed_lock(const struct timespec* abstime) {
return !bthread_mutex_timedlock(&_mutex, abstime);
}
// TODO(chenzhangyi01): Complement interfaces for C++11
private:
DISALLOW_COPY_AND_ASSIGN(Mutex);
Expand All @@ -76,9 +79,10 @@ class FastPthreadMutex {
void lock();
void unlock();
bool try_lock();
bool timed_lock(const struct timespec* abstime);
private:
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
int lock_contended();
int lock_contended(const struct timespec* abstime);
unsigned _futex;
};
#else
Expand All @@ -95,6 +99,10 @@ class FastPthreadMutex {
void lock();
void unlock();
bool try_lock() { return _mutex.try_lock(); }
#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
bool timed_lock(const struct timespec* abstime);
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK

private:
internal::FastPthreadMutex _mutex;
};
Expand Down
13 changes: 12 additions & 1 deletion src/butil/synchronization/lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
#include <windows.h>
#elif defined(OS_POSIX)
#include <pthread.h>
#endif
#if defined(OS_LINUX) && defined(__USE_XOPEN2K)
#define HAS_PTHREAD_MUTEX_TIMEDLOCK 1
#else
#define HAS_PTHREAD_MUTEX_TIMEDLOCK 0
#endif // OS_LINUX __USE_XOPEN2K
#endif // OS_POSIX

#include "butil/base_export.h"
#include "butil/macros.h"
Expand Down Expand Up @@ -90,6 +95,12 @@ class BUTIL_EXPORT Mutex {
#endif
}

#if HAS_PTHREAD_MUTEX_TIMEDLOCK
bool timed_lock(const struct timespec* abstime) {
return pthread_mutex_timedlock(&_native_handle, abstime) == 0;
}
#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK

// Returns the underlying implementation-defined native handle object.
NativeHandle* native_handle() { return &_native_handle; }

Expand Down
Loading
Loading