Skip to content

Commit

Permalink
fix checking duration passed to RunAfter()
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Sep 11, 2023
1 parent 16f1674 commit dc02276
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 40 deletions.
36 changes: 23 additions & 13 deletions test/core/client_channel/lb_policy/lb_policy_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -1182,26 +1182,36 @@ class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
}

~TimeAwareLoadBalancingPolicyTest() override {
auto* fuzzing_ee =
static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
event_engine_.get());
fuzzing_ee->FuzzingDone();
fuzzing_event_engine()->FuzzingDone();
}

void IncrementTimeBy(Duration duration) {
auto* fuzzing_ee =
static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
event_engine_.get());
fuzzing_ee->TickForDuration(duration);
fuzzing_event_engine()->TickForDuration(duration);
// Flush WorkSerializer, in case the timer callback enqueued anything.
WaitForWorkSerializerToFlush();
}

// FIXME
// Called when the LB policy starts a timer.
// May be overridden by subclasses.
virtual void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration) {}
void SetExpectedTimerDuration(
absl::optional<grpc_event_engine::experimental::EventEngine::Duration>
duration) {
if (duration.has_value()) {
fuzzing_event_engine()->SetRunAfterDurationCallback(
[expected = *duration](
grpc_event_engine::experimental::EventEngine::Duration duration) {
EXPECT_EQ(duration, expected)
<< "Expected: " << expected.count() << "ns\nActual: "
<< duration.count() << "ns";
});
} else {
fuzzing_event_engine()->SetRunAfterDurationCallback(nullptr);
}
}

grpc_event_engine::experimental::FuzzingEventEngine* fuzzing_event_engine()
const {
return static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
event_engine_.get());
}
};

} // namespace testing
Expand Down
14 changes: 3 additions & 11 deletions test/core/client_channel/lb_policy/outlier_detection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
};

OutlierDetectionTest()
: lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {}
: lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {
SetExpectedTimerDuration(std::chrono::seconds(10));
}

absl::optional<std::string> DoPickWithFailedCall(
LoadBalancingPolicy::SubchannelPicker* picker) {
Expand All @@ -164,17 +166,7 @@ class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
return address;
}

void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_internal_)
<< "Expected: " << expected_internal_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}

OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration expected_internal_ =
std::chrono::seconds(10);
};

TEST_F(OutlierDetectionTest, Basic) {
Expand Down
15 changes: 3 additions & 12 deletions test/core/client_channel/lb_policy/weighted_round_robin_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {

WeightedRoundRobinTest() {
lb_policy_ = MakeLbPolicy("weighted_round_robin");
SetExpectedTimerDuration(std::chrono::seconds(1));
}

RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
Expand Down Expand Up @@ -317,17 +318,7 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
}
}

void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_weight_update_interval_)
<< "Expected: " << expected_weight_update_interval_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}

OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration
expected_weight_update_interval_ = std::chrono::seconds(1);
};

TEST_F(WeightedRoundRobinTest, Basic) {
Expand Down Expand Up @@ -642,7 +633,7 @@ TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
expected_weight_update_interval_ = std::chrono::seconds(2);
SetExpectedTimerDuration(std::chrono::seconds(2));
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Seconds(2)));
ASSERT_NE(picker, nullptr);
Expand All @@ -660,7 +651,7 @@ TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
expected_weight_update_interval_ = std::chrono::milliseconds(100);
SetExpectedTimerDuration(std::chrono::milliseconds(100));
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses,
ConfigBuilder().SetWeightUpdatePeriod(Duration::Milliseconds(10)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ void FuzzingEventEngine::TickForDuration(grpc_core::Duration d) {
TickUntilTimestamp(grpc_core::Timestamp::Now() + d);
}

void FuzzingEventEngine::SetRunAfterDurationCallback(
absl::AnyInvocable<void(Duration)> callback) {
grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
run_after_duration_callback_ = std::move(callback);
}

FuzzingEventEngine::Time FuzzingEventEngine::Now() {
grpc_core::MutexLock lock(&*now_mu_);
return now_;
Expand Down Expand Up @@ -435,8 +441,10 @@ EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
// TODO(ctiller): do something with the timeout
// Schedule a timer to run (with some fuzzer selected delay) the on_connect
// callback.
auto task_handle = RunAfter(
Duration(0), [this, addr, on_connect = std::move(on_connect)]() mutable {
grpc_core::MutexLock lock(&*mu_);
auto task_handle = RunAfterLocked(
RunType::kRunAfter, Duration(0),
[this, addr, on_connect = std::move(on_connect)]() mutable {
// Check for a legal address and extract the target port number.
auto port = ResolvedAddressGetPort(addr);
grpc_core::MutexLock lock(&*mu_);
Expand Down Expand Up @@ -490,11 +498,14 @@ FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
}

void FuzzingEventEngine::Run(Closure* closure) {
RunAfter(Duration::zero(), closure);
grpc_core::MutexLock lock(&*mu_);
RunAfterLocked(RunType::kRunAfter, Duration::zero(),
[closure]() { closure->Run(); });
}

void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
RunAfter(Duration::zero(), std::move(closure));
grpc_core::MutexLock lock(&*mu_);
RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
}

EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
Expand All @@ -504,6 +515,12 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,

EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
Duration when, absl::AnyInvocable<void()> closure) {
{
grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
if (run_after_duration_callback_ != nullptr) {
run_after_duration_callback_(when);
}
}
grpc_core::MutexLock lock(&*mu_);
// (b/258949216): Cap it to one year to avoid integer overflow errors.
return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class FuzzingEventEngine : public EventEngine {
// Tick for some grpc_core::Duration
void TickForDuration(grpc_core::Duration d) ABSL_LOCKS_EXCLUDED(mu_);

// Sets a callback to be invoked any time RunAfter() is called.
// Allows tests to verify the specified duration.
void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback);

absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
Expand Down Expand Up @@ -290,6 +294,10 @@ class FuzzingEventEngine : public EventEngine {
std::queue<std::queue<size_t>> write_sizes_for_future_connections_
ABSL_GUARDED_BY(mu_);
grpc_pick_port_functions previous_pick_port_functions_;

grpc_core::Mutex run_after_duration_callback_mu_;
absl::AnyInvocable<void(Duration)> run_after_duration_callback_
ABSL_GUARDED_BY(run_after_duration_callback_mu_);
};

} // namespace experimental
Expand Down

0 comments on commit dc02276

Please sign in to comment.