Skip to content

Commit

Permalink
Gather-send implementation (#4537)
Browse files Browse the repository at this point in the history
* Refs #20337: Buffer list in UDP sending function

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20337: Buffer list in SHM sending function & Copy to shared buffer function

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20337: Buffer list in TCP sending function

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Add new Buffer structure

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Add new lambda to send buffers into SenderResources

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on UDP transport

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on UDP test_transport

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on TCP Transport

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on TCPChannelResource

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on TCP tests

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on SHM transport & enable copying multiple buffers

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Refactor on SHM tests

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20342: Fix mock tests after rebase

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on addSubmessageData/DataFrag

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Add constructor overloads to NetworkBuffers

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Add new attributes and methods to RTPSMessageGroup.h

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on add_data()

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on add_data_frag()

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on insert_submessage()

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on send()

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Minor changes in RTPSMessageGroup

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on RTPSMessageSenderInterface

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on ChaningTransport and ABI compatible send_lambda_

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Refactor on Statistics module

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Add security support

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Minor fixes

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Doxygen

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Delete Sender's Resource deprecated API

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Fix Windows build

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Account for change of namespaces

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Add NetworkBuffer.cpp

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Revision minor changes

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20352: Revision minor changes 2

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Avoid stats_msg dynamic malloc

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Use vector instead of list

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Refactor RTPSMessageGroup to avoid Mallocs

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Use limited vector to avoid repeated mallocs

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Fix rebase

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Improve doxygen

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Revision

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Add ResourceLV config into QoS

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: XML - New QoS added

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Get Payload in RTPSMessageGroup

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Revision - Use RLContainerConfig and minor changes

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Revision - Get payload after correct RTPSMsg creation

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Revision - Default values

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Update versions.md & CMakeLists

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Revision - Headers & versions.md

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Adjust payload_pool test

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20291: Test comment

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz authored and JesusPoderoso committed Jun 13, 2024
1 parent ea82731 commit 67a62f8
Show file tree
Hide file tree
Showing 89 changed files with 1,199 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ struct SendBuffersAllocationAttributes
const SendBuffersAllocationAttributes& b) const
{
return (this->preallocated_number == b.preallocated_number) &&
(this->dynamic == b.dynamic);
(this->dynamic == b.dynamic) &&
(this->network_buffers_config == b.network_buffers_config);
}

/** Initial number of send buffers to allocate.
Expand All @@ -86,6 +87,15 @@ struct SendBuffersAllocationAttributes
* buffer to be returned. This is a trade-off between latency and dynamic allocations.
*/
bool dynamic = false;

/** Configuration for the network buffers.
*
* This attribute controls the allocation behavior of the network buffers used by each
* send buffer. The default value will use a value of 16 network buffers for both
* the preallocated buffers and the dynamic increment allocation, with no maximum limit.
*/
ResourceLimitedContainerConfig network_buffers_config = ResourceLimitedContainerConfig(16u,
std::numeric_limits<size_t>::max dummy_avoid_winmax (), 16u);
};

/**
Expand Down
7 changes: 5 additions & 2 deletions include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -75,11 +76,13 @@ class RTPSMessageSenderInterface
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const = 0;

/*!
Expand Down
10 changes: 5 additions & 5 deletions include/fastdds/rtps/transport/ChainingTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ class ChainingTransport : public TransportInterface
* At the end the function must call to the low-level transport's `send()` function.
* @code{.cpp}
// Example of calling the low-level transport `send()` function.
return low_sender_resource->send(send_buffer, send_buffer_size, destination_locators_begin,
return low_sender_resource->send(buffers, total_bytes, destination_locators_begin,
destination_locators_end, timeout);
@endcode
* @param low_sender_resource SenderResource generated by the lower transport.
* @param send_buffer Slice into the raw data to send.
* @param send_buffer_size Size of the raw data. It will be used as a bounds check for the previous argument.
* @param buffers Vector of buffers to send.
* @param total_bytes Length of all buffers to be sent. Will be used as a boundary for the previous parameter.
* It must not exceed the \c sendBufferSize fed to this class during construction.
* @param destination_locators_begin First iterator of the list of Locators describing the remote destinations
* we're sending to.
Expand All @@ -328,8 +328,8 @@ class ChainingTransport : public TransportInterface
*/
FASTDDS_EXPORTED_API virtual bool send(
fastrtps::rtps::SenderResource* low_sender_resource,
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const std::vector<NetworkBuffer>& buffers,
uint32_t total_bytes,
fastrtps::rtps::LocatorsIterator* destination_locators_begin,
fastrtps::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& timeout) = 0;
Expand Down
62 changes: 62 additions & 0 deletions include/fastdds/rtps/transport/NetworkBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file NetworkBuffer.hpp
*/

#ifndef _FASTDDS_RTPS_NETWORK_NETWORKBUFFER_HPP
#define _FASTDDS_RTPS_NETWORK_NETWORKBUFFER_HPP

#include <cstdint>

namespace asio {
// Forward declaration of asio::const_buffer
class const_buffer;
} // namespace asio

namespace eprosima {
namespace fastdds {
namespace rtps {

/**
* A slice of data to be sent to one or more transports.
* An RTPS datagram is made up of headers and one or more NetworkBuffer instances.
*/
struct NetworkBuffer final
{
//! Pointer to the buffer where the data is stored.
const void* buffer = nullptr;
//! Number of bytes to use starting at @c buffer.
uint32_t size = 0;

NetworkBuffer() = default;

NetworkBuffer(
const void* ptr,
uint32_t s)
: buffer(ptr)
, size(s)
{
}

//! Conversion operator to asio::const_buffer.
operator asio::const_buffer() const;
};

} // namespace rtps
} // namespace fastdds
} // namespace eprosima

#endif // _FASTDDS_RTPS_NETWORK_NETWORKBUFFER_HPP
30 changes: 14 additions & 16 deletions include/fastdds/rtps/transport/SenderResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

#include <functional>
#include <vector>
#include <list>
#include <chrono>

#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/LocatorsIterator.hpp>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>

namespace eprosima {
namespace fastrtps {
Expand All @@ -42,32 +44,27 @@ class SenderResource
{
public:

using NetworkBuffer = eprosima::fastdds::rtps::NetworkBuffer;

/**
* Sends to a destination locator, through the channel managed by this resource.
* @param data Raw data slice to be sent.
* @param dataLength Length of the data to be sent. Will be used as a boundary for
* @param buffers Vector of buffers to send.
* @param total_bytes Length of all buffers to be sent. Will be used as a boundary for
* the previous parameter.
* @param destination_locators_begin destination endpoint Locators iterator begin.
* @param destination_locators_end destination endpoint Locators iterator end.
* @param max_blocking_time_point If transport supports it then it will use it as maximum blocking time.
* @return Success of the send operation.
*/
bool send(
const octet* data,
uint32_t dataLength,
const std::vector<NetworkBuffer>& buffers,
const uint32_t& total_bytes,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
bool returned_value = false;

if (send_lambda_)
{
returned_value = send_lambda_(data, dataLength, destination_locators_begin, destination_locators_end,
max_blocking_time_point);
}

return returned_value;
return send_buffers_lambda_(buffers, total_bytes, destination_locators_begin, destination_locators_end,
max_blocking_time_point);
}

/**
Expand All @@ -78,7 +75,7 @@ class SenderResource
SenderResource&& rValueResource)
{
clean_up.swap(rValueResource.clean_up);
send_lambda_.swap(rValueResource.send_lambda_);
send_buffers_lambda_.swap(rValueResource.send_buffers_lambda_);
}

virtual ~SenderResource() = default;
Expand Down Expand Up @@ -110,12 +107,13 @@ class SenderResource
int32_t transport_kind_;

std::function<void()> clean_up;

std::function<bool(
const octet*,
const std::vector<NetworkBuffer>&,
uint32_t,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&)> send_lambda_;
const std::chrono::steady_clock::time_point&)> send_buffers_lambda_;

private:

Expand Down
8 changes: 6 additions & 2 deletions include/fastdds/rtps/writer/LocatorSelectorSender.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef _FASTDDS_RTPS_WRITER_LOCATORSELECTORSENDER_HPP_
#define _FASTDDS_RTPS_WRITER_LOCATORSELECTORSENDER_HPP_

#include <vector>

#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
Expand Down Expand Up @@ -69,11 +71,13 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
/*!
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
bool send(
CDRMessage_t* message,
const std::vector<fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

/*!
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,15 @@ class RTPSWriter
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to
* be a member of this RTPSWriter object.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send_nts(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const;

Expand Down
1 change: 1 addition & 0 deletions resources/xsd/fastdds_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@
<xs:all>
<xs:element name="preallocated_number" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="dynamic" type="boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="network_buffers_config" type="allocationConfigType" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>
</xs:element>
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ set(${PROJECT_NAME}_source_files
rtps/messages/submessages/DataMsg.hpp
rtps/messages/submessages/GapMsg.hpp
rtps/messages/submessages/HeartbeatMsg.hpp
rtps/network/NetworkBuffer.cpp
rtps/network/NetworkFactory.cpp
rtps/network/ReceiverResource.cpp
rtps/network/utils/external_locators.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,16 @@ const std::vector<GUID_t>& DirectMessageSender::remote_guids() const
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
bool DirectMessageSender::send(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const
{
return participant_->sendSync(message, participant_->getGuid(),
return participant_->sendSync(buffers, total_bytes, participant_->getGuid(),
Locators(locators_->begin()), Locators(locators_->end()), max_blocking_time_point);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <vector>

#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>

Expand Down Expand Up @@ -78,11 +80,13 @@ class DirectMessageSender : public RTPSMessageSenderInterface
/**
* Send a message through this interface.
*
* @param message Pointer to the buffer with the message already serialized.
* @param buffers Vector of NetworkBuffers to send with data already serialized.
* @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send(
CDRMessage_t* message,
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

/*
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void direct_send(
RTPSParticipantImpl* participant,
LocatorList& locators,
std::vector<GUID_t>& remote_readers,
const CacheChange_t& change,
CacheChange_t& change,
fastrtps::rtps::Endpoint& sender_endpt)
{
DirectMessageSender sender(participant, &remote_readers, &locators);
Expand All @@ -78,7 +78,7 @@ static void direct_send(
static void direct_send(
RTPSParticipantImpl* participant,
LocatorList& locators,
const CacheChange_t& change)
CacheChange_t& change)
{
FakeWriter writer(participant, c_EntityId_SPDPWriter);
std::vector<GUID_t> remote_readers;
Expand Down
14 changes: 11 additions & 3 deletions src/cpp/rtps/messages/RTPSMessageCreator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/VendorId_t.hpp>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>

#include <rtps/messages/CDRMessage.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {

using NetworkBuffer = eprosima::fastdds::rtps::NetworkBuffer;

//!An interface to add inline qos parameters to a CDRMessage
class InlineQosWriter
{
Expand Down Expand Up @@ -127,15 +130,17 @@ class RTPSMessageCreator
const EntityId_t& readerId,
bool expectsInlineQos,
InlineQosWriter* inlineQos);

static bool addSubmessageData(
CDRMessage_t* msg,
const CacheChange_t* change,
TopicKind_t topicKind,
const EntityId_t& readerId,
bool expectsInlineQos,
InlineQosWriter* inlineQos,
bool* is_big_submessage);
bool& is_big_submessage,
bool copy_data,
NetworkBuffer& pending_buffer,
uint8_t& pending_padding);

static bool addMessageDataFrag(
CDRMessage_t* msg,
Expand All @@ -154,7 +159,10 @@ class RTPSMessageCreator
TopicKind_t topicKind,
const EntityId_t& readerId,
bool expectsInlineQos,
InlineQosWriter* inlineQos);
InlineQosWriter* inlineQos,
bool copy_data,
NetworkBuffer& pending_buffer,
uint8_t& pending_padding);

static bool addMessageGap(
CDRMessage_t* msg,
Expand Down
Loading

0 comments on commit 67a62f8

Please sign in to comment.