From 1ffd4522cdb86ac62ad307d8f28a166e5261411b Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 9 May 2024 11:52:54 -0500 Subject: [PATCH] mobile: Fix merge race conflict (#34056) Fix race merge conflict Signed-off-by: Fredy Wijaya --- .../base_client_integration_test.cc | 107 +++++++----------- .../base_client_integration_test.h | 6 +- .../integration/client_integration_test.cc | 5 +- 3 files changed, 45 insertions(+), 73 deletions(-) diff --git a/mobile/test/common/integration/base_client_integration_test.cc b/mobile/test/common/integration/base_client_integration_test.cc index ee810a10ca92..bcddc80a527e 100644 --- a/mobile/test/common/integration/base_client_integration_test.cc +++ b/mobile/test/common/integration/base_client_integration_test.cc @@ -79,53 +79,56 @@ BaseClientIntegrationTest::BaseClientIntegrationTest(Network::Address::IpVersion void BaseClientIntegrationTest::initialize() { BaseIntegrationTest::initialize(); + { + absl::MutexLock l(&engine_lock_); + stream_prototype_ = engine_->streamClient()->newStreamPrototype(); + } HttpTestUtility::addDefaultHeaders(default_request_headers_); default_request_headers_.setHost(fake_upstreams_[0]->localAddress()->asStringView()); +} - createNewStream(stream_prototype_, cc_, stream_); +EnvoyStreamCallbacks BaseClientIntegrationTest::createDefaultStreamCallbacks() { + EnvoyStreamCallbacks stream_callbacks; + stream_callbacks.on_headers_ = [this](const Http::ResponseHeaderMap& headers, bool, + envoy_stream_intel intel) { + cc_.on_headers_calls++; + cc_.status = absl::StrCat(headers.getStatusValue()); + cc_.on_header_consumed_bytes_from_response = intel.consumed_bytes_from_response; + }; + stream_callbacks.on_data_ = [this](const Buffer::Instance&, uint64_t /* length */, + bool /* end_stream */, + envoy_stream_intel) { cc_.on_data_calls++; }; + stream_callbacks.on_complete_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) { + memcpy(&last_stream_final_intel_, &final_intel, sizeof(envoy_final_stream_intel)); + if (expect_data_streams_) { + validateStreamIntel(final_intel, expect_dns_, upstream_tls_, cc_.on_complete_calls == 0); + } + cc_.on_complete_received_byte_count = final_intel.received_byte_count; + cc_.on_complete_calls++; + cc_.terminal_callback->setReady(); + }; + stream_callbacks.on_error_ = [this](EnvoyError, envoy_stream_intel, envoy_final_stream_intel) { + cc_.on_error_calls++; + cc_.terminal_callback->setReady(); + }; + stream_callbacks.on_cancel_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) { + EXPECT_NE(-1, final_intel.stream_start_ms); + cc_.on_cancel_calls++; + cc_.terminal_callback->setReady(); + }; + return stream_callbacks; } -void BaseClientIntegrationTest::createNewStream( - Platform::StreamPrototypeSharedPtr& stream_prototype, callbacks_called& cc, - Platform::StreamSharedPtr& stream) { +Platform::StreamSharedPtr +BaseClientIntegrationTest::createNewStream(Platform::StreamPrototypeSharedPtr& stream_prototype, + EnvoyStreamCallbacks&& stream_callbacks) { { absl::MutexLock l(&engine_lock_); stream_prototype = engine_->streamClient()->newStreamPrototype(); } - - stream_prototype->setOnHeaders( - [&](Platform::ResponseHeadersSharedPtr headers, bool, envoy_stream_intel intel) { - cc.on_headers_calls++; - cc.status = absl::StrCat(headers->httpStatus()); - cc.on_header_consumed_bytes_from_response = intel.consumed_bytes_from_response; - }); - stream_prototype->setOnData([&](envoy_data c_data, bool) { - cc.on_data_calls++; - release_envoy_data(c_data); - }); - stream_prototype->setOnComplete([&](envoy_stream_intel, envoy_final_stream_intel final_intel) { - memcpy(&last_stream_final_intel_, &final_intel, sizeof(envoy_final_stream_intel)); - if (expect_data_streams_) { - validateStreamIntel(final_intel, expect_dns_, upstream_tls_, cc.on_complete_calls == 0); - } - cc.on_complete_received_byte_count = final_intel.received_byte_count; - cc.on_complete_calls++; - cc.terminal_callback->setReady(); - }); - stream_prototype->setOnError( - [&](Platform::EnvoyErrorSharedPtr, envoy_stream_intel, envoy_final_stream_intel) { - cc.on_error_calls++; - cc.terminal_callback->setReady(); - }); - stream_prototype->setOnCancel([&](envoy_stream_intel, envoy_final_stream_intel final_intel) { - EXPECT_NE(-1, final_intel.stream_start_ms); - cc.on_cancel_calls++; - cc.terminal_callback->setReady(); - }); - - stream = (*stream_prototype).start(explicit_flow_control_); + return stream_prototype->start(std::move(stream_callbacks), explicit_flow_control_); } void BaseClientIntegrationTest::threadRoutine(absl::Notification& engine_running) { @@ -234,36 +237,4 @@ testing::AssertionResult BaseClientIntegrationTest::waitForGaugeGe(const std::st return testing::AssertionSuccess(); } -EnvoyStreamCallbacks BaseClientIntegrationTest::createDefaultStreamCallbacks() { - EnvoyStreamCallbacks stream_callbacks; - stream_callbacks.on_headers_ = [this](const Http::ResponseHeaderMap& headers, bool, - envoy_stream_intel intel) { - cc_.on_headers_calls++; - cc_.status = absl::StrCat(headers.getStatusValue()); - cc_.on_header_consumed_bytes_from_response = intel.consumed_bytes_from_response; - }; - stream_callbacks.on_data_ = [this](const Buffer::Instance&, uint64_t /* length */, - bool /* end_stream */, - envoy_stream_intel) { cc_.on_data_calls++; }; - stream_callbacks.on_complete_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) { - memcpy(&last_stream_final_intel_, &final_intel, sizeof(envoy_final_stream_intel)); - if (expect_data_streams_) { - validateStreamIntel(final_intel, expect_dns_, upstream_tls_, cc_.on_complete_calls == 0); - } - cc_.on_complete_received_byte_count = final_intel.received_byte_count; - cc_.on_complete_calls++; - cc_.terminal_callback->setReady(); - }; - stream_callbacks.on_error_ = [this](EnvoyError, envoy_stream_intel, envoy_final_stream_intel) { - cc_.on_error_calls++; - cc_.terminal_callback->setReady(); - }; - stream_callbacks.on_cancel_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) { - EXPECT_NE(-1, final_intel.stream_start_ms); - cc_.on_cancel_calls++; - cc_.terminal_callback->setReady(); - }; - return stream_callbacks; -} - } // namespace Envoy diff --git a/mobile/test/common/integration/base_client_integration_test.h b/mobile/test/common/integration/base_client_integration_test.h index c3fc044bd735..7026780ee654 100644 --- a/mobile/test/common/integration/base_client_integration_test.h +++ b/mobile/test/common/integration/base_client_integration_test.h @@ -50,9 +50,9 @@ class BaseClientIntegrationTest : public BaseIntegrationTest { absl::MutexLock l(&engine_lock_); return reinterpret_cast(engine_->engine_); } - virtual void initialize() override; - void createNewStream(Platform::StreamPrototypeSharedPtr& stream_prototype, callbacks_called& cc, - Platform::StreamSharedPtr& stream_); + void initialize() override; + Platform::StreamSharedPtr createNewStream(Platform::StreamPrototypeSharedPtr& stream_prototype, + EnvoyStreamCallbacks&& stream_callbacks); void createEnvoy() override; void threadRoutine(absl::Notification& engine_running); diff --git a/mobile/test/common/integration/client_integration_test.cc b/mobile/test/common/integration/client_integration_test.cc index 43ed635f6c15..92f3cab3b42b 100644 --- a/mobile/test/common/integration/client_integration_test.cc +++ b/mobile/test/common/integration/client_integration_test.cc @@ -635,6 +635,7 @@ TEST_P(ClientIntegrationTest, ReresolveAndDrain) { // Send a request. The original upstream should be used because of DNS block. default_request_headers_.setHost( absl::StrCat("www.lyft.com:", fake_upstreams_[0]->localAddress()->ip()->port())); + stream_ = createNewStream(stream_prototype_, createDefaultStreamCallbacks()); stream_->sendHeaders(std::make_unique(default_request_headers_), true); terminal_callback_.waitReady(); @@ -654,7 +655,7 @@ TEST_P(ClientIntegrationTest, ReresolveAndDrain) { ASSERT_TRUE(waitForCounterGe("dns_cache.base_dns_cache.dns_query_attempt", 1)); EXPECT_EQ(0, getCounterValue("dns_cache.base_dns_cache.dns_query_success")); // The next request should go to the original upstream as there's been no drain. - createNewStream(stream_prototype_, cc_, stream_); + stream_ = createNewStream(stream_prototype_, createDefaultStreamCallbacks()); stream_->sendHeaders(std::make_unique(default_request_headers_), true); terminal_callback_.waitReady(); @@ -668,7 +669,7 @@ TEST_P(ClientIntegrationTest, ReresolveAndDrain) { ASSERT_TRUE(waitForCounterGe("dns_cache.base_dns_cache.dns_query_success", 1)); // Do one final request. It should go to the second upstream and return 202 - createNewStream(stream_prototype_, cc_, stream_); + stream_ = createNewStream(stream_prototype_, createDefaultStreamCallbacks()); stream_->sendHeaders(std::make_unique(default_request_headers_), true); terminal_callback_.waitReady();