From 3afe12fe606fb08e56598365fa1a677372d78816 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 8 Sep 2023 00:25:23 +0000 Subject: [PATCH] start fixing LB policy unit tests --- .../lb_policy/lb_policy_test_lib.h | 20 ++++++++++++++++--- .../lb_policy/pick_first_test.cc | 11 ++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 576f176f1fbff..d2390972dc6db 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -153,6 +153,8 @@ class LoadBalancingPolicyTest : public ::testing::Test { void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { + gpr_log(GPR_INFO, "notifying watcher: state=%s status=%s", + ConnectivityStateName(new_state), status.ToString().c_str()); watcher()->OnConnectivityStateChange(new_state, status); } @@ -330,16 +332,19 @@ class LoadBalancingPolicyTest : public ::testing::Test { << "bug in test: " << ConnectivityStateName(state) << " must have OK status: " << status; } + absl::Notification notification; work_serializer_->Run( - [this, state, status, validate_state_transition, location]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) { + [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) { if (validate_state_transition) { AssertValidConnectivityStateTransition(state_tracker_.state(), state, location); } state_tracker_.SetState(state, status, "set from test"); + work_serializer_->Run([&]() { notification.Notify(); }, + DEBUG_LOCATION); }, DEBUG_LOCATION); + notification.WaitForNotification(); } // Indicates if any of the associated SubchannelInterface objects @@ -704,7 +709,8 @@ class LoadBalancingPolicyTest : public ::testing::Test { work_serializer_->Run( [&]() { status = lb_policy->UpdateLocked(std::move(update_args)); - notification.Notify(); + work_serializer_->Run([&]() { notification.Notify(); }, + DEBUG_LOCATION); }, DEBUG_LOCATION); notification.WaitForNotification(); @@ -1097,6 +1103,14 @@ class LoadBalancingPolicyTest : public ::testing::Test { return &it->second; } + void WaitForWorkSerializerToFlush() { + gpr_log(GPR_INFO, "waiting for WorkSerializer to flush..."); + absl::Notification notification; + work_serializer_->Run([&]() { notification.Notify(); }, DEBUG_LOCATION); + notification.WaitForNotification(); + gpr_log(GPR_INFO, "WorkSerializer flush complete"); + } + std::shared_ptr work_serializer_; std::shared_ptr event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index 8547e9103683a..fc81a1e27efe6 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -67,6 +67,10 @@ class PickFirstTest : public LoadBalancingPolicyTest { std::vector* out_address_order) { work_serializer_->Run([&]() { lb_policy_->ExitIdleLocked(); }, DEBUG_LOCATION); + // First flush is for ExitIdle(), second flush is for the resulting + // subchannel connectivity state notifications. + WaitForWorkSerializerToFlush(); + WaitForWorkSerializerToFlush(); out_address_order->clear(); // Construct a map of subchannel to address. // We will remove entries as each subchannel starts to connect. @@ -445,6 +449,13 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) { // By checking the picker, we told the LB policy to trigger a new // connection attempt, so it should start over with the first // subchannel. + // Note that the picker will have enqueued the ExitIdle() call in the + // WorkSerializer, so the first flush will execute that call. But + // executing that call will result in enqueueing subchannel + // connectivity state notifications, so we need to flush again to make + // sure all of that work is done before we continue. + WaitForWorkSerializerToFlush(); + WaitForWorkSerializerToFlush(); EXPECT_TRUE(subchannel->ConnectionRequested()); // The subchannel starts connecting. subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);