Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20291] Gather-send implementation #4537

Merged
merged 52 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
2db99a4
Refs #20337: Buffer list in UDP sending function
cferreiragonz Jan 30, 2024
a277e51
Refs #20337: Buffer list in SHM sending function & Copy to shared buf…
cferreiragonz Jan 31, 2024
775b4e4
Refs #20337: Buffer list in TCP sending function
cferreiragonz Jan 31, 2024
5c4de19
Refs #20342: Add new Buffer structure
cferreiragonz Jan 31, 2024
a29d3fa
Refs #20342: Add new lambda to send buffers into SenderResources
cferreiragonz Jan 31, 2024
28b4d38
Refs #20342: Refactor on UDP transport
cferreiragonz Jan 31, 2024
b98b227
Refs #20342: Refactor on UDP test_transport
cferreiragonz Jan 31, 2024
43eb945
Refs #20342: Refactor on TCP Transport
cferreiragonz Feb 1, 2024
bd6f690
Refs #20342: Refactor on TCPChannelResource
cferreiragonz Feb 1, 2024
66e8c44
Refs #20342: Refactor on TCP tests
cferreiragonz Feb 1, 2024
dc475ff
Refs #20342: Refactor on SHM transport & enable copying multiple buffers
cferreiragonz Feb 1, 2024
3d45ab2
Refs #20342: Refactor on SHM tests
cferreiragonz Feb 1, 2024
55e91e1
Refs #20342: Fix mock tests after rebase
cferreiragonz Feb 29, 2024
c63a3dc
Refs #20352: Refactor on addSubmessageData/DataFrag
cferreiragonz Mar 5, 2024
30bcba8
Refs #20352: Add constructor overloads to NetworkBuffers
cferreiragonz Mar 5, 2024
be2fc05
Refs #20352: Add new attributes and methods to RTPSMessageGroup.h
cferreiragonz Mar 5, 2024
c967955
Refs #20352: Refactor on add_data()
cferreiragonz Mar 5, 2024
49f46c8
Refs #20352: Refactor on add_data_frag()
cferreiragonz Mar 5, 2024
0dd12d9
Refs #20352: Refactor on insert_submessage()
cferreiragonz Mar 5, 2024
af7832c
Refs #20352: Refactor on send()
cferreiragonz Mar 5, 2024
3e66776
Refs #20352: Minor changes in RTPSMessageGroup
cferreiragonz Mar 5, 2024
801d861
Refs #20352: Refactor on RTPSMessageSenderInterface
cferreiragonz Mar 5, 2024
5f55553
Refs #20352: Refactor on ChaningTransport and ABI compatible send_lam…
cferreiragonz Mar 6, 2024
336e06b
Refs #20352: Refactor on Statistics module
cferreiragonz Mar 7, 2024
97e62c3
Refs #20352: Add security support
cferreiragonz Mar 7, 2024
7be3243
Refs #20352: Minor fixes
cferreiragonz Mar 7, 2024
2f76bd9
Refs #20352: Doxygen
cferreiragonz Mar 11, 2024
097545c
Refs #20352: Delete Sender's Resource deprecated API
cferreiragonz Mar 11, 2024
ab9f905
Refs #20352: Uncrustify
cferreiragonz Mar 11, 2024
c01b7fb
Refs #20352: Fix Windows build
cferreiragonz Mar 12, 2024
63772f0
Refs #20352: Account for change of namespaces
cferreiragonz May 17, 2024
d6a4146
Refs #20352: Add NetworkBuffer.cpp
cferreiragonz May 17, 2024
6e83cc3
Refs #20352: Revision minor changes
cferreiragonz May 17, 2024
17cc6ec
Refs #20352: Revision minor changes 2
cferreiragonz May 20, 2024
a748ae4
Refs #20291: Avoid stats_msg dynamic malloc
cferreiragonz May 24, 2024
809ae5d
Refs #20291: Use vector instead of list
cferreiragonz May 24, 2024
557c2ca
Refs #20291: Refactor RTPSMessageGroup to avoid Mallocs
cferreiragonz May 27, 2024
153156b
Refs #20291: Use limited vector to avoid repeated mallocs
cferreiragonz May 28, 2024
ffaced5
Refs #20291: Fix rebase
cferreiragonz May 28, 2024
42a51dd
Refs #20291: Improve doxygen
cferreiragonz May 28, 2024
4d9addc
Refs #20291: Revision
cferreiragonz May 31, 2024
4134e23
Refs #20291: Add ResourceLV config into QoS
cferreiragonz Jun 3, 2024
020b04d
Refs #20291: XML - New QoS added
cferreiragonz Jun 3, 2024
c7ce07d
Refs #20291: Get Payload in RTPSMessageGroup
cferreiragonz Jun 5, 2024
f84a62f
Refs #20291: Uncrustify
cferreiragonz Jun 5, 2024
32158d3
Refs #20291: Revision - Use RLContainerConfig and minor changes
cferreiragonz Jun 7, 2024
a5d7bee
Refs #20291: Revision - Get payload after correct RTPSMsg creation
cferreiragonz Jun 7, 2024
fbcc6a7
Refs #20291: Revision - Default values
cferreiragonz Jun 11, 2024
efe7deb
Refs #20291: Update versions.md & CMakeLists
cferreiragonz Jun 12, 2024
ac0f539
Refs #20291: Revision - Headers & versions.md
cferreiragonz Jun 12, 2024
8a9eede
Refs #20291: Adjust payload_pool test
cferreiragonz Jun 13, 2024
4cf3f20
Refs #20291: Test comment
cferreiragonz Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ struct SendBuffersAllocationAttributes
const SendBuffersAllocationAttributes& b) const
{
return (this->preallocated_number == b.preallocated_number) &&
(this->dynamic == b.dynamic);
(this->dynamic == b.dynamic) &&
(this->network_buffers_config == b.network_buffers_config);
}

/** Initial number of send buffers to allocate.
Expand All @@ -86,6 +87,15 @@ struct SendBuffersAllocationAttributes
* buffer to be returned. This is a trade-off between latency and dynamic allocations.
*/
bool dynamic = false;

/** Configuration for the network buffers.
*
* This attribute controls the allocation behavior of the network buffers used by each
* send buffer. The default value will use a value of 16 network buffers for both
* the preallocated buffers and the dynamic increment allocation, with no maximum limit.
*/
ResourceLimitedContainerConfig network_buffers_config = ResourceLimitedContainerConfig(16u,
std::numeric_limits<size_t>::max dummy_avoid_winmax (), 16u);
};

/**
Expand Down
7 changes: 5 additions & 2 deletions include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -75,11 +76,13 @@ class RTPSMessageSenderInterface
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const = 0;

/*!
Expand Down
10 changes: 5 additions & 5 deletions include/fastdds/rtps/transport/ChainingTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ class ChainingTransport : public TransportInterface
* At the end the function must call to the low-level transport's `send()` function.
* @code{.cpp}
// Example of calling the low-level transport `send()` function.
return low_sender_resource->send(send_buffer, send_buffer_size, destination_locators_begin,
return low_sender_resource->send(buffers, total_bytes, destination_locators_begin,
destination_locators_end, timeout);
@endcode
* @param low_sender_resource SenderResource generated by the lower transport.
* @param send_buffer Slice into the raw data to send.
* @param send_buffer_size Size of the raw data. It will be used as a bounds check for the previous argument.
* @param buffers Vector of buffers to send.
* @param total_bytes Length of all buffers to be sent. Will be used as a boundary for the previous parameter.
* It must not exceed the \c sendBufferSize fed to this class during construction.
* @param destination_locators_begin First iterator of the list of Locators describing the remote destinations
* we're sending to.
Expand All @@ -328,8 +328,8 @@ class ChainingTransport : public TransportInterface
*/
FASTDDS_EXPORTED_API virtual bool send(
fastrtps::rtps::SenderResource* low_sender_resource,
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const std::vector<NetworkBuffer>& buffers,
uint32_t total_bytes,
fastrtps::rtps::LocatorsIterator* destination_locators_begin,
fastrtps::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& timeout) = 0;
Expand Down
62 changes: 62 additions & 0 deletions include/fastdds/rtps/transport/NetworkBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file NetworkBuffer.hpp
*/

#ifndef _FASTDDS_RTPS_NETWORK_NETWORKBUFFER_HPP
#define _FASTDDS_RTPS_NETWORK_NETWORKBUFFER_HPP

#include <cstdint>

namespace asio {
// Forward declaration of asio::const_buffer
class const_buffer;
} // namespace asio

namespace eprosima {
namespace fastdds {
namespace rtps {

/**
* A slice of data to be sent to one or more transports.
* An RTPS datagram is made up of headers and one or more NetworkBuffer instances.
*/
struct NetworkBuffer final
{
//! Pointer to the buffer where the data is stored.
const void* buffer = nullptr;
//! Number of bytes to use starting at @c buffer.
uint32_t size = 0;

NetworkBuffer() = default;

NetworkBuffer(
const void* ptr,
uint32_t s)
: buffer(ptr)
, size(s)
{
}

//! Conversion operator to asio::const_buffer.
operator asio::const_buffer() const;
};

} // namespace rtps
} // namespace fastdds
} // namespace eprosima

#endif // _FASTDDS_RTPS_NETWORK_NETWORKBUFFER_HPP
30 changes: 14 additions & 16 deletions include/fastdds/rtps/transport/SenderResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

#include <functional>
#include <vector>
#include <list>
#include <chrono>

#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/LocatorsIterator.hpp>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>

namespace eprosima {
namespace fastrtps {
Expand All @@ -42,32 +44,27 @@ class SenderResource
{
public:

using NetworkBuffer = eprosima::fastdds::rtps::NetworkBuffer;

/**
* Sends to a destination locator, through the channel managed by this resource.
* @param data Raw data slice to be sent.
* @param dataLength Length of the data to be sent. Will be used as a boundary for
* @param buffers Vector of buffers to send.
* @param total_bytes Length of all buffers to be sent. Will be used as a boundary for
* the previous parameter.
* @param destination_locators_begin destination endpoint Locators iterator begin.
* @param destination_locators_end destination endpoint Locators iterator end.
* @param max_blocking_time_point If transport supports it then it will use it as maximum blocking time.
* @return Success of the send operation.
*/
bool send(
const octet* data,
uint32_t dataLength,
const std::vector<NetworkBuffer>& buffers,
const uint32_t& total_bytes,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
bool returned_value = false;

if (send_lambda_)
{
returned_value = send_lambda_(data, dataLength, destination_locators_begin, destination_locators_end,
max_blocking_time_point);
}

return returned_value;
return send_buffers_lambda_(buffers, total_bytes, destination_locators_begin, destination_locators_end,
max_blocking_time_point);
}

/**
Expand All @@ -78,7 +75,7 @@ class SenderResource
SenderResource&& rValueResource)
{
clean_up.swap(rValueResource.clean_up);
send_lambda_.swap(rValueResource.send_lambda_);
send_buffers_lambda_.swap(rValueResource.send_buffers_lambda_);
}

virtual ~SenderResource() = default;
Expand Down Expand Up @@ -110,12 +107,13 @@ class SenderResource
int32_t transport_kind_;

std::function<void()> clean_up;

std::function<bool(
const octet*,
const std::vector<NetworkBuffer>&,
uint32_t,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&)> send_lambda_;
const std::chrono::steady_clock::time_point&)> send_buffers_lambda_;

private:

Expand Down
8 changes: 6 additions & 2 deletions include/fastdds/rtps/writer/LocatorSelectorSender.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef _FASTDDS_RTPS_WRITER_LOCATORSELECTORSENDER_HPP_
#define _FASTDDS_RTPS_WRITER_LOCATORSELECTORSENDER_HPP_

#include <vector>

#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
Expand Down Expand Up @@ -69,11 +71,13 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
/*!
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
bool send(
CDRMessage_t* message,
const std::vector<fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

/*!
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,15 @@ class RTPSWriter
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to
* be a member of this RTPSWriter object.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send_nts(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const;

Expand Down
1 change: 1 addition & 0 deletions resources/xsd/fastdds_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@
<xs:all>
<xs:element name="preallocated_number" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="dynamic" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="network_buffers_config" type="allocationConfigType" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>
</xs:element>
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ set(${PROJECT_NAME}_source_files
rtps/messages/submessages/DataMsg.hpp
rtps/messages/submessages/GapMsg.hpp
rtps/messages/submessages/HeartbeatMsg.hpp
rtps/network/NetworkBuffer.cpp
rtps/network/NetworkFactory.cpp
rtps/network/ReceiverResource.cpp
rtps/network/utils/external_locators.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,16 @@ const std::vector<GUID_t>& DirectMessageSender::remote_guids() const
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
bool DirectMessageSender::send(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const
{
return participant_->sendSync(message, participant_->getGuid(),
return participant_->sendSync(buffers, total_bytes, participant_->getGuid(),
Locators(locators_->begin()), Locators(locators_->end()), max_blocking_time_point);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <vector>

#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>

Expand Down Expand Up @@ -78,11 +80,13 @@ class DirectMessageSender : public RTPSMessageSenderInterface
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

/*
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void direct_send(
RTPSParticipantImpl* participant,
LocatorList& locators,
std::vector<GUID_t>& remote_readers,
const CacheChange_t& change,
CacheChange_t& change,
fastrtps::rtps::Endpoint& sender_endpt)
{
DirectMessageSender sender(participant, &remote_readers, &locators);
Expand All @@ -78,7 +78,7 @@ static void direct_send(
static void direct_send(
RTPSParticipantImpl* participant,
LocatorList& locators,
const CacheChange_t& change)
CacheChange_t& change)
{
FakeWriter writer(participant, c_EntityId_SPDPWriter);
std::vector<GUID_t> remote_readers;
Expand Down
14 changes: 11 additions & 3 deletions src/cpp/rtps/messages/RTPSMessageCreator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/VendorId_t.hpp>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>

#include <rtps/messages/CDRMessage.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {

using NetworkBuffer = eprosima::fastdds::rtps::NetworkBuffer;

//!An interface to add inline qos parameters to a CDRMessage
class InlineQosWriter
{
Expand Down Expand Up @@ -127,15 +130,17 @@ class RTPSMessageCreator
const EntityId_t& readerId,
bool expectsInlineQos,
InlineQosWriter* inlineQos);

static bool addSubmessageData(
CDRMessage_t* msg,
const CacheChange_t* change,
TopicKind_t topicKind,
const EntityId_t& readerId,
bool expectsInlineQos,
InlineQosWriter* inlineQos,
bool* is_big_submessage);
bool& is_big_submessage,
bool copy_data,
NetworkBuffer& pending_buffer,
uint8_t& pending_padding);

static bool addMessageDataFrag(
CDRMessage_t* msg,
Expand All @@ -154,7 +159,10 @@ class RTPSMessageCreator
TopicKind_t topicKind,
const EntityId_t& readerId,
bool expectsInlineQos,
InlineQosWriter* inlineQos);
InlineQosWriter* inlineQos,
bool copy_data,
NetworkBuffer& pending_buffer,
uint8_t& pending_padding);

static bool addMessageGap(
CDRMessage_t* msg,
Expand Down
Loading