Skip to content

Commit

Permalink
trace: enrich injectContext with upstream context (envoyproxy#33347)
Browse files Browse the repository at this point in the history
* enrich injectcontext

Signed-off-by: Boteng Yao <[email protected]>

---------

Signed-off-by: Boteng Yao <[email protected]>
  • Loading branch information
botengyao authored Apr 17, 2024
1 parent 0ea133e commit 93ecf70
Show file tree
Hide file tree
Showing 33 changed files with 138 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,13 @@ void UpstreamRequest::onUpstreamSuccess(Upstream::HostDescriptionConstSharedPtr

if (span_ != nullptr) {
TraceContextBridge trace_context{*parent_.request_stream_};
span_->injectContext(trace_context, upstream_info_->upstream_host_);
Tracing::UpstreamContext upstream_context(
upstream_info_->upstream_host_.get(), // host_
&upstream_info_->upstream_host_->cluster(), // cluster_
Tracing::ServiceType::Unknown, // service_type_
false // async_client_span_
);
span_->injectContext(trace_context, upstream_context);
}

sendRequestStartToUpstream();
Expand Down
46 changes: 43 additions & 3 deletions envoy/tracing/trace_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,47 @@ namespace Tracing {
class Span;
using SpanPtr = std::unique_ptr<Span>;

/**
* The upstream sevice type.
*/
enum class ServiceType {
// Service type is unknown.
Unknown,
// Service is treated as HTTP.
Http,
// Service is treated as GoogleGrpc.
GoogleGrpc,
// Service is treated as EnvoyGrpc.
EnvoyGrpc
};

/**
* Contains upstream context information essential for the injectContext process.
*
* @param host Optional reference to the upstream host description.
* @param cluster Optional reference to the upstream cluster information.
* @param service_type The type of service the upstream context relates to.
* @param async_client_span Indicates if the injectContext originates from an asynchronous
* client.
*/
struct UpstreamContext {
UpstreamContext(const Upstream::HostDescription* host = nullptr,
const Upstream::ClusterInfo* cluster = nullptr,
const ServiceType service_type = ServiceType::Unknown,
const bool async_client_span = false)
: host_(makeOptRefFromPtr(host)), cluster_(makeOptRefFromPtr(cluster)),
service_type_(service_type), async_client_span_(async_client_span) {}

OptRef<const Upstream::HostDescription> host_;
OptRef<const Upstream::ClusterInfo> cluster_;
const ServiceType service_type_;

// TODO(botengyao): further distinction for the shared upstream code path can be
// added if needed. Setting this flag to true only means it is called from async
// client at current stage.
const bool async_client_span_;
};

/**
* Basic abstraction for span.
*/
Expand Down Expand Up @@ -51,10 +92,9 @@ class Span {
* Mutate the provided headers with the context necessary to propagate this
* (implementation-specific) trace.
* @param request_headers the headers to which propagation context will be added
* @param upstream connecting host description
* @param upstream upstream context info
*/
virtual void injectContext(TraceContext& trace_conext,
const Upstream::HostDescriptionConstSharedPtr& upstream) PURE;
virtual void injectContext(TraceContext& trace_conext, const UpstreamContext& upstream) PURE;

/**
* Create and start a child Span, with this Span as its parent in the trace.
Expand Down
8 changes: 7 additions & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
return;
}

cluster_info_ = thread_local_cluster->info();
auto& http_async_client = thread_local_cluster->httpAsyncClient();
dispatcher_ = &http_async_client.dispatcher();
stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry));
Expand Down Expand Up @@ -281,7 +282,12 @@ void AsyncRequestImpl::cancel() {

void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
Tracing::HttpTraceContext trace_context(metadata);
current_span_->injectContext(trace_context, nullptr);
Tracing::UpstreamContext upstream_context(nullptr, // host_
cluster_info_.get(), // cluster_
Tracing::ServiceType::EnvoyGrpc, // service_type_
true // async_client_span_
);
current_span_->injectContext(trace_context, upstream_context);
callbacks_.onCreateInitialMetadata(metadata);
}

Expand Down
3 changes: 3 additions & 0 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class AsyncStreamImpl : public RawAsyncStream,
bool hasResetStream() const { return http_reset_; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }

protected:
Upstream::ClusterInfoConstSharedPtr cluster_info_;

private:
void streamError(Status::GrpcStatus grpc_status, const std::string& message);
void streamError(Status::GrpcStatus grpc_status) { streamError(grpc_status, EMPTY_STRING); }
Expand Down
7 changes: 6 additions & 1 deletion source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,12 @@ void GoogleAsyncRequestImpl::cancel() {

void GoogleAsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
Tracing::HttpTraceContext trace_context(metadata);
current_span_->injectContext(trace_context, nullptr);
Tracing::UpstreamContext upstream_context(nullptr, // host_
nullptr, // cluster_
Tracing::ServiceType::GoogleGrpc, // service_type_
true // async_client_span_
);
current_span_->injectContext(trace_context, upstream_context);
callbacks_.onCreateInitialMetadata(metadata);
}

Expand Down
14 changes: 12 additions & 2 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,12 @@ AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,

void AsyncRequestImpl::initialize() {
Tracing::HttpTraceContext trace_context(request_->headers());
child_span_->injectContext(trace_context, nullptr);
Tracing::UpstreamContext upstream_context(nullptr, // host_
parent_.cluster_.get(), // cluster_
Tracing::ServiceType::Http, // service_type_
true // async_client_span_
);
child_span_->injectContext(trace_context, upstream_context);
sendHeaders(request_->headers(), request_->body().length() == 0);
if (request_->body().length() != 0) {
// It's possible this will be a no-op due to a local response synchronously generated in
Expand All @@ -312,7 +317,12 @@ void AsyncRequestImpl::initialize() {

void AsyncOngoingRequestImpl::initialize() {
Tracing::HttpTraceContext trace_context(*request_headers_);
child_span_->injectContext(trace_context, nullptr);
Tracing::UpstreamContext upstream_context(nullptr, // host_
parent_.cluster_.get(), // cluster_
Tracing::ServiceType::Http, // service_type_
true // async_client_span_
);
child_span_->injectContext(trace_context, upstream_context);
sendHeaders(*request_headers_, false);
}

Expand Down
10 changes: 8 additions & 2 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,19 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
// The router checks that the connection pool is non-null before creating the upstream request.
auto upstream_host = conn_pool_->host();
Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
Tracing::UpstreamContext upstream_context(upstream_host.get(), // host_
&upstream_host->cluster(), // cluster_
Tracing::ServiceType::Unknown, // service_type_
false // async_client_span_
);

if (span_ != nullptr) {
span_->injectContext(trace_context, upstream_host);
span_->injectContext(trace_context, upstream_context);
} else {
// No independent child span for current upstream request then inject the parent span's tracing
// context into the request headers.
// The injectContext() of the parent span may be called repeatedly when the request is retried.
parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_host);
parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_context);
}

stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
Expand Down
3 changes: 1 addition & 2 deletions source/common/tracing/null_span_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ class NullSpan : public Span {
void setTag(absl::string_view, absl::string_view) override {}
void log(SystemTime, const std::string&) override {}
void finishSpan() override {}
void injectContext(Tracing::TraceContext&,
const Upstream::HostDescriptionConstSharedPtr&) override {}
void injectContext(Tracing::TraceContext&, const UpstreamContext&) override {}
void setBaggage(absl::string_view, absl::string_view) override {}
std::string getBaggage(absl::string_view) override { return EMPTY_STRING; }
std::string getTraceIdAsHex() const override { return EMPTY_STRING; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ std::string OpenTracingSpan::getBaggage(absl::string_view key) {
}

void OpenTracingSpan::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) {
const Tracing::UpstreamContext&) {
if (driver_.propagationMode() == OpenTracingDriver::PropagationMode::SingleHeader) {
// Inject the span context using Envoy's single-header format.
std::ostringstream oss;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class OpenTracingSpan : public Tracing::Span, Logger::Loggable<Logger::Id::traci
void setTag(absl::string_view name, const absl::string_view) override;
void log(SystemTime timestamp, const std::string& event) override;
void injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) override;
const Tracing::UpstreamContext&) override;
Tracing::SpanPtr spawnChild(const Tracing::Config& config, const std::string& name,
SystemTime start_time) override;
void setSampled(bool) override;
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/tracers/datadog/span.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ void Span::log(SystemTime, const std::string&) {

void Span::finishSpan() { span_.reset(); }

void Span::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) {
void Span::injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) {
if (!span_) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/tracers/datadog/span.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Span : public Tracing::Span {
void log(SystemTime, const std::string&) override;
void finishSpan() override;
void injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr& upstream) override;
const Tracing::UpstreamContext& upstream) override;
Tracing::SpanPtr spawnChild(const Tracing::Config& config, const std::string& name,
SystemTime start_time) override;
void setSampled(bool sampled) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class Span : public Tracing::Span {
void log(SystemTime timestamp, const std::string& event) override;
void finishSpan() override;
void injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) override;
const Tracing::UpstreamContext&) override;
Tracing::SpanPtr spawnChild(const Tracing::Config& config, const std::string& name,
SystemTime start_time) override;
void setSampled(bool sampled) override;
Expand Down Expand Up @@ -203,8 +203,7 @@ void Span::log(SystemTime /*timestamp*/, const std::string& event) {

void Span::finishSpan() { span_.End(); }

void Span::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) {
void Span::injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) {
using OpenCensusConfig = envoy::config::trace::v3::OpenCensusConfig;
const auto& ctx = span_.context();
for (const auto& outgoing : oc_config_.outgoing_trace_context()) {
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/tracers/opentelemetry/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ void Span::finishSpan() {
}
}

void Span::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) {
void Span::injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) {
std::string trace_id_hex = absl::BytesToHexString(span_.trace_id());
std::string span_id_hex = absl::BytesToHexString(span_.span_id());
std::vector<uint8_t> trace_flags_vec{sampled()};
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/tracers/opentelemetry/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Span : Logger::Loggable<Logger::Id::tracing>, public Tracing::Span {
void log(SystemTime /*timestamp*/, const std::string& /*event*/) override{};
void finishSpan() override;
void injectContext(Envoy::Tracing::TraceContext& /*trace_context*/,
const Upstream::HostDescriptionConstSharedPtr&) override;
const Tracing::UpstreamContext&) override;
Tracing::SpanPtr spawnChild(const Tracing::Config& config, const std::string& name,
SystemTime start_time) override;

Expand Down
4 changes: 2 additions & 2 deletions source/extensions/tracers/skywalking/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ void Span::finishSpan() {
}

void Span::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr& upstream) {
const Tracing::UpstreamContext& upstream) {
absl::string_view remote_address =
upstream != nullptr ? upstream->address()->asStringView() : trace_context.host();
upstream.host_.has_value() ? upstream.host_->address()->asStringView() : trace_context.host();

auto sw8_header =
tracing_context_->createSW8HeaderValue({remote_address.data(), remote_address.size()});
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/tracers/skywalking/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Span : public Tracing::Span {
void log(SystemTime timestamp, const std::string& event) override;
void finishSpan() override;
void injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr& upstream) override;
const Tracing::UpstreamContext& upstream) override;
Tracing::SpanPtr spawnChild(const Tracing::Config& config, const std::string& name,
SystemTime start_time) override;
void setSampled(bool do_sample) override;
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/tracers/xray/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ const Tracing::TraceContextHandler& xForwardedForHeader() {
CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "x-forwarded-for");
}

void Span::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) {
void Span::injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) {
const std::string xray_header_value =
fmt::format("Root={};Parent={};Sampled={}", traceId(), id(), sampled() ? "1" : "0");
xRayTraceHeader().setRefKey(trace_context, xray_header_value);
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/tracers/xray/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class Span : public Tracing::Span, Logger::Loggable<Logger::Id::config> {
* Adds X-Ray trace header to the set of outgoing headers.
*/
void injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) override;
const Tracing::UpstreamContext&) override;

/**
* Gets the start time of this Span.
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/tracers/zipkin/zipkin_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void ZipkinSpan::setBaggage(absl::string_view, absl::string_view) {}
std::string ZipkinSpan::getBaggage(absl::string_view) { return EMPTY_STRING; }

void ZipkinSpan::injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) {
const Tracing::UpstreamContext&) {
// Set the trace-id and span-id headers properly, based on the newly-created span structure.
ZipkinCoreConstants::get().X_B3_TRACE_ID.setRefKey(trace_context, span_.traceIdAsHexString());
ZipkinCoreConstants::get().X_B3_SPAN_ID.setRefKey(trace_context, span_.idAsHexString());
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/tracers/zipkin/zipkin_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ZipkinSpan : public Tracing::Span {
void log(SystemTime timestamp, const std::string& event) override;

void injectContext(Tracing::TraceContext& trace_context,
const Upstream::HostDescriptionConstSharedPtr&) override;
const Tracing::UpstreamContext&) override;
Tracing::SpanPtr spawnChild(const Tracing::Config&, const std::string& name,
SystemTime start_time) override;

Expand Down
5 changes: 4 additions & 1 deletion test/common/tracing/tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ TEST(NullTracerTest, BasicFunctionality) {
Tracing::TestTraceContextImpl trace_context{};
Upstream::HostDescriptionConstSharedPtr host{
new testing::NiceMock<Upstream::MockHostDescription>()};
Upstream::ClusterInfoConstSharedPtr cluster{new testing::NiceMock<Upstream::MockClusterInfo>()};
Tracing::UpstreamContext upstream_context(host.get(), cluster.get(), Tracing::ServiceType::Http,
false);

SpanPtr span_ptr =
null_tracer.startSpan(config, trace_context, stream_info, {Reason::Sampling, true});
Expand All @@ -262,7 +265,7 @@ TEST(NullTracerTest, BasicFunctionality) {
span_ptr->setBaggage("key", "value");
ASSERT_EQ("", span_ptr->getBaggage("baggage_key"));
ASSERT_EQ(span_ptr->getTraceIdAsHex(), "");
span_ptr->injectContext(trace_context, host);
span_ptr->injectContext(trace_context, upstream_context);
span_ptr->log(SystemTime(), "fake_event");

EXPECT_NE(nullptr, span_ptr->spawnChild(config, "foo", SystemTime()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ TEST_F(OpenTracingDriverTest, InjectFailure) {
stats_.counter("tracing.opentracing.span_context_injection_error").value();
EXPECT_FALSE(
request_headers_.context_map_.contains(Http::CustomHeaders::get().OtSpanContext.get()));
span->injectContext(request_headers_, nullptr);
span->injectContext(request_headers_, Tracing::UpstreamContext());

EXPECT_EQ(span_context_injection_error_count + 1,
stats_.counter("tracing.opentracing.span_context_injection_error").value());
Expand All @@ -332,7 +332,7 @@ TEST_F(OpenTracingDriverTest, ExtractWithUnindexedHeader) {

Tracing::SpanPtr first_span = driver_->startSpan(
config_, request_headers_, stream_info_, operation_name_, {Tracing::Reason::Sampling, true});
first_span->injectContext(request_headers_, nullptr);
first_span->injectContext(request_headers_, Tracing::UpstreamContext());

Tracing::SpanPtr second_span = driver_->startSpan(
config_, request_headers_, stream_info_, operation_name_, {Tracing::Reason::Sampling, true});
Expand Down
4 changes: 2 additions & 2 deletions test/extensions/tracers/datadog/span_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ TEST_F(DatadogTracerSpanTest, InjectContext) {
Span span{std::move(span_)};

Tracing::TestTraceContextImpl context{};
span.injectContext(context, nullptr);
span.injectContext(context, Tracing::UpstreamContext());
// Span::injectContext doesn't modify any of named fields.
EXPECT_EQ("", context.context_protocol_);
EXPECT_EQ("", context.context_host_);
Expand Down Expand Up @@ -414,7 +414,7 @@ TEST_F(DatadogTracerSpanTest, NoOpMode) {
// `Span::log` doesn't do anything in any case.
span.log(time_.timeSystem().systemTime(), "ignored");
Tracing::TestTraceContextImpl context{};
span.injectContext(context, nullptr);
span.injectContext(context, Tracing::UpstreamContext());
EXPECT_EQ("", context.context_protocol_);
EXPECT_EQ("", context.context_host_);
EXPECT_EQ("", context.context_path_);
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/tracers/opencensus/tracer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void testIncomingHeaders(
{
Tracing::SpanPtr span = driver->startSpan(config, request_headers, stream_info, operation_name,
{Tracing::Reason::Sampling, false});
span->injectContext(injected_headers, nullptr);
span->injectContext(injected_headers, Tracing::UpstreamContext());
span->finishSpan();

// Check contents via public API.
Expand Down
Loading

0 comments on commit 93ecf70

Please sign in to comment.