Skip to content

Commit

Permalink
mobile: Add an optional idempotency parameter in sendHeaders (envoypr…
Browse files Browse the repository at this point in the history
…oxy#35474)

This PR adds an optional parameter to indicate whether a request is an
idempotent in the `sendHeaders`. This can be useful to automatically add
a retry policy, such as `http3-post-connect-failure` when the
`idempotent` flag is set. This makes the Envoy Mobile API similar to
Cronet API. The Cronvoy implementation has also been updated to respect
the `idempotency` flag.

Risk Level: low
Testing: unit test
Docs Changes: n/a
Release Notes: n/a
Platform Specific Features: mobile

---------

Signed-off-by: Fredy Wijaya <[email protected]>
  • Loading branch information
fredyw authored Jul 30, 2024
1 parent 2383e1a commit 172679e
Show file tree
Hide file tree
Showing 24 changed files with 120 additions and 42 deletions.
2 changes: 1 addition & 1 deletion mobile/examples/java/hello_world/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private void makeRequest() {
return Unit.INSTANCE;
})
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestHeaders, true);
.sendHeaders(requestHeaders, /* endStream= */ true, /* idempotent= */ false);

clear_text = !clear_text;
}
Expand Down
4 changes: 2 additions & 2 deletions mobile/library/cc/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Platform {

Stream::Stream(InternalEngine* engine, envoy_stream_t handle) : engine_(engine), handle_(handle) {}

Stream& Stream::sendHeaders(Http::RequestHeaderMapPtr headers, bool end_stream) {
engine_->sendHeaders(handle_, std::move(headers), end_stream);
Stream& Stream::sendHeaders(Http::RequestHeaderMapPtr headers, bool end_stream, bool idempotent) {
engine_->sendHeaders(handle_, std::move(headers), end_stream, idempotent);
return *this;
}

Expand Down
5 changes: 4 additions & 1 deletion mobile/library/cc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ class Stream {
*
* @param headers the headers to send.
* @param end_stream indicates whether to close the stream locally after sending this frame.
* @param idempotent indicates that the request is idempotent. When idempotent is set to true
* Envoy Mobile will retry on HTTP/3 post-handshake failures. By default, it is
* set to false.
*/
Stream& sendHeaders(Http::RequestHeaderMapPtr headers, bool end_stream);
Stream& sendHeaders(Http::RequestHeaderMapPtr headers, bool end_stream, bool idempotent = false);

/**
* Send data over an open HTTP stream. This method can be invoked multiple times.
Expand Down
10 changes: 8 additions & 2 deletions mobile/library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "library/common/bridge/utility.h"
#include "library/common/http/header_utility.h"
#include "library/common/stream_info/extra_stream_info.h"
#include "library/common/system/system_helper.h"

Expand Down Expand Up @@ -552,7 +551,8 @@ void Client::startStream(envoy_stream_t new_stream_handle, EnvoyStreamCallbacks&
ENVOY_LOG(debug, "[S{}] start stream", new_stream_handle);
}

void Client::sendHeaders(envoy_stream_t stream, RequestHeaderMapPtr headers, bool end_stream) {
void Client::sendHeaders(envoy_stream_t stream, RequestHeaderMapPtr headers, bool end_stream,
bool idempotent) {
ASSERT(dispatcher_.isThreadSafe());
Client::DirectStreamSharedPtr direct_stream =
getStream(stream, GetStreamFilters::AllowOnlyForOpenStreams);
Expand Down Expand Up @@ -597,6 +597,12 @@ void Client::sendHeaders(envoy_stream_t stream, RequestHeaderMapPtr headers, boo
// a request here:
// https://github.com/envoyproxy/envoy/blob/c9e3b9d2c453c7fe56a0e3615f0c742ac0d5e768/source/common/router/config_impl.cc#L1091-L1096
headers->setReferenceForwardedProto(Headers::get().SchemeValues.Https);
// When the request is idempotent, it is safe to retry.
if (idempotent) {
// https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/router_filter#x-envoy-retry-on
headers->addCopy(Headers::get().EnvoyRetryOn,
Headers::get().EnvoyRetryOnValues.Http3PostConnectFailure);
}
ENVOY_LOG(debug, "[S{}] request headers for stream (end_stream={}):\n{}", stream, end_stream,
*headers);
request_decoder->decodeHeaders(std::move(headers), end_stream);
Expand Down
6 changes: 5 additions & 1 deletion mobile/library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ class Client : public Logger::Loggable<Logger::Id::http> {
* @param stream the stream to send headers over.
* @param headers the headers to send.
* @param end_stream indicates whether to close the stream locally after sending this frame.
* @param idempotent indicates that the request is idempotent. When idempotent is set to true
* Envoy Mobile will retry on HTTP/3 post-handshake failures. By default, it is
* set to false.
*/
void sendHeaders(envoy_stream_t stream, RequestHeaderMapPtr headers, bool end_stream);
void sendHeaders(envoy_stream_t stream, RequestHeaderMapPtr headers, bool end_stream,
bool idempotent = false);

/**
* Notify the stream that the caller is ready to receive more data from the response stream. Only
Expand Down
9 changes: 5 additions & 4 deletions mobile/library/common/internal_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ envoy_status_t InternalEngine::startStream(envoy_stream_t stream,
}

envoy_status_t InternalEngine::sendHeaders(envoy_stream_t stream, Http::RequestHeaderMapPtr headers,
bool end_stream) {
return dispatcher_->post([this, stream, headers = std::move(headers), end_stream]() mutable {
http_client_->sendHeaders(stream, std::move(headers), end_stream);
});
bool end_stream, bool idempotent) {
return dispatcher_->post(
[this, stream, headers = std::move(headers), end_stream, idempotent]() mutable {
http_client_->sendHeaders(stream, std::move(headers), end_stream, idempotent);
});
return ENVOY_SUCCESS;
}

Expand Down
5 changes: 4 additions & 1 deletion mobile/library/common/internal_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ class InternalEngine : public Logger::Loggable<Logger::Id::main> {
* @param stream the stream to send headers over.
* @param headers the headers to send.
* @param end_stream indicates whether to close the stream locally after sending this frame.
* @param idempotent indicates that the request is idempotent. When idempotent is set to true
* Envoy Mobile will retry on HTTP/3 post-handshake failures. By default, it is
* set to false.
*/
envoy_status_t sendHeaders(envoy_stream_t stream, Http::RequestHeaderMapPtr headers,
bool end_stream);
bool end_stream, bool idempotent = false);

envoy_status_t readData(envoy_stream_t stream, size_t bytes_to_read);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ public EnvoyHTTPStream(long engineHandle, long streamHandle, EnvoyHTTPCallbacks
* Send headers over an open HTTP streamHandle. This method can be invoked once
* and needs to be called before send_data.
*
* @param headers, the headers to send.
* @param endStream, supplies whether this is headers only.
* @param headers the headers to send.
* @param endStream supplies whether this is headers only.
* @param idempotent indicates that the request is idempotent. When idempotent is set to true
* Envoy Mobile will retry on HTTP/3 post-handshake failures.
*/
public void sendHeaders(Map<String, List<String>> headers, boolean endStream) {
JniLibrary.sendHeaders(engineHandle, streamHandle, headers, endStream);
public void sendHeaders(Map<String, List<String>> headers, boolean endStream,
boolean idempotent) {
JniLibrary.sendHeaders(engineHandle, streamHandle, headers, endStream, idempotent);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,17 @@ protected static native int startStream(long engine, long stream, EnvoyHTTPCallb
* Send headers over an open HTTP stream. This method can be invoked once and
* needs to be called before send_data.
*
* @param engine the stream's associated engine.
* @param stream the stream to send headers over.
* @param headers the headers to send.
* @param endStream supplies whether this is headers only.
* @param engine the stream's associated engine.
* @param stream the stream to send headers over.
* @param headers the headers to send.
* @param endStream supplies whether this is headers only.
* @param idempotent indicates that the request is idempotent. When idempotent is set to true
* Envoy Mobile will retry on HTTP/3 post-handshake failures.
* @return the resulting status of the operation.
*/
protected static native int sendHeaders(long engine, long stream,
Map<String, List<String>> headers, boolean endStream);
Map<String, List<String>> headers, boolean endStream,
boolean idempotent);

/**
* Send data over an open HTTP stream. This method can be invoked multiple
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void sendHeaders(Map<String, List<String>> envoyRequestHeaders, boolean endStrea
if (returnTrueIfCanceledOrIncreaseConcurrentlyRunningStreamOperations()) {
return; // Already Cancelled - to late to send something.
}
mStream.sendHeaders(envoyRequestHeaders, endStream);
mStream.sendHeaders(envoyRequestHeaders, endStream, /* idempotent= */ false);
if (decreaseConcurrentlyRunningStreamOperationsAndReturnTrueIfAwaitingCancel()) {
mStream.cancel(); // Cancel was called previously, so now this is honored.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public final class CronvoyUrlRequest extends CronvoyUrlRequestBase {
private String mCurrentUrl;
private volatile CronvoyUrlResponseInfoImpl mUrlResponseInfo;
private String mPendingRedirectUrl;
private boolean mIdempotent;

/**
* @param executor The executor for orchestrating tasks between envoy-mobile callbacks
Expand All @@ -188,7 +189,7 @@ public final class CronvoyUrlRequest extends CronvoyUrlRequestBase {
Executor executor, String userAgent, boolean allowDirectExecutor,
Collection<Object> connectionAnnotations, boolean trafficStatsTagSet,
int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid,
RequestFinishedInfo.Listener requestFinishedListener) {
RequestFinishedInfo.Listener requestFinishedListener, boolean idempotent) {
if (url == null) {
throw new NullPointerException("URL is required");
}
Expand All @@ -210,6 +211,7 @@ public final class CronvoyUrlRequest extends CronvoyUrlRequestBase {
mCurrentUrl = url;
mUserAgent = userAgent;
mRequestAnnotations = connectionAnnotations;
mIdempotent = idempotent;
}

@Override
Expand Down Expand Up @@ -538,7 +540,7 @@ private void fireOpenConnection() {
mCronvoyCallbacks = new CronvoyHttpCallbacks();
mStream.set(mRequestContext.getEnvoyEngine().startStream(mCronvoyCallbacks,
/* explicitFlowControl= */ true));
mStream.get().sendHeaders(envoyRequestHeaders, mUploadDataStream == null);
mStream.get().sendHeaders(envoyRequestHeaders, mUploadDataStream == null, mIdempotent);
if (mUploadDataStream != null && mUrlChain.size() == 1) {
mUploadDataStream.initializeWithRequest();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.chromium.net.BidirectionalStream;
import org.chromium.net.ExperimentalBidirectionalStream;
import org.chromium.net.ExperimentalUrlRequest;
import org.chromium.net.NetworkQualityRttListener;
import org.chromium.net.NetworkQualityThroughputListener;
import org.chromium.net.RequestFinishedInfo;
Expand Down Expand Up @@ -132,7 +133,8 @@ void setTaskToExecuteWhenInitializationIsCompleted(Runnable runnable) {
checkHaveAdapter();
return new CronvoyUrlRequest(this, url, callback, executor, mUserAgent, allowDirectExecutor,
requestAnnotations, trafficStatsTagSet, trafficStatsTag,
trafficStatsUidSet, trafficStatsUid, requestFinishedListener);
trafficStatsUidSet, trafficStatsUid, requestFinishedListener,
idempotency == ExperimentalUrlRequest.Builder.IDEMPOTENT);
}
}

Expand Down
5 changes: 3 additions & 2 deletions mobile/library/jni/jni_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1017,12 +1017,13 @@ extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibra

extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibrary_sendHeaders(
JNIEnv* env, jclass, jlong engine_handle, jlong stream_handle, jobject headers,
jboolean end_stream) {
jboolean end_stream, jboolean idempotent) {
Envoy::JNI::JniHelper jni_helper(env);
auto cpp_headers = Envoy::Http::Utility::createRequestHeaderMapPtr();
Envoy::JNI::javaHeadersToCppHeaders(jni_helper, headers, *cpp_headers);
return reinterpret_cast<Envoy::InternalEngine*>(engine_handle)
->sendHeaders(static_cast<envoy_stream_t>(stream_handle), std::move(cpp_headers), end_stream);
->sendHeaders(static_cast<envoy_stream_t>(stream_handle), std::move(cpp_headers), end_stream,
idempotent);
}

extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibrary_sendTrailers(
Expand Down
10 changes: 8 additions & 2 deletions mobile/library/kotlin/io/envoyproxy/envoymobile/Stream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ open class Stream(
*
* @param headers Headers to send over the stream.
* @param endStream Whether this is a headers-only request.
* @param idempotent indicates that the request is idempotent. When idempotent is set to true
* Envoy Mobile will retry on HTTP/3 post-handshake failures. By default, it is set to false.
* @return This stream, for chaining syntax.
*/
open fun sendHeaders(headers: RequestHeaders, endStream: Boolean): Stream {
underlyingStream.sendHeaders(headers.caseSensitiveHeaders(), endStream)
open fun sendHeaders(
headers: RequestHeaders,
endStream: Boolean,
idempotent: Boolean = false
): Stream {
underlyingStream.sendHeaders(headers.caseSensitiveHeaders(), endStream, idempotent)
return this
}

Expand Down
28 changes: 28 additions & 0 deletions mobile/test/common/http/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,34 @@ TEST_P(ClientTest, BasicStreamHeaders) {
ASSERT_EQ(callbacks_called.on_complete_calls_, 1);
}

TEST_P(ClientTest, BasicStreamHeadersIdempotent) {
// Create a stream, and set up request_decoder_ and response_encoder_
StreamCallbacksCalled callbacks_called;
createStream(createDefaultStreamCallbacks(callbacks_called));

// Send request headers.
EXPECT_CALL(dispatcher_, pushTrackedObject(_));
EXPECT_CALL(dispatcher_, popTrackedObject(_));
TestRequestHeaderMapImpl expected_headers;
HttpTestUtility::addDefaultHeaders(expected_headers);
expected_headers.addCopy("x-envoy-mobile-cluster", "base_clear");
expected_headers.addCopy("x-forwarded-proto", "https");
expected_headers.addCopy("x-envoy-retry-on", "http3-post-connect-failure");
EXPECT_CALL(*request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), true));
http_client_.sendHeaders(stream_, createDefaultRequestHeaders(), /* end_stream= */ true,
/* idempotent= */ true);

// Encode response headers.
EXPECT_CALL(dispatcher_, pushTrackedObject(_));
EXPECT_CALL(dispatcher_, popTrackedObject(_));
EXPECT_CALL(dispatcher_, deferredDelete_(_));
TestResponseHeaderMapImpl response_headers{{":status", "200"}};
response_encoder_->encodeHeaders(response_headers, true);
ASSERT_EQ(callbacks_called.on_headers_calls_, 1);
// Ensure that the callbacks on the EnvoyStreamCallbacks were called.
ASSERT_EQ(callbacks_called.on_complete_calls_, 1);
}

TEST_P(ClientTest, BasicStreamData) {
StreamCallbacksCalled callbacks_called;
callbacks_called.end_stream_with_headers_ = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
.start(requestScenario.useDirectExecutor ? Runnable::run
: Executors.newSingleThreadExecutor());
streamRef.set(stream); // Set before sending headers to avoid race conditions.
stream.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody());
stream.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody(),
/* idempotent= */ false);
if (requestScenario.hasBody()) {
// The first "send" is assumes that the window is available - API contract.
onSendWindowAvailable(requestScenario, streamRef.get(), chunkIterator, response.get());
Expand Down
3 changes: 2 additions & 1 deletion mobile/test/java/integration/AndroidEngineFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
return null;
})
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody());
.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody(),
/* idempotent= */ false);
if (requestScenario.cancelBeforeSendingRequestBody) {
stream.cancel();
} else {
Expand Down
3 changes: 2 additions & 1 deletion mobile/test/java/integration/AndroidEngineSocketTagTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
.start(requestScenario.useDirectExecutor ? Runnable::run
: Executors.newSingleThreadExecutor());
streamRef.set(stream); // Set before sending headers to avoid race conditions.
stream.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody());
stream.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody(),
/* idempotent= */ false);
latch.await();
response.get().throwAssertionErrorIfAny();
return response.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
return null;
})
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody());
.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody(),
/* idempotent= */ false);
requestScenario.getBodyChunks().forEach(stream::sendData);
requestScenario.getClosingBodyChunk().ifPresent(stream::close);
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ private class MockedStream extends EnvoyHTTPStream {
private MockedStream() { super(0, 0, null, false); }

@Override
public void sendHeaders(Map<String, List<String>> headers, boolean endStream) {
public void sendHeaders(Map<String, List<String>> headers, boolean endStream,
boolean idempotent) {
mSendHeadersInvocationCount.incrementAndGet();
mStartLatch.countDown();
mSendHeadersBlock.block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void continuousWrite_withCancelOnResponseHeaders() throws Exception {
return null;
})
.start(Runnable::run) // direct executor - all the logic runs on the EM Network Thread.
.sendHeaders(requestHeaders, false));
.sendHeaders(requestHeaders, /* endStream= */ false, /* idempotent= */ false));
ByteBuffer bf = ByteBuffer.allocateDirect(1);
bf.put((byte)'a');
stream.get().sendData(bf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testGetRequestWithPlatformCertValidatorFail() throws Exception {
})
.setOnCancel((ignored) -> { throw new AssertionError("Unexpected OnCancel called."); })
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestScenario.getHeaders(), false);
.sendHeaders(requestScenario.getHeaders(), /* endStream= */ false, /* idempotent= */ false);

latch.await();
assertNotNull(response.get().getEnvoyError());
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testSubjectAltNameErrorWithPlatformCertValidator() throws Exception
})
.setOnCancel((ignored) -> { throw new AssertionError("Unexpected OnCancel called."); })
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestScenario.getHeaders(), false);
.sendHeaders(requestScenario.getHeaders(), /* endStream= */ false, /* idempotent= */ false);

latch.await();
assertNotNull(response.get().getEnvoyError());
Expand Down Expand Up @@ -220,7 +220,8 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
return null;
})
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestScenario.getHeaders(), /* hasRequestBody= */ false);
.sendHeaders(requestScenario.getHeaders(), /* hasRequestBody= */ false,
/* idempotent= */ false);

latch.await();
response.get().throwAssertionErrorIfAny();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import java.nio.ByteBuffer
*/
class MockEnvoyHTTPStream(val callbacks: EnvoyHTTPCallbacks, val explicitFlowControl: Boolean) :
EnvoyHTTPStream(0, 0, callbacks, explicitFlowControl) {
override fun sendHeaders(headers: MutableMap<String, MutableList<String>>?, endStream: Boolean) {}
override fun sendHeaders(
headers: MutableMap<String, MutableList<String>>?,
endStream: Boolean,
idempotent: Boolean
) {}

override fun sendData(data: ByteBuffer?, endStream: Boolean) {}

Expand Down
Loading

0 comments on commit 172679e

Please sign in to comment.