Skip to content

Commit

Permalink
Add ENVOY_EXECUTION_SCOPE. (#36056)
Browse files Browse the repository at this point in the history
Add `ENVOY_EXECUTION_SCOPE` to mark the start and end of a
Envoy::Tracing::Span or Http::FilterContext, which is active in the
current thread.

This macro only takes effect when `ENVOY_ENABLE_EXECUTION_SCOPE` is
defined.

Commit Message: Add `ENVOY_EXECUTION_SCOPE`.
Additional Description:
Risk Level: No. It is no-op unless `ENVOY_ENABLE_EXECUTION_SCOPE` is
defined.
Testing: Unit test in test/common/common/execution_context_test.cc.
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features:
[Optional Runtime guard:]
[Optional Fixes #Issue]
[Optional Fixes commit #PR or SHA]
[Optional Deprecated:]
[Optional [API
Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):]

---------

Signed-off-by: Bin Wu <[email protected]>
  • Loading branch information
wu-bin authored Oct 5, 2024
1 parent 9244dc9 commit 16cd72e
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 11 deletions.
2 changes: 2 additions & 0 deletions envoy/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ envoy_cc_library(
deps = [
":pure_lib",
":scope_tracker_interface",
"//source/common/common:cleanup_lib",
"//source/common/common:macros",
],
)

Expand Down
62 changes: 52 additions & 10 deletions envoy/common/execution_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "envoy/common/scope_tracker.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/common/cleanup.h"
#include "source/common/common/macros.h"
#include "source/common/common/non_copyable.h"

namespace Envoy {
Expand All @@ -15,13 +17,42 @@ namespace Envoy {
static constexpr absl::string_view kConnectionExecutionContextFilterStateName =
"envoy.network.connection_execution_context";

namespace Http {
struct FilterContext;
}

namespace Tracing {
class Span;
}

class ScopedExecutionContext;

// ExecutionContext can be inherited by subclasses to represent arbitrary information associated
// with the execution of a piece of code. activate/deactivate are called when the said execution
// starts/ends. For an example usage, please see
// https://github.com/envoyproxy/envoy/issues/32012.
class ExecutionContext : public StreamInfo::FilterState::Object, NonCopyable {
public:
static void setEnabled(bool value) { enabled().store(value, std::memory_order_relaxed); }

static bool isEnabled() { return enabled().load(std::memory_order_relaxed); }

static ExecutionContext* fromStreamInfo(OptRef<const StreamInfo::StreamInfo> info) {
if (!isEnabled() || !info.has_value()) {
return nullptr;
}
const auto* const_context = info->filterState().getDataReadOnly<ExecutionContext>(
kConnectionExecutionContextFilterStateName);
return const_cast<ExecutionContext*>(const_context);
}

// Called when enters a scope in which |*span| is active.
// Returns an object that can do some cleanup when exits the scope.
virtual Envoy::Cleanup onScopeEnter(Envoy::Tracing::Span* span) PURE;
// Called when enters a scope in which |*filter_context| is active.
// Returns an object that can do some cleanup when exits the scope.
virtual Envoy::Cleanup onScopeEnter(const Http::FilterContext* filter_context) PURE;

protected:
// Called when the current thread starts to run code on behalf of the owner of this object.
// protected because it should only be called by ScopedExecutionContext.
Expand All @@ -30,6 +61,9 @@ class ExecutionContext : public StreamInfo::FilterState::Object, NonCopyable {
// protected because it should only be called by ScopedExecutionContext.
virtual void deactivate() PURE;

private:
static std::atomic<bool>& enabled() { MUTABLE_CONSTRUCT_ON_FIRST_USE(std::atomic<bool>); }

friend class ScopedExecutionContext;
};

Expand All @@ -47,7 +81,8 @@ class ScopedExecutionContext : NonCopyable {
public:
ScopedExecutionContext() : ScopedExecutionContext(nullptr) {}
ScopedExecutionContext(const ScopeTrackedObject* object)
: context_(object != nullptr ? getExecutionContext(object->trackedStream()) : nullptr) {
: context_(object != nullptr ? ExecutionContext::fromStreamInfo(object->trackedStream())
: nullptr) {
if (context_ != nullptr) {
context_->activate();
}
Expand All @@ -66,18 +101,25 @@ class ScopedExecutionContext : NonCopyable {
bool isNull() const { return context_ == nullptr; }

private:
ExecutionContext* getExecutionContext(OptRef<const StreamInfo::StreamInfo> info) {
if (!info.has_value()) {
return nullptr;
}
const auto* const_context = info->filterState().getDataReadOnly<ExecutionContext>(
kConnectionExecutionContextFilterStateName);
return const_cast<ExecutionContext*>(const_context);
}

ExecutionContext* context_;
};

#define ENVOY_EXECUTION_SCOPE_CAT_(a, b) a##b
#define ENVOY_EXECUTION_SCOPE_CAT(a, b) ENVOY_EXECUTION_SCOPE_CAT_(a, b)
// Invoked when |scopedObject| is active from the current line to the end of the current c++ scope.
// |trackedStream| is a OptRef<const StreamInfo> from which a ExecutionContext is extracted.
// |scopedObject| is a pointer to a Envoy::Tracing::Span or a Http::FilterContext.
#define ENVOY_EXECUTION_SCOPE(trackedStream, scopedObject) \
Envoy::Cleanup ENVOY_EXECUTION_SCOPE_CAT(on_scope_exit_, __LINE__) = \
[execution_context = ExecutionContext::fromStreamInfo(trackedStream), \
scoped_object = (scopedObject)] { \
if (execution_context == nullptr) { \
return Envoy::Cleanup::Noop(); \
} \
return execution_context->onScopeEnter(scoped_object); \
}()
#else
#define ENVOY_EXECUTION_SCOPE(trackedStream, scopedObject)
#endif

} // namespace Envoy
4 changes: 4 additions & 0 deletions source/common/common/cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class Cleanup {

bool cancelled() { return cancelled_; }

static Cleanup Noop() {
return Cleanup([] {});
}

private:
std::function<void()> f_;
bool cancelled_{false};
Expand Down
11 changes: 11 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
traceRequest();
}

ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
filter_manager_.decodeHeaders(*request_headers_, end_stream);
} else {
Expand Down Expand Up @@ -1487,6 +1488,7 @@ void ConnectionManagerImpl::ActiveStream::traceRequest() {
void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
ScopeTrackerScopeState scope(this,
connection_manager_.read_callbacks_->connection().dispatcher());
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
maybeRecordLastByteReceived(end_stream);
filter_manager_.streamInfo().addBytesReceived(data.length());
if (!state_.deferred_to_next_io_iteration_) {
Expand All @@ -1504,6 +1506,7 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
ENVOY_STREAM_LOG(debug, "request trailers complete:\n{}", *this, *trailers);
ScopeTrackerScopeState scope(this,
connection_manager_.read_callbacks_->connection().dispatcher());
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
resetIdleTimer();

ASSERT(!request_trailers_);
Expand All @@ -1524,6 +1527,7 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
}

void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
resetIdleTimer();
if (!state_.deferred_to_next_io_iteration_) {
// After going through filters, the ownership of metadata_map will be passed to terminal filter.
Expand Down Expand Up @@ -1728,6 +1732,7 @@ void ConnectionManagerImpl::ActiveStream::onLocalReply(Code code) {
}

void ConnectionManagerImpl::ActiveStream::encode1xxHeaders(ResponseHeaderMap& response_headers) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
// Strip the T-E headers etc. Defer other header additions as well as drain-close logic to the
// continuation headers.
ConnectionManagerUtility::mutateResponseHeaders(
Expand All @@ -1746,6 +1751,7 @@ void ConnectionManagerImpl::ActiveStream::encode1xxHeaders(ResponseHeaderMap& re

void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
bool end_stream) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
// Base headers.

// We want to preserve the original date header, but we add a date header if it is absent
Expand Down Expand Up @@ -1891,6 +1897,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade
}

void ConnectionManagerImpl::ActiveStream::encodeData(Buffer::Instance& data, bool end_stream) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
ENVOY_STREAM_LOG(trace, "encoding data via codec (size={} end_stream={})", *this, data.length(),
end_stream);

Expand All @@ -1899,12 +1906,14 @@ void ConnectionManagerImpl::ActiveStream::encodeData(Buffer::Instance& data, boo
}

void ConnectionManagerImpl::ActiveStream::encodeTrailers(ResponseTrailerMap& trailers) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
ENVOY_STREAM_LOG(debug, "encoding trailers via codec:\n{}", *this, trailers);

response_encoder_->encodeTrailers(trailers);
}

void ConnectionManagerImpl::ActiveStream::encodeMetadata(MetadataMapPtr&& metadata) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
MetadataMapVector metadata_map_vector;
metadata_map_vector.emplace_back(std::move(metadata));
ENVOY_STREAM_LOG(debug, "encoding metadata via codec:\n{}", *this, metadata_map_vector);
Expand Down Expand Up @@ -2182,6 +2191,7 @@ void ConnectionManagerImpl::ActiveStream::onRequestDataTooLarge() {

void ConnectionManagerImpl::ActiveStream::recreateStream(
StreamInfo::FilterStateSharedPtr filter_state) {
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
ResponseEncoder* response_encoder = response_encoder_;
response_encoder_ = nullptr;

Expand Down Expand Up @@ -2253,6 +2263,7 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
if (!state_.deferred_to_next_io_iteration_) {
return false;
}
ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
state_.deferred_to_next_io_iteration_ = false;
bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
deferred_request_trailers_ == nullptr && deferred_metadata_.empty();
Expand Down
9 changes: 9 additions & 0 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
(*entry)->end_stream_);

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
(*entry)->end_stream_ = (end_stream && continue_data_entry == decoder_filters_.end());
Expand Down Expand Up @@ -653,6 +654,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
(*entry)->end_stream_);

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
// If the filter pointed by entry has stopped for all frame types, return now.
if (handleDataIfStopAll(**entry, data, state_.decoder_filters_streaming_)) {
return;
Expand Down Expand Up @@ -803,6 +805,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end());

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
// If the filter pointed by entry has stopped for all frame type, return now.
if ((*entry)->stoppedAll()) {
return;
Expand Down Expand Up @@ -846,6 +849,7 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeMetadata));

for (; entry != decoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
// If the filter pointed by entry has stopped for all frame type, stores metadata and returns.
// If the filter pointed by entry hasn't returned from decodeHeaders, stores newly added
// metadata in case decodeHeaders returns StopAllIteration. The latter can happen when headers
Expand Down Expand Up @@ -1173,6 +1177,7 @@ void FilterManager::encode1xxHeaders(ActiveStreamEncoderFilter* filter,
std::list<ActiveStreamEncoderFilterPtr>::iterator entry =
commonEncodePrefix(filter, false, FilterIterationStartState::AlwaysStartFromNext);
for (; entry != encoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
ASSERT(!(state_.filter_call_state_ & FilterCallState::Encode1xxHeaders));
state_.filter_call_state_ |= FilterCallState::Encode1xxHeaders;
const Filter1xxHeadersStatus status = (*entry)->handle_->encode1xxHeaders(headers);
Expand Down Expand Up @@ -1222,6 +1227,7 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
std::list<ActiveStreamEncoderFilterPtr>::iterator continue_data_entry = encoder_filters_.end();

for (; entry != encoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders));
state_.filter_call_state_ |= FilterCallState::EncodeHeaders;
(*entry)->end_stream_ = (end_stream && continue_data_entry == encoder_filters_.end());
Expand Down Expand Up @@ -1306,6 +1312,7 @@ void FilterManager::encodeMetadata(ActiveStreamEncoderFilter* filter,
commonEncodePrefix(filter, false, FilterIterationStartState::CanStartFromCurrent);

for (; entry != encoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
// If the filter pointed by entry has stopped for all frame type, stores metadata and returns.
// If the filter pointed by entry hasn't returned from encodeHeaders, stores newly added
// metadata in case encodeHeaders returns StopAllIteration. The latter can happen when headers
Expand Down Expand Up @@ -1394,6 +1401,7 @@ void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instan

const bool trailers_exists_at_start = filter_manager_callbacks_.responseTrailers().has_value();
for (; entry != encoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
// If the filter pointed by entry has stopped for all frame type, return now.
if (handleDataIfStopAll(**entry, data, state_.encoder_filters_streaming_)) {
return;
Expand Down Expand Up @@ -1464,6 +1472,7 @@ void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter,
std::list<ActiveStreamEncoderFilterPtr>::iterator entry =
commonEncodePrefix(filter, true, FilterIterationStartState::CanStartFromCurrent);
for (; entry != encoder_filters_.end(); entry++) {
ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
// If the filter pointed by entry has stopped for all frame type, return now.
if ((*entry)->stoppedAll()) {
return;
Expand Down
2 changes: 2 additions & 0 deletions test/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ envoy_cc_test(
deps = [
"//envoy/common:execution_context",
"//source/common/api:api_lib",
"//source/common/http:conn_manager_lib",
"//source/common/http:filter_manager_lib",
"//test/mocks:common_lib",
"//test/mocks/stream_info:stream_info_mocks",
],
Expand Down
Loading

0 comments on commit 16cd72e

Please sign in to comment.