Skip to content

Commit

Permalink
[call v3] implement ClientChannel::WatchConnectivityState()
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Jun 28, 2024
1 parent b489c2c commit 8fdbdf7
Showing 1 changed file with 87 additions and 4 deletions.
91 changes: 87 additions & 4 deletions src/core/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -688,10 +688,93 @@ grpc_connectivity_state ClientChannel::CheckConnectivityState(
return state;
}

void ClientChannel::WatchConnectivityState(grpc_connectivity_state, Timestamp,
grpc_completion_queue*, void*) {
// TODO(ctiller): implement
Crash("not implemented");
namespace {

// A fire-and-forget object to handle external connectivity state watches.
class ExternalStateWatcher : public RefCounted<ExternalStateWatcher> {
public:
ExternalStateWatcher(WeakRefCountedPtr<ClientChannel> channel,
grpc_completion_queue* cq, void* tag,
grpc_connectivity_state last_observed_state,
Timestamp deadline)
: channel_(std::move(channel)), cq_(cq), tag_(tag) {
MutexLock lock(&mu_);
// Start watch. This inherits the ref from creation.
auto watcher =
MakeOrphanable<Watcher>(RefCountedPtr<ExternalStateWatcher>(this));
watcher_ = watcher.get();
channel_->AddConnectivityWatcher(last_observed_state, std::move(watcher));
// Start timer. This takes a second ref.
const Duration timeout = deadline - Timestamp::Now();
timer_handle_ =
channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->MaybeStartCompletion(
absl::DeadlineExceededError(
"Timed out waiting for connection state change"));
// ExternalStateWatcher deletion might require an active ExecCtx.
self.reset();
});
}

private:
class Watcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit Watcher(RefCountedPtr<ExternalStateWatcher> external_state_watcher)
: external_state_watcher_(std::move(external_state_watcher)) {}

void OnConnectivityStateChange(grpc_connectivity_state /*new_state*/,
const absl::Status& /*status*/) override {
external_state_watcher_->MaybeStartCompletion(absl::OkStatus());
}

private:
RefCountedPtr<ExternalStateWatcher> external_state_watcher_;
};

// This is called both when the watch reports a new connectivity state
// and when the timer fires. It will trigger a CQ notification only
// on the first call. Subsequent calls will be ignored, because
// events can come in asynchronously.
void MaybeStartCompletion(absl::Status status) {
MutexLock lock(&mu_);
if (watcher_ == nullptr) return; // Ignore subsequent notifications.
// Cancel watch.
channel_->RemoveConnectivityWatcher(watcher_);
watcher_ = nullptr;
// Cancel timer.
channel_->event_engine()->Cancel(timer_handle_);
// Send CQ completion.
Ref().release(); // Released in FinishedCompletion().
grpc_cq_end_op(cq_, tag_, status, FinishedCompletion, this,
&completion_storage_);
}

// Called when the completion is returned to the CQ.
static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
auto* self = static_cast<ExternalStateWatcher*>(arg);
self->Unref();
}

WeakRefCountedPtr<ClientChannel> channel_;

Mutex mu_;
grpc_completion_queue* cq_ ABSL_GUARDED_BY(&mu_);
void* tag_ ABSL_GUARDED_BY(&mu_);
grpc_cq_completion completion_storage_ ABSL_GUARDED_BY(&mu_);
Watcher* watcher_ ABSL_GUARDED_BY(&mu_) = nullptr;
grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_
ABSL_GUARDED_BY(&mu_);
};

} // namespace

void ClientChannel::WatchConnectivityState(
grpc_connectivity_state state, Timestamp deadline,
grpc_completion_queue* cq, void* tag) {
new ExternalStateWatcher(WeakRefAsSubclass<ClientChannel>(), cq, tag, state,
deadline);
}

void ClientChannel::AddConnectivityWatcher(
Expand Down

0 comments on commit 8fdbdf7

Please sign in to comment.