Skip to content

Commit

Permalink
[WorkSerializer] add debug-only IsRunningInWorkSerializer() method an…
Browse files Browse the repository at this point in the history
…d use it in client_channel
  • Loading branch information
markdroth committed Aug 16, 2023
1 parent 64a318a commit 54ff7fa
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
++it->second;
}
}
#ifndef NDEBUG
GPR_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
#endif
chand_->subchannel_wrappers_.insert(this);
}

Expand All @@ -522,6 +525,9 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
"chand=%p: destroying subchannel wrapper %p for subchannel %p",
chand_, this, subchannel_.get());
}
#ifndef NDEBUG
GPR_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
#endif
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
Expand Down
28 changes: 28 additions & 0 deletions src/core/lib/gprpp/work_serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

#include <grpc/support/log.h>
Expand All @@ -47,6 +48,12 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
void DrainQueue();
void Orphan() override;

#ifndef NDEBUG
bool RunningInWorkSerializer() const {
return std::this_thread::get_id() == current_thread_;
}
#endif

private:
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
Expand Down Expand Up @@ -86,6 +93,9 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
// orphaned.
std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
MultiProducerSingleConsumerQueue queue_;
#ifndef NDEBUG
std::thread::id current_thread_;
#endif
};

void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
Expand All @@ -102,11 +112,17 @@ void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
if (GetOwners(prev_ref_pair) == 0) {
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
callback();
DrainQueueOwned();
#ifndef NDEBUG
current_thread_ = std::thread::id();
#endif
} else {
// Another thread is holding the WorkSerializer, so decrement the ownership
// count we just added and queue the callback.
Expand Down Expand Up @@ -158,8 +174,14 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
const uint64_t prev_ref_pair =
refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
if (GetOwners(prev_ref_pair) == 0) {
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
// We took ownership of the WorkSerializer. Drain the queue.
DrainQueueOwned();
#ifndef NDEBUG
current_thread_ = std::thread::id();
#endif
} else {
// Another thread is holding the WorkSerializer, so decrement the ownership
// count we just added and queue a no-op callback.
Expand Down Expand Up @@ -244,4 +266,10 @@ void WorkSerializer::Schedule(std::function<void()> callback,

void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }

#ifndef NDEBUG
bool WorkSerializer::RunningInWorkSerializer() const {
return impl_->RunningInWorkSerializer();
}
#endif

} // namespace grpc_core
5 changes: 5 additions & 0 deletions src/core/lib/gprpp/work_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class ABSL_LOCKABLE WorkSerializer {
// Drains the queue of callbacks.
void DrainQueue();

#ifndef NDEBUG
// Returns true if the current thread is running in the WorkSerializer.
bool RunningInWorkSerializer() const;
#endif

private:
class WorkSerializerImpl;

Expand Down
37 changes: 37 additions & 0 deletions test/core/gprpp/work_serializer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,43 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
}
}

#ifndef NDEBUG
TEST(WorkSerializerTest, RunningInWorkSerializer) {
grpc_core::WorkSerializer work_serializer1;
grpc_core::WorkSerializer work_serializer2;
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
}
#endif

} // namespace

int main(int argc, char** argv) {
Expand Down

0 comments on commit 54ff7fa

Please sign in to comment.