Skip to content

Commit

Permalink
mobile: Update sendData to use Envoy::Buffer::Instance (envoyproxy#33541
Browse files Browse the repository at this point in the history
)

This PR changes InternalEngine::sendData to take Envoy::Buffer::Instance instead of envoy_data. This change should help to reduce the number of copies and indirection.

This change also adds Stream::sendData(Buffer::InstancePtr), close(Buffer::InstancePtr buffer) and deprecates Stream::sendData(envoy_data data), Stream::close(envoy_data). A missing integration test has also been added.

For Android:
The JNI implementation of sendData has been rewritten to be able to accept direct or non-direct ByteBuffer. For direct ByteBuffer, no copy of data in Envoy::Buffer::Instance will be made. For non-direct ByteBuffer a copy of data in Envoy::Buffer::Instance will be made.

For iOS:
The implementation of sendData has been rewritten to use NSData* directly without converting it to envoy_data.

Risk Level: medium
Testing: unit tests
Docs Changes: n/a
Release Notes: n/a
Platform Specific Features: mobile

Signed-off-by: Fredy Wijaya <[email protected]>
  • Loading branch information
fredyw authored Apr 16, 2024
1 parent aeb37f8 commit d584516
Show file tree
Hide file tree
Showing 20 changed files with 408 additions and 96 deletions.
1 change: 1 addition & 0 deletions mobile/library/cc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ envoy_cc_library(
"//library/common/api:c_types",
"//library/common/data:utility_lib",
"//library/common/extensions/key_value/platform:config",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//source/common/http:header_map_lib",
"@envoy//source/common/http:utility_lib",
],
Expand Down
17 changes: 15 additions & 2 deletions mobile/library/cc/stream.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "stream.h"

#include "library/cc/bridge_utility.h"
#include "library/common/data/utility.h"
#include "library/common/http/header_utility.h"
#include "library/common/internal_engine.h"
#include "library/common/types/c_types.h"
Expand Down Expand Up @@ -32,7 +33,12 @@ Stream& Stream::sendHeaders(Http::RequestHeaderMapPtr headers, bool end_stream)
}

Stream& Stream::sendData(envoy_data data) {
engine_->sendData(handle_, data, false);
Buffer::InstancePtr buffer = Data::Utility::toInternalData(data);
return sendData(std::move(buffer));
}

Stream& Stream::sendData(Buffer::InstancePtr buffer) {
engine_->sendData(handle_, std::move(buffer), false);
return *this;
}

Expand All @@ -55,7 +61,14 @@ void Stream::close(Http::RequestTrailerMapPtr trailers) {
engine_->sendTrailers(handle_, std::move(trailers));
}

void Stream::close(envoy_data data) { engine_->sendData(handle_, data, true); }
void Stream::close(envoy_data data) {
Buffer::InstancePtr buffer = Data::Utility::toInternalData(data);
close(std::move(buffer));
}

void Stream::close(Buffer::InstancePtr buffer) {
engine_->sendData(handle_, std::move(buffer), true);
}

void Stream::cancel() { engine_->cancelStream(handle_); }

Expand Down
20 changes: 18 additions & 2 deletions mobile/library/cc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <vector>

#include "envoy/buffer/buffer.h"
#include "envoy/http/header_map.h"

#include "library/cc/request_headers.h"
Expand All @@ -27,7 +28,14 @@ class Stream {
*/
Stream& sendHeaders(Http::RequestHeaderMapPtr headers, bool end_stream);

Stream& sendData(envoy_data data);
[[deprecated]] Stream& sendData(envoy_data data);

/**
* Send data over an open HTTP stream. This method can be invoked multiple times.
*
* @param buffer the data to send.
*/
Stream& sendData(Buffer::InstancePtr buffer);

Stream& readData(size_t bytes_to_read);

Expand All @@ -41,7 +49,15 @@ class Stream {
*/
void close(Http::RequestTrailerMapPtr trailers);

void close(envoy_data data);
[[deprecated]] void close(envoy_data data);

/**
* Send data over an open HTTP stream and closes the stream.. This method can only be invoked
* once.
*
* @param buffer the last data to send.
*/
void close(Buffer::InstancePtr buffer);

void cancel();

Expand Down
14 changes: 4 additions & 10 deletions mobile/library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "library/common/bridge/utility.h"
#include "library/common/data/utility.h"
#include "library/common/http/header_utility.h"
#include "library/common/http/headers.h"
#include "library/common/stream_info/extra_stream_info.h"
#include "library/common/system/system_helper.h"

Expand Down Expand Up @@ -596,16 +595,11 @@ void Client::readData(envoy_stream_t stream, size_t bytes_to_read) {
}
}

void Client::sendData(envoy_stream_t stream, envoy_data data, bool end_stream) {
void Client::sendData(envoy_stream_t stream, Buffer::InstancePtr buffer, bool end_stream) {
ASSERT(dispatcher_.isThreadSafe());
Client::DirectStreamSharedPtr direct_stream =
getStream(stream, GetStreamFilters::ALLOW_ONLY_FOR_OPEN_STREAMS);

// Take ownership of data early, in case of early returns.
// The buffer is moved internally, in a synchronous fashion, so we don't need the lifetime
// of the InstancePtr to outlive this function call.
Buffer::InstancePtr buf = Data::Utility::toInternalData(data);

// If direct_stream is not found, it means the stream has already closed or been reset
// and the appropriate callback has been issued to the caller. There's nothing to do here
// except silently swallow this.
Expand All @@ -623,9 +617,9 @@ void Client::sendData(envoy_stream_t stream, envoy_data data, bool end_stream) {

ScopeTrackerScopeState scope(direct_stream.get(), scopeTracker());

ENVOY_LOG(debug, "[S{}] request data for stream (length={} end_stream={})\n", stream, data.length,
end_stream);
request_decoder->decodeData(*buf, end_stream);
ENVOY_LOG(debug, "[S{}] request data for stream (length={} end_stream={})\n", stream,
buffer->length(), end_stream);
request_decoder->decodeData(*buffer, end_stream);

if (direct_stream->explicit_flow_control_ && !end_stream) {
if (direct_stream->read_disable_count_ == 0) {
Expand Down
8 changes: 4 additions & 4 deletions mobile/library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ class Client : public Logger::Loggable<Logger::Id::http> {

/**
* Send data over an open HTTP stream. This method can be invoked multiple times.
* @param stream, the stream to send data over.
* @param data, the data to send.
* @param end_stream, indicates whether to close the stream locally after sending this frame.
* @param stream the stream to send data over.
* @param buffer the data to send.
* @param end_stream indicates whether to close the stream locally after sending this frame.
*/
void sendData(envoy_stream_t stream, envoy_data data, bool end_stream);
void sendData(envoy_stream_t stream, Buffer::InstancePtr buffer, bool end_stream);

/**
* Send metadata over an HTTP stream. This method can be invoked multiple times.
Expand Down
8 changes: 5 additions & 3 deletions mobile/library/common/internal_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ envoy_status_t InternalEngine::readData(envoy_stream_t stream, size_t bytes_to_r
[&, stream, bytes_to_read]() { http_client_->readData(stream, bytes_to_read); });
}

envoy_status_t InternalEngine::sendData(envoy_stream_t stream, envoy_data data, bool end_stream) {
return dispatcher_->post(
[&, stream, data, end_stream]() { http_client_->sendData(stream, data, end_stream); });
envoy_status_t InternalEngine::sendData(envoy_stream_t stream, Buffer::InstancePtr buffer,
bool end_stream) {
return dispatcher_->post([&, stream, buffer = std::move(buffer), end_stream]() mutable {
http_client_->sendData(stream, std::move(buffer), end_stream);
});
}

envoy_status_t InternalEngine::sendTrailers(envoy_stream_t stream,
Expand Down
9 changes: 8 additions & 1 deletion mobile/library/common/internal_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ class InternalEngine : public Logger::Loggable<Logger::Id::main> {

envoy_status_t readData(envoy_stream_t stream, size_t bytes_to_read);

envoy_status_t sendData(envoy_stream_t stream, envoy_data data, bool end_stream);
/**
* Send data over an open HTTP stream. This method can be invoked multiple times.
*
* @param stream the stream to send data over.
* @param buffer the data to send.
* @param end_stream indicates whether to close the stream locally after sending this frame.
*/
envoy_status_t sendData(envoy_stream_t stream, Buffer::InstancePtr buffer, bool end_stream);

/**
* Send trailers over an open HTTP stream. This method can only be invoked once per stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,12 @@ public void sendData(ByteBuffer data, boolean endStream) {
* @param data, the data to send.
* @param length, number of bytes to send: 0 <= length <= ByteBuffer.capacity()
* @param endStream, supplies whether this is the last data in the streamHandle.
* @throws UnsupportedOperationException - if the provided buffer is neither a
* direct ByteBuffer nor backed by an
* on-heap byte array.
*/
public void sendData(ByteBuffer data, int length, boolean endStream) {
if (length < 0 || length > data.capacity()) {
throw new IllegalArgumentException("Length out of bound");
}
if (data.isDirect()) {
JniLibrary.sendData(engineHandle, streamHandle, data, length, endStream);
} else if (data.hasArray()) {
JniLibrary.sendDataByteArray(engineHandle, streamHandle, data.array(), length, endStream);
} else {
throw new UnsupportedOperationException("Unsupported ByteBuffer implementation.");
}
JniLibrary.sendData(engineHandle, streamHandle, data, length, endStream);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,12 @@ protected static native int sendHeaders(long engine, long stream,
* Send data over an open HTTP stream. This method can be invoked multiple
* times.
*
* @param engine, the stream's associated engine.
* @param stream, the stream to send data over.
* @param data, the data to send.
* @param length, the size in bytes of the data to send. 0 <= length <= data.length
* @param endStream, supplies whether this is the last data in the stream.
* @return int, the resulting status of the operation.
*/
protected static native int sendDataByteArray(long engine, long stream, byte[] data, int length,
boolean endStream);

/**
* Send data over an open HTTP stream. This method can be invoked multiple
* times.
*
* @param engine, the stream's associated engine.
* @param stream, the stream to send data over.
* @param data, the data to send; must be a <b>direct</b> ByteBuffer.
* @param length, the size in bytes of the data to send. 0 <= length <= data.capacity()
* @param endStream, supplies whether this is the last data in the stream.
* @return int, the resulting status of the operation.
* @param engine the stream's associated engine.
* @param stream the stream to send data over.
* @param data the data to send. It can be direct or non-direct byteBuffer.
* @param length the size in bytes of the data to send. 0 <= length <= data.capacity()
* @param endStream supplies whether this is the last data in the stream.
* @return int the resulting status of the operation.
*/
protected static native int sendData(long engine, long stream, ByteBuffer data, int length,
boolean endStream);
Expand Down
1 change: 1 addition & 0 deletions mobile/library/jni/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ envoy_cc_library(
"//library/jni/import:jni_import_lib",
"//library/jni/types:jni_env_lib",
"//library/jni/types:jni_exception_lib",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//source/common/common:assert_lib",
"@envoy//source/common/http:header_map_lib",
"@envoy//source/common/protobuf",
Expand Down
34 changes: 13 additions & 21 deletions mobile/library/jni/jni_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -954,31 +954,23 @@ extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibra
->readData(static_cast<envoy_stream_t>(stream_handle), byte_count);
}

// The Java counterpart guarantees to invoke this method with a non-null direct ByteBuffer where the
// provided length is between 0 and ByteBuffer.capacity(), inclusively.
// This function can accept either direct (off the JVM heap) ByteBuffer or non-direct (on the JVM
// heap) ByteBuffer.
extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibrary_sendData(
JNIEnv* env, jclass, jlong engine_handle, jlong stream_handle, jobject data, jint length,
JNIEnv* env, jclass, jlong engine_handle, jlong stream_handle, jobject byte_buffer, jint length,
jboolean end_stream) {
Envoy::JNI::JniHelper jni_helper(env);
Envoy::Buffer::InstancePtr cpp_buffer_instance;
if (Envoy::JNI::isJavaDirectByteBuffer(jni_helper, byte_buffer)) {
cpp_buffer_instance =
Envoy::JNI::javaDirectByteBufferToCppBufferInstance(jni_helper, byte_buffer, length);
} else {
cpp_buffer_instance =
Envoy::JNI::javaNonDirectByteBufferToCppBufferInstance(jni_helper, byte_buffer, length);
}
return reinterpret_cast<Envoy::InternalEngine*>(engine_handle)
->sendData(static_cast<envoy_stream_t>(stream_handle),
Envoy::JNI::javaByteBufferToEnvoyData(jni_helper, data, length), end_stream);
}

// The Java counterpart guarantees to invoke this method with a non-null jbyteArray where the
// provided length is between 0 and the size of the jbyteArray, inclusively. And given that this
// jbyteArray comes from a ByteBuffer, it is also guaranteed that its length will not be greater
// than 2^31 - this is why the length type is jint.
extern "C" JNIEXPORT jint JNICALL
Java_io_envoyproxy_envoymobile_engine_JniLibrary_sendDataByteArray(JNIEnv* env, jclass,
jlong engine_handle,
jlong stream_handle,
jbyteArray data, jint length,
jboolean end_stream) {
Envoy::JNI::JniHelper jni_helper(env);
return reinterpret_cast<Envoy::InternalEngine*>(engine_handle)
->sendData(static_cast<envoy_stream_t>(stream_handle),
Envoy::JNI::javaByteArrayToEnvoyData(jni_helper, data, length), end_stream);
->sendData(static_cast<envoy_stream_t>(stream_handle), std::move(cpp_buffer_instance),
end_stream);
}

extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibrary_sendHeaders(
Expand Down
66 changes: 65 additions & 1 deletion mobile/library/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#include <cstdlib>
#include <cstring>

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"

#include "library/common/types/matcher_data.h"
#include "library/jni/jni_support.h"
#include "library/jni/types/env.h"
#include "library/jni/types/exception.h"

namespace Envoy {
namespace JNI {
Expand Down Expand Up @@ -527,5 +527,69 @@ void javaHeadersToCppHeaders(JniHelper& jni_helper, jobject java_headers,
}
}

bool isJavaDirectByteBuffer(JniHelper& jni_helper, jobject java_byte_buffer) {
auto java_byte_buffer_class = jni_helper.findClass("java/nio/ByteBuffer");
auto java_byte_buffer_is_direct_method_id =
jni_helper.getMethodId(java_byte_buffer_class.get(), "isDirect", "()Z");
return jni_helper.callBooleanMethod(java_byte_buffer, java_byte_buffer_is_direct_method_id);
}

Buffer::InstancePtr javaDirectByteBufferToCppBufferInstance(JniHelper& jni_helper,
jobject java_byte_buffer,
jlong length) {
void* java_byte_buffer_address = jni_helper.getDirectBufferAddress(java_byte_buffer);
RELEASE_ASSERT(java_byte_buffer != nullptr,
"The ByteBuffer argument is not a direct ByteBuffer.");
Buffer::BufferFragmentImpl* byte_buffer_fragment = new Buffer::BufferFragmentImpl(
java_byte_buffer_address, static_cast<size_t>(length),
[](const void*, size_t, const Buffer::BufferFragmentImpl* this_fragment) {
delete this_fragment;
});
Buffer::InstancePtr cpp_buffer_instance = std::make_unique<Buffer::OwnedImpl>();
cpp_buffer_instance->addBufferFragment(*byte_buffer_fragment);
return cpp_buffer_instance;
}

LocalRefUniquePtr<jobject>
cppBufferInstanceToJavaDirectByteBuffer(JniHelper& jni_helper,
const Buffer::Instance& cpp_buffer_instance) {
// The JNI implementation guarantees that there is only going to be a single slice.
Buffer::RawSlice raw_slice = cpp_buffer_instance.frontSlice();
LocalRefUniquePtr<jobject> java_byte_buffer =
jni_helper.newDirectByteBuffer(raw_slice.mem_, static_cast<jlong>(raw_slice.len_));
return java_byte_buffer;
}

Buffer::InstancePtr javaNonDirectByteBufferToCppBufferInstance(JniHelper& jni_helper,
jobject java_byte_buffer,
jlong length) {
auto java_byte_buffer_class = jni_helper.findClass("java/nio/ByteBuffer");
auto java_byte_buffer_array_method_id =
jni_helper.getMethodId(java_byte_buffer_class.get(), "array", "()[B");
auto java_byte_array =
jni_helper.callObjectMethod<jbyteArray>(java_byte_buffer, java_byte_buffer_array_method_id);
RELEASE_ASSERT(java_byte_array != nullptr,
"The ByteBuffer argument is not a non-direct ByteBuffer.");
auto java_byte_array_elements = jni_helper.getByteArrayElements(java_byte_array.get(), nullptr);
Buffer::InstancePtr cpp_buffer_instance = std::make_unique<Buffer::OwnedImpl>();
cpp_buffer_instance->add(static_cast<void*>(java_byte_array_elements.get()),
static_cast<uint64_t>(length));
return cpp_buffer_instance;
}

LocalRefUniquePtr<jobject>
cppBufferInstanceToJavaNonDirectByteBuffer(JniHelper& jni_helper,
const Buffer::Instance& cpp_buffer_instance) {
auto java_byte_buffer_class = jni_helper.findClass("java/nio/ByteBuffer");
auto java_byte_buffer_wrap_method_id = jni_helper.getStaticMethodId(
java_byte_buffer_class.get(), "wrap", "([B)Ljava/nio/ByteBuffer;");
auto java_byte_array = jni_helper.newByteArray(static_cast<jsize>(cpp_buffer_instance.length()));
auto java_byte_array_elements = jni_helper.getByteArrayElements(java_byte_array.get(), nullptr);
cpp_buffer_instance.copyOut(0, cpp_buffer_instance.length(),
static_cast<void*>(java_byte_array_elements.get()));
return jni_helper.callStaticObjectMethod(java_byte_buffer_class.get(),
java_byte_buffer_wrap_method_id, java_byte_array.get());
}

} // namespace JNI
} // namespace Envoy
Loading

0 comments on commit d584516

Please sign in to comment.