Skip to content

Commit

Permalink
mobile: Fix merge race conflict (envoyproxy#34056)
Browse files Browse the repository at this point in the history
Fix race merge conflict

Signed-off-by: Fredy Wijaya <[email protected]>
  • Loading branch information
fredyw authored May 9, 2024
1 parent 9489540 commit 1ffd452
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 73 deletions.
107 changes: 39 additions & 68 deletions mobile/test/common/integration/base_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,53 +79,56 @@ BaseClientIntegrationTest::BaseClientIntegrationTest(Network::Address::IpVersion

void BaseClientIntegrationTest::initialize() {
BaseIntegrationTest::initialize();
{
absl::MutexLock l(&engine_lock_);
stream_prototype_ = engine_->streamClient()->newStreamPrototype();
}

HttpTestUtility::addDefaultHeaders(default_request_headers_);
default_request_headers_.setHost(fake_upstreams_[0]->localAddress()->asStringView());
}

createNewStream(stream_prototype_, cc_, stream_);
EnvoyStreamCallbacks BaseClientIntegrationTest::createDefaultStreamCallbacks() {
EnvoyStreamCallbacks stream_callbacks;
stream_callbacks.on_headers_ = [this](const Http::ResponseHeaderMap& headers, bool,
envoy_stream_intel intel) {
cc_.on_headers_calls++;
cc_.status = absl::StrCat(headers.getStatusValue());
cc_.on_header_consumed_bytes_from_response = intel.consumed_bytes_from_response;
};
stream_callbacks.on_data_ = [this](const Buffer::Instance&, uint64_t /* length */,
bool /* end_stream */,
envoy_stream_intel) { cc_.on_data_calls++; };
stream_callbacks.on_complete_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) {
memcpy(&last_stream_final_intel_, &final_intel, sizeof(envoy_final_stream_intel));
if (expect_data_streams_) {
validateStreamIntel(final_intel, expect_dns_, upstream_tls_, cc_.on_complete_calls == 0);
}
cc_.on_complete_received_byte_count = final_intel.received_byte_count;
cc_.on_complete_calls++;
cc_.terminal_callback->setReady();
};
stream_callbacks.on_error_ = [this](EnvoyError, envoy_stream_intel, envoy_final_stream_intel) {
cc_.on_error_calls++;
cc_.terminal_callback->setReady();
};
stream_callbacks.on_cancel_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) {
EXPECT_NE(-1, final_intel.stream_start_ms);
cc_.on_cancel_calls++;
cc_.terminal_callback->setReady();
};
return stream_callbacks;
}

void BaseClientIntegrationTest::createNewStream(
Platform::StreamPrototypeSharedPtr& stream_prototype, callbacks_called& cc,
Platform::StreamSharedPtr& stream) {
Platform::StreamSharedPtr
BaseClientIntegrationTest::createNewStream(Platform::StreamPrototypeSharedPtr& stream_prototype,
EnvoyStreamCallbacks&& stream_callbacks) {

{
absl::MutexLock l(&engine_lock_);
stream_prototype = engine_->streamClient()->newStreamPrototype();
}

stream_prototype->setOnHeaders(
[&](Platform::ResponseHeadersSharedPtr headers, bool, envoy_stream_intel intel) {
cc.on_headers_calls++;
cc.status = absl::StrCat(headers->httpStatus());
cc.on_header_consumed_bytes_from_response = intel.consumed_bytes_from_response;
});
stream_prototype->setOnData([&](envoy_data c_data, bool) {
cc.on_data_calls++;
release_envoy_data(c_data);
});
stream_prototype->setOnComplete([&](envoy_stream_intel, envoy_final_stream_intel final_intel) {
memcpy(&last_stream_final_intel_, &final_intel, sizeof(envoy_final_stream_intel));
if (expect_data_streams_) {
validateStreamIntel(final_intel, expect_dns_, upstream_tls_, cc.on_complete_calls == 0);
}
cc.on_complete_received_byte_count = final_intel.received_byte_count;
cc.on_complete_calls++;
cc.terminal_callback->setReady();
});
stream_prototype->setOnError(
[&](Platform::EnvoyErrorSharedPtr, envoy_stream_intel, envoy_final_stream_intel) {
cc.on_error_calls++;
cc.terminal_callback->setReady();
});
stream_prototype->setOnCancel([&](envoy_stream_intel, envoy_final_stream_intel final_intel) {
EXPECT_NE(-1, final_intel.stream_start_ms);
cc.on_cancel_calls++;
cc.terminal_callback->setReady();
});

stream = (*stream_prototype).start(explicit_flow_control_);
return stream_prototype->start(std::move(stream_callbacks), explicit_flow_control_);
}

void BaseClientIntegrationTest::threadRoutine(absl::Notification& engine_running) {
Expand Down Expand Up @@ -234,36 +237,4 @@ testing::AssertionResult BaseClientIntegrationTest::waitForGaugeGe(const std::st
return testing::AssertionSuccess();
}

EnvoyStreamCallbacks BaseClientIntegrationTest::createDefaultStreamCallbacks() {
EnvoyStreamCallbacks stream_callbacks;
stream_callbacks.on_headers_ = [this](const Http::ResponseHeaderMap& headers, bool,
envoy_stream_intel intel) {
cc_.on_headers_calls++;
cc_.status = absl::StrCat(headers.getStatusValue());
cc_.on_header_consumed_bytes_from_response = intel.consumed_bytes_from_response;
};
stream_callbacks.on_data_ = [this](const Buffer::Instance&, uint64_t /* length */,
bool /* end_stream */,
envoy_stream_intel) { cc_.on_data_calls++; };
stream_callbacks.on_complete_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) {
memcpy(&last_stream_final_intel_, &final_intel, sizeof(envoy_final_stream_intel));
if (expect_data_streams_) {
validateStreamIntel(final_intel, expect_dns_, upstream_tls_, cc_.on_complete_calls == 0);
}
cc_.on_complete_received_byte_count = final_intel.received_byte_count;
cc_.on_complete_calls++;
cc_.terminal_callback->setReady();
};
stream_callbacks.on_error_ = [this](EnvoyError, envoy_stream_intel, envoy_final_stream_intel) {
cc_.on_error_calls++;
cc_.terminal_callback->setReady();
};
stream_callbacks.on_cancel_ = [this](envoy_stream_intel, envoy_final_stream_intel final_intel) {
EXPECT_NE(-1, final_intel.stream_start_ms);
cc_.on_cancel_calls++;
cc_.terminal_callback->setReady();
};
return stream_callbacks;
}

} // namespace Envoy
6 changes: 3 additions & 3 deletions mobile/test/common/integration/base_client_integration_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class BaseClientIntegrationTest : public BaseIntegrationTest {
absl::MutexLock l(&engine_lock_);
return reinterpret_cast<envoy_engine_t>(engine_->engine_);
}
virtual void initialize() override;
void createNewStream(Platform::StreamPrototypeSharedPtr& stream_prototype, callbacks_called& cc,
Platform::StreamSharedPtr& stream_);
void initialize() override;
Platform::StreamSharedPtr createNewStream(Platform::StreamPrototypeSharedPtr& stream_prototype,
EnvoyStreamCallbacks&& stream_callbacks);

void createEnvoy() override;
void threadRoutine(absl::Notification& engine_running);
Expand Down
5 changes: 3 additions & 2 deletions mobile/test/common/integration/client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ TEST_P(ClientIntegrationTest, ReresolveAndDrain) {
// Send a request. The original upstream should be used because of DNS block.
default_request_headers_.setHost(
absl::StrCat("www.lyft.com:", fake_upstreams_[0]->localAddress()->ip()->port()));
stream_ = createNewStream(stream_prototype_, createDefaultStreamCallbacks());
stream_->sendHeaders(std::make_unique<Http::TestRequestHeaderMapImpl>(default_request_headers_),
true);
terminal_callback_.waitReady();
Expand All @@ -654,7 +655,7 @@ TEST_P(ClientIntegrationTest, ReresolveAndDrain) {
ASSERT_TRUE(waitForCounterGe("dns_cache.base_dns_cache.dns_query_attempt", 1));
EXPECT_EQ(0, getCounterValue("dns_cache.base_dns_cache.dns_query_success"));
// The next request should go to the original upstream as there's been no drain.
createNewStream(stream_prototype_, cc_, stream_);
stream_ = createNewStream(stream_prototype_, createDefaultStreamCallbacks());
stream_->sendHeaders(std::make_unique<Http::TestRequestHeaderMapImpl>(default_request_headers_),
true);
terminal_callback_.waitReady();
Expand All @@ -668,7 +669,7 @@ TEST_P(ClientIntegrationTest, ReresolveAndDrain) {
ASSERT_TRUE(waitForCounterGe("dns_cache.base_dns_cache.dns_query_success", 1));

// Do one final request. It should go to the second upstream and return 202
createNewStream(stream_prototype_, cc_, stream_);
stream_ = createNewStream(stream_prototype_, createDefaultStreamCallbacks());
stream_->sendHeaders(std::make_unique<Http::TestRequestHeaderMapImpl>(default_request_headers_),
true);
terminal_callback_.waitReady();
Expand Down

0 comments on commit 1ffd452

Please sign in to comment.