Skip to content

Commit

Permalink
[call v3] add missing channelz updates in subchannel
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Jun 27, 2024
1 parent e7727bc commit e5cde30
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
66 changes: 41 additions & 25 deletions src/core/client_channel/subchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,11 @@ using ::grpc_event_engine::experimental::EventEngine;
// ConnectedSubchannel
//

ConnectedSubchannel::ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
ConnectedSubchannel::ConnectedSubchannel(const ChannelArgs& args)
: RefCounted<ConnectedSubchannel>(
GRPC_TRACE_FLAG_ENABLED(subchannel_refcount) ? "ConnectedSubchannel"
: nullptr),
args_(args),
channelz_subchannel_(std::move(channelz_subchannel)) {}
args_(args) {}

//
// LegacyConnectedSubchannel
Expand All @@ -114,14 +111,19 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
public:
LegacyConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
RefCountedPtr<channelz::SubchannelNode> channelz_node)
: ConnectedSubchannel(args),
channelz_node_(std::move(channelz_node)),
channel_stack_(std::move(channel_stack)) {}

~LegacyConnectedSubchannel() override {
channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
}

channelz::SubchannelNode* channelz_node() const {
return channelz_node_.get();
}

void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
Expand Down Expand Up @@ -162,6 +164,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
}

private:
RefCountedPtr<channelz::SubchannelNode> channelz_node_;
RefCountedPtr<grpc_channel_stack> channel_stack_;
};

Expand Down Expand Up @@ -191,9 +194,8 @@ class NewConnectedSubchannel : public ConnectedSubchannel {
NewConnectedSubchannel(
RefCountedPtr<UnstartedCallDestination> call_destination,
RefCountedPtr<TransportCallDestination> transport,
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
const ChannelArgs& args)
: ConnectedSubchannel(args),
call_destination_(std::move(call_destination)),
transport_(std::move(transport)) {}

Expand Down Expand Up @@ -240,7 +242,8 @@ RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
}

SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
: connected_subchannel_(std::move(args.connected_subchannel)),
: connected_subchannel_(args.connected_subchannel
.TakeAsSubclass<LegacyConnectedSubchannel>()),
deadline_(args.deadline) {
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
const grpc_call_element_args call_args = {
Expand All @@ -259,7 +262,7 @@ SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
return;
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
auto* channelz_node = connected_subchannel_->channelz_subchannel();
auto* channelz_node = connected_subchannel_->channelz_node();
if (channelz_node != nullptr) {
channelz_node->RecordCallStarted();
}
Expand Down Expand Up @@ -327,13 +330,9 @@ void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
grpc_transport_stream_op_batch* batch) {
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
if (!batch->recv_trailing_metadata) return;
// only add interceptor is channelz is enabled.
if (connected_subchannel_->channelz_subchannel() == nullptr) {
return;
}
if (connected_subchannel_->channelz_node() == nullptr) return;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
this, grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
Expand Down Expand Up @@ -366,13 +365,13 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg,
CHECK_NE(call->recv_trailing_metadata_, nullptr);
grpc_status_code status = GRPC_STATUS_OK;
GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
channelz::SubchannelNode* channelz_subchannel =
call->connected_subchannel_->channelz_subchannel();
CHECK_NE(channelz_subchannel, nullptr);
channelz::SubchannelNode* channelz_node =
call->connected_subchannel_->channelz_node();
CHECK_NE(channelz_node, nullptr);
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
channelz_node->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
channelz_node->RecordCallFailed();
}
Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
}
Expand Down Expand Up @@ -856,6 +855,24 @@ bool Subchannel::PublishTransportLocked() {
->client_transport());
InterceptionChainBuilder builder(
connecting_result_.channel_args.SetObject(transport.get()));
if (channelz_node_ != nullptr) {
// TODO(ctiller): If/when we have a good way to access the subchannel
// from a filter (maybe GetContext<Subchannel>?), consider replacing
// these two hooks with a filter so that we can avoid storing two
// separate refs to the channelz node in each connection.
builder.AddOnClientInitialMetadata(
[channelz_node = channelz_node_](ClientMetadata&) {
channelz_node->RecordCallStarted();
});
builder.AddOnServerTrailingMetadata(
[channelz_node = channelz_node_](ServerMetadata& metadata) {
if (IsStatusOk(metadata)) {
channelz_node->RecordCallSucceeded();
} else {
channelz_node->RecordCallFailed();
}
});
}
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_SUBCHANNEL, builder);
auto transport_destination =
Expand All @@ -870,8 +887,7 @@ bool Subchannel::PublishTransportLocked() {
return false;
}
connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
std::move(*call_destination), std::move(transport_destination), args_,
channelz_node_);
std::move(*call_destination), std::move(transport_destination), args_);
}
connecting_result_.Reset();
// Publish.
Expand Down
14 changes: 4 additions & 10 deletions src/core/client_channel/subchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ class SubchannelCall;
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
public:
const ChannelArgs& args() const { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get();
}

virtual void StartWatch(
grpc_pollset_set* interested_parties,
Expand All @@ -85,17 +82,14 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;

protected:
ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
explicit ConnectedSubchannel(const ChannelArgs& args);

private:
ChannelArgs args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
};

class LegacyConnectedSubchannel;

// Implements the interface of RefCounted<>.
class SubchannelCall final {
public:
Expand Down Expand Up @@ -150,7 +144,7 @@ class SubchannelCall final {

static void Destroy(void* arg, grpc_error_handle error);

RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
RefCountedPtr<LegacyConnectedSubchannel> connected_subchannel_;
grpc_closure* after_call_stack_destroy_ = nullptr;
// State needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_;
Expand Down

0 comments on commit e5cde30

Please sign in to comment.