diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index b603d89c11..1061518337 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -68,11 +68,6 @@ inline bvar::Adder<int64_t>& butex_waiter_count() {
 }
 #endif
 
-// If a thread would suspend for less than so many microseconds, return
-// ETIMEDOUT directly.
-// Use 1: sleeping for less than 2 microsecond is inefficient and useless.
-static const int64_t MIN_SLEEP_US = 2;
-
 enum WaiterState {
     WAITER_STATE_NONE,
     WAITER_STATE_READY,
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index b40ec1e04b..aad275a7c4 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -29,6 +29,11 @@
 
 namespace bthread {
 
+// If a thread would suspend for less than so many microseconds, return
+// ETIMEDOUT directly.
+// Use 1: sleeping for less than 2 microsecond is inefficient and useless.
+static const int64_t MIN_SLEEP_US = 2;
+
 // Create a butex which is a futex-like 32-bit primitive for synchronizing
 // bthreads/pthreads.
 // Returns a pointer to 32-bit data, NULL on failure.
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index 403f6bb8a8..5b16d41fea 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -19,6 +19,7 @@
 
 // Date: Sun Aug 3 12:46:15 CST 2014
 
+#include <sys/cdefs.h>
 #include <pthread.h>
 #include <dlfcn.h>                               // dlsym
 #include <fcntl.h>                               // O_RDONLY
@@ -47,9 +48,9 @@
 #include "bthread/processor.h"
 #include "bthread/task_group.h"
 
-extern "C" {
+__BEGIN_DECLS
 extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
-}
+__END_DECLS
 
 namespace bthread {
 
@@ -391,6 +392,13 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex);
 static MutexOp sys_pthread_mutex_lock = first_sys_pthread_mutex_lock;
 static MutexOp sys_pthread_mutex_trylock = first_sys_pthread_mutex_trylock;
 static MutexOp sys_pthread_mutex_unlock = first_sys_pthread_mutex_unlock;
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+typedef int (*TimedMutexOp)(pthread_mutex_t*, const struct timespec*);
+int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
+                                      const struct timespec* __abstime);
+static TimedMutexOp sys_pthread_mutex_timedlock = first_sys_pthread_mutex_timedlock;
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 static pthread_once_t init_sys_mutex_lock_once = PTHREAD_ONCE_INIT;
 
 // dlsym may call malloc to allocate space for dlerror and causes contention
@@ -438,11 +446,18 @@ static void init_sys_mutex_lock() {
             RTLD_NEXT, "pthread_mutex_unlock", (void*)init_sys_mutex_lock);
         sys_pthread_mutex_trylock = (MutexOp)_dl_sym(
             RTLD_NEXT, "pthread_mutex_trylock", (void*)init_sys_mutex_lock);
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+        sys_pthread_mutex_timedlock = (TimedMutexOp)_dl_sym(
+            RTLD_NEXT, "pthread_mutex_timedlock", (void*)init_sys_mutex_lock);
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
     } else {
         // _dl_sym may be undefined reference in some system, fallback to dlsym
         sys_pthread_mutex_lock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_lock");
         sys_pthread_mutex_unlock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_unlock");
         sys_pthread_mutex_trylock = (MutexOp)dlsym(RTLD_NEXT, "pthread_mutex_trylock");
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+        sys_pthread_mutex_timedlock = (TimedMutexOp)dlsym(RTLD_NEXT, "pthread_mutex_timedlock");
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
     }
 #elif defined(OS_MACOSX)
     // TODO: look workaround for dlsym on mac
@@ -465,6 +480,14 @@ int first_sys_pthread_mutex_trylock(pthread_mutex_t* mutex) {
     return sys_pthread_mutex_trylock(mutex);
 }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
+                                      const struct timespec* abstime) {
+    pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
+    return sys_pthread_mutex_timedlock(mutex, abstime);
+}
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
     pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
     return sys_pthread_mutex_unlock(mutex);
@@ -499,7 +522,7 @@ void CheckBthreadScheSafety() {
         true, butil::memory_order_relaxed))) {
         butil::debug::StackTrace trace(true);
         // It can only be checked once because the counter is messed up.
-        LOG(ERROR) << "bthread is suspended while holding"
+        LOG(ERROR) << "bthread is suspended while holding "
                    << tls_pthread_lock_count << " pthread locks."
                    << std::endl << trace.ToString();
     }
@@ -610,10 +633,27 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {
 namespace internal {
 
 #ifndef NO_PTHREAD_MUTEX_HOOK
-BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
-    ++bthread::tls_pthread_lock_count;
-    return sys_pthread_mutex_lock(mutex);
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
+                                                   const struct timespec* abstime) {
+    int rc = NULL == abstime ?
+        sys_pthread_mutex_lock(mutex) :
+        sys_pthread_mutex_timedlock(mutex, abstime);
+    if (0 == rc) {
+        ++tls_pthread_lock_count;
+    }
+    return rc;
 }
+#else
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
+                                                   const struct timespec*/* Not supported */) {
+    int rc = sys_pthread_mutex_lock(mutex);
+    if (0 == rc) {
+        ++tls_pthread_lock_count;
+    }
+    return rc;
+}
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
 
 BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
     int rc = sys_pthread_mutex_trylock(mutex);
@@ -627,11 +667,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
     --tls_pthread_lock_count;
     return sys_pthread_mutex_unlock(mutex);
 }
-#endif
+#endif // NO_PTHREAD_MUTEX_HOOK
 
-BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
-    mutex->lock();
-    return 0;
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex,
+                                                   const struct timespec* abstime) {
+    if (NULL == abstime) {
+        mutex->lock();
+        return 0;
+    } else {
+        return mutex->timed_lock(abstime) ? 0 : errno;
+    }
 }
 
 BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) {
@@ -644,13 +689,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
 }
 
 template <typename Mutex>
-BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
+BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex, const struct timespec* abstime) {
     // Don't change behavior of lock when profiler is off.
     if (!g_cp ||
         // collecting code including backtrace() and submit() may call
         // pthread_mutex_lock and cause deadlock. Don't sample.
         tls_inside_lock) {
-        return pthread_mutex_lock_internal(mutex);
+        return pthread_mutex_lock_internal(mutex, abstime);
     }
     // Don't slow down non-contended locks.
     int rc = pthread_mutex_trylock_internal(mutex);
@@ -673,16 +718,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
         csite = &entry.csite;
         if (!sampling_range) {
             make_contention_site_invalid(&entry.csite);
-            return pthread_mutex_lock_internal(mutex);
+            return pthread_mutex_lock_internal(mutex, abstime);
         }
     }
 #endif
     if (!sampling_range) { // don't sample
-        return pthread_mutex_lock_internal(mutex);
+        return pthread_mutex_lock_internal(mutex, abstime);
     }
     // Lock and monitor the waiting time.
     const int64_t start_ns = butil::cpuwide_time_ns();
-    rc = pthread_mutex_lock_internal(mutex);
+    rc = pthread_mutex_lock_internal(mutex, abstime);
     if (!rc) { // Inside lock
         if (!csite) {
             csite = add_pthread_contention_site(mutex);
@@ -748,13 +793,20 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {
 
 #ifndef NO_PTHREAD_MUTEX_HOOK
 BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
-    return internal::pthread_mutex_lock_impl(mutex);
+    return internal::pthread_mutex_lock_impl(mutex, NULL);
 }
 
 BUTIL_FORCE_INLINE int pthread_mutex_trylock_impl(pthread_mutex_t* mutex) {
     return internal::pthread_mutex_trylock_impl(mutex);
 }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+BUTIL_FORCE_INLINE int pthread_mutex_timedlock_impl(pthread_mutex_t* mutex,
+                                                    const struct timespec* abstime) {
+    return internal::pthread_mutex_lock_impl(mutex, abstime);
+}
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
     return internal::pthread_mutex_unlock_impl(mutex);
 }
@@ -779,8 +831,7 @@ BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
 
 const int MAX_SPIN_ITER = 4;
 
-inline int mutex_lock_contended_impl(
-    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
+inline int mutex_lock_contended_impl(bthread_mutex_t* m, const struct timespec* abstime) {
     // When a bthread first contends for a lock, active spinning makes sense.
     // Spin only few times and only if local `rq' is empty.
     TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
@@ -819,11 +870,29 @@ inline int mutex_lock_contended_impl(
 
 #ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
 namespace internal {
-int FastPthreadMutex::lock_contended() {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
+int FastPthreadMutex::lock_contended(const struct timespec* abstime) {
+    int64_t abstime_us = 0;
+    if (NULL != abstime) {
+        abstime_us = butil::timespec_to_microseconds(*abstime);
+    }
+    auto whole = (butil::atomic<unsigned>*)&_futex;
     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
-            && errno != EWOULDBLOCK) {
+        timespec* ptimeout = NULL;
+        timespec timeout{};
+        if (NULL != abstime) {
+            timeout = butil::microseconds_to_timespec(
+                abstime_us - butil::gettimeofday_us());
+            ptimeout = &timeout;
+        }
+        if (NULL == abstime || abstime_us > MIN_SLEEP_US) {
+            if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, ptimeout) < 0
+                && errno != EWOULDBLOCK && errno != EINTR/*note*/) {
+                // A mutex lock should ignore interruptions in general since
+                // user code is unlikely to check the return value.
+                return errno;
+            }
+        } else {
+            errno = ETIMEDOUT;
             return errno;
         }
     }
@@ -831,10 +900,11 @@ int FastPthreadMutex::lock_contended() {
 }
 
 void FastPthreadMutex::lock() {
-    auto split = (bthread::MutexInternal*)&_futex;
-    if (split->locked.exchange(1, butil::memory_order_acquire)) {
-        (void)lock_contended();
+    if (try_lock()) {
+        return;
     }
+
+    (void)lock_contended(NULL);
     ++tls_pthread_lock_count;
 }
 
@@ -847,30 +917,47 @@ bool FastPthreadMutex::try_lock() {
     return lock;
 }
 
+bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
+    if (try_lock()) {
+        return true;
+    }
+    int rc = lock_contended(abstime);
+    if (rc == 0) {
+        ++tls_pthread_lock_count;
+    }
+    return rc == 0;
+}
+
 void FastPthreadMutex::unlock() {
+    --tls_pthread_lock_count;
     auto whole = (butil::atomic<unsigned>*)&_futex;
     const unsigned prev = whole->exchange(0, butil::memory_order_release);
     // CAUTION: the mutex may be destroyed, check comments before butex_create
     if (prev != BTHREAD_MUTEX_LOCKED) {
         futex_wake_private(whole, 1);
     }
-    --tls_pthread_lock_count;
 }
 
 } // namespace internal
 #endif // BTHREAD_USE_FAST_PTHREAD_MUTEX
 
 void FastPthreadMutex::lock() {
-    internal::pthread_mutex_lock_impl(&_mutex);
+    internal::pthread_mutex_lock_impl(&_mutex, NULL);
 }
 
 void FastPthreadMutex::unlock() {
     internal::pthread_mutex_unlock_impl(&_mutex);
 }
 
+#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
+bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
+    return internal::pthread_mutex_lock_impl(&_mutex, abstime) == 0;
+}
+#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 } // namespace bthread
 
-extern "C" {
+__BEGIN_DECLS
 
 int bthread_mutex_init(bthread_mutex_t* __restrict m,
                        const bthread_mutexattr_t* __restrict) {
@@ -990,9 +1077,16 @@ int pthread_mutex_lock(pthread_mutex_t* __mutex) {
 int pthread_mutex_trylock(pthread_mutex_t* __mutex) {
     return bthread::pthread_mutex_trylock_impl(__mutex);
 }
+#if defined(OS_LINUX) && defined(OS_POSIX) && defined(__USE_XOPEN2K)
+int pthread_mutex_timedlock(pthread_mutex_t *__restrict __mutex,
+                            const struct timespec *__restrict __abstime) {
+    return bthread::pthread_mutex_timedlock_impl(__mutex, __abstime);
+}
+#endif // OS_POSIX __USE_XOPEN2K
 int pthread_mutex_unlock(pthread_mutex_t* __mutex) {
     return bthread::pthread_mutex_unlock_impl(__mutex);
 }
-#endif
+#endif // NO_PTHREAD_MUTEX_HOOK
+
 
-} // extern "C"
+__END_DECLS
diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h
index ad6d2e5cbd..318f239348 100644
--- a/src/bthread/mutex.h
+++ b/src/bthread/mutex.h
@@ -61,6 +61,9 @@ class Mutex {
     }
     void unlock() { bthread_mutex_unlock(&_mutex); }
     bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
+    bool timed_lock(const struct timespec* abstime) {
+        return !bthread_mutex_timedlock(&_mutex, abstime);
+    }
     // TODO(chenzhangyi01): Complement interfaces for C++11
 private:
     DISALLOW_COPY_AND_ASSIGN(Mutex);
@@ -76,9 +79,10 @@ class FastPthreadMutex {
     void lock();
     void unlock();
     bool try_lock();
+    bool timed_lock(const struct timespec* abstime);
 private:
     DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
-    int lock_contended();
+    int lock_contended(const struct timespec* abstime);
     unsigned _futex;
 };
 #else
@@ -95,6 +99,10 @@ class FastPthreadMutex {
     void lock();
     void unlock();
     bool try_lock() { return _mutex.try_lock(); }
+#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
+    bool timed_lock(const struct timespec* abstime);
+#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 private:
     internal::FastPthreadMutex _mutex;
 };
diff --git a/src/butil/synchronization/lock.h b/src/butil/synchronization/lock.h
index b6f5215c05..e62c76c438 100644
--- a/src/butil/synchronization/lock.h
+++ b/src/butil/synchronization/lock.h
@@ -23,7 +23,12 @@
 #include <windows.h>
 #elif defined(OS_POSIX)
 #include <pthread.h>
-#endif
+#if defined(OS_LINUX) && defined(__USE_XOPEN2K)
+#define HAS_PTHREAD_MUTEX_TIMEDLOCK 1
+#else
+#define HAS_PTHREAD_MUTEX_TIMEDLOCK 0
+#endif // OS_LINUX __USE_XOPEN2K
+#endif // OS_POSIX
 
 #include "butil/base_export.h"
 #include "butil/macros.h"
@@ -90,6 +95,12 @@ class BUTIL_EXPORT Mutex {
 #endif
   }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+  bool timed_lock(const struct timespec* abstime) {
+    return pthread_mutex_timedlock(&_native_handle, abstime) == 0;
+  }
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
   // Returns the underlying implementation-defined native handle object.
   NativeHandle* native_handle() { return &_native_handle; }
 
diff --git a/test/bthread_mutex_unittest.cpp b/test/bthread_mutex_unittest.cpp
index 21bd60446f..b0802f9b63 100644
--- a/test/bthread_mutex_unittest.cpp
+++ b/test/bthread_mutex_unittest.cpp
@@ -108,8 +108,12 @@ TEST(MutexTest, cpp_wrapper) {
     mutex.unlock();
     mutex.lock();
    mutex.unlock();
+    struct timespec t = { -2, 0 };
+    ASSERT_TRUE(mutex.timed_lock(&t));
+    mutex.unlock();
     {
         BAIDU_SCOPED_LOCK(mutex);
+        ASSERT_FALSE(mutex.timed_lock(&t));
     }
     {
         std::unique_lock<bthread::Mutex> lck1;
@@ -132,6 +136,8 @@ TEST(MutexTest, cpp_wrapper) {
     }
     ASSERT_TRUE(mutex.try_lock());
     mutex.unlock();
+    ASSERT_TRUE(mutex.timed_lock(&t));
+    mutex.unlock();
 }
 
 bool g_started = false;
@@ -268,6 +274,13 @@ TEST(MutexTest, mix_thread_types) {
     }
 }
 
+void* do_fast_pthread_timedlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_FALSE(((bthread::FastPthreadMutex*)arg)->timed_lock(&t));
+    EXPECT_EQ(ETIMEDOUT, errno);
+    return NULL;
+}
+
 TEST(MutexTest, fast_pthread_mutex) {
     bthread::FastPthreadMutex mutex;
     ASSERT_TRUE(mutex.try_lock());
@@ -276,6 +289,12 @@ TEST(MutexTest, fast_pthread_mutex) {
     mutex.unlock();
     {
         BAIDU_SCOPED_LOCK(mutex);
+        struct timespec t = { -2, 0 };
+        ASSERT_FALSE(mutex.timed_lock(&t));
+        ASSERT_EQ(ETIMEDOUT, errno);
+        pthread_t th;
+        ASSERT_EQ(0, pthread_create(&th, NULL, do_fast_pthread_timedlock, &mutex));
+        ASSERT_EQ(0, pthread_join(th, NULL));
     }
     {
         std::unique_lock<bthread::FastPthreadMutex> lck1;
@@ -300,4 +319,47 @@ TEST(MutexTest, fast_pthread_mutex) {
     }
 }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+void* do_pthread_timedlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, pthread_mutex_timedlock((pthread_mutex_t*)arg, &t));
+    EXPECT_EQ(ETIMEDOUT, errno);
+    return NULL;
+}
+#endif
+
+TEST(MutexTest, pthread_mutex) {
+    pthread_mutex_t mutex;
+    ASSERT_EQ(0, pthread_mutex_init(&mutex, NULL));
+    ASSERT_EQ(0, pthread_mutex_trylock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_unlock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_lock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_unlock(&mutex));
+    {
+        BAIDU_SCOPED_LOCK(mutex);
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+        LOG(INFO) << "pthread_mutex_timedlock is available";
+        struct timespec t = { -2, 0 };
+        ASSERT_EQ(ETIMEDOUT, pthread_mutex_timedlock(&mutex, &t));
+        pthread_t th;
+        ASSERT_EQ(0, pthread_create(&th, NULL, do_pthread_timedlock, &mutex));
+        ASSERT_EQ(0, pthread_join(th, NULL));
+#endif
+    }
+    ASSERT_EQ(0, pthread_mutex_trylock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_unlock(&mutex));
+
+    const int N = 16;
+    pthread_t pthreads[N];
+    for (int i = 0; i < N; ++i) {
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
+                                    loop_until_stopped, &mutex));
+    }
+    bthread_usleep(1000L * 1000);
+    g_stopped = true;
+    for (int i = 0; i < N; ++i) {
+        pthread_join(pthreads[i], NULL);
+    }
+}
+
 } // namespace
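
For reference, a minimal usage sketch of the timed_lock() interface introduced above. It is illustrative only and not part of the patch: the helper name, the 100 ms deadline, and the CLOCK_REALTIME-based deadline computation are assumptions of this sketch; timed_lock() takes an absolute deadline, mirroring pthread_mutex_timedlock().

#include <time.h>
#include "bthread/mutex.h"

// Hypothetical helper: try to acquire a bthread::Mutex, giving up after ~100 ms.
// timed_lock() expects an absolute (wall-clock) deadline and returns false if
// the mutex cannot be acquired before that deadline; on success the caller
// must unlock() as usual.
static bool lock_with_100ms_deadline(bthread::Mutex& mu) {
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);   // base the deadline on wall-clock time
    abstime.tv_nsec += 100 * 1000 * 1000;      // add 100 ms
    if (abstime.tv_nsec >= 1000000000L) {      // keep tv_nsec within [0, 1e9)
        abstime.tv_sec += 1;
        abstime.tv_nsec -= 1000000000L;
    }
    return mu.timed_lock(&abstime);
}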