Skip to content

Commit

Permalink
IPayloadPool refactor (#4892)
Browse files Browse the repository at this point in the history
* Refs #21121: Move paylowad_owner_ to SerializedPayload_t

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Make IPayloadPool manage SerializedPayload_t

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Update Tests with new API

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Clean headers

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Doxygen

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Remove unnecessary parameter

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Move payload destructor

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Avoid unsetting payload_owner

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Fix LoanManager

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Revision - remove getters/setter

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Revision - avoid tmp change

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Revision - remove PayloadInfo

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Revision - uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Revision - minor changes

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Avoid cyclic release

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Fix Loan ser_payload destruction

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Avoid SerializedPayload_t copies

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Make get_payload's first arg const

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Create SerializedPayload.cpp

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Revision - minor changes 2

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Fix old payload tests

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21121: Update versions.md & CMakeLists

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz authored Jun 12, 2024
1 parent ae42cf7 commit 3780507
Show file tree
Hide file tree
Showing 66 changed files with 671 additions and 712 deletions.
53 changes: 26 additions & 27 deletions examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <string.h>

#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/SerializedPayload.h>

class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool
{
Expand All @@ -34,64 +34,63 @@ class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool

bool get_payload(
unsigned int size,
eprosima::fastrtps::rtps::CacheChange_t& cache_change)
eprosima::fastrtps::rtps::SerializedPayload_t& payload)
{
// Reserve new memory for the payload buffer
unsigned char* payload = new unsigned char[size];
unsigned char* payload_buff = new unsigned char[size];

// Assign the payload buffer to the CacheChange and update sizes
cache_change.serializedPayload.data = payload;
cache_change.serializedPayload.length = size;
cache_change.serializedPayload.max_size = size;
// Assign the payload buffer to the SerializedPayload and update sizes
payload.data = payload_buff;
payload.length = size;
payload.max_size = size;

// Tell the CacheChange who needs to release its payload
cache_change.payload_owner(this);
// Tell the SerializedPayload who needs to release its payload
payload.payload_owner = this;

return true;
}

bool get_payload(
eprosima::fastrtps::rtps::SerializedPayload_t& data,
eprosima::fastrtps::rtps::IPayloadPool*& /*data_owner*/,
eprosima::fastrtps::rtps::CacheChange_t& cache_change)
const eprosima::fastrtps::rtps::SerializedPayload_t& data,
eprosima::fastrtps::rtps::SerializedPayload_t& payload)
{
// Reserve new memory for the payload buffer
unsigned char* payload = new unsigned char[data.length];
unsigned char* payload_buff = new unsigned char[data.length];

// Copy the data
memcpy(payload, data.data, data.length);
memcpy(payload_buff, data.data, data.length);

// Tell the CacheChange who needs to release its payload
cache_change.payload_owner(this);
// Tell the SerializedPayload who needs to release its payload
payload.payload_owner = this;

// Assign the payload buffer to the CacheChange and update sizes
cache_change.serializedPayload.data = payload;
cache_change.serializedPayload.length = data.length;
cache_change.serializedPayload.max_size = data.length;
// Assign the payload buffer to the SerializedPayload and update sizes
payload.data = payload_buff;
payload.length = data.length;
payload.max_size = data.length;

return true;
}

bool release_payload(
eprosima::fastrtps::rtps::CacheChange_t& cache_change)
eprosima::fastrtps::rtps::SerializedPayload_t& payload)
{
// Ensure precondition
if (this != cache_change.payload_owner())
if (this != payload.payload_owner)
{
std::cerr << "Trying to release a payload buffer allocated by a different PayloadPool." << std::endl;
return false;
}

// Dealloc the buffer of the payload
delete[] cache_change.serializedPayload.data;
delete[] payload.data;

// Reset sizes and pointers
cache_change.serializedPayload.data = nullptr;
cache_change.serializedPayload.length = 0;
cache_change.serializedPayload.max_size = 0;
payload.data = nullptr;
payload.length = 0;
payload.max_size = 0;

// Reset the owner of the payload
cache_change.payload_owner(nullptr);
payload.payload_owner = nullptr;

return true;
}
Expand Down
30 changes: 3 additions & 27 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

struct CacheChange_t;

/*!
* Specific information for a writer.
*/
Expand Down Expand Up @@ -179,14 +181,7 @@ struct FASTDDS_EXPORTED_API CacheChange_t
setFragmentSize(ch_ptr->fragment_size_, false);
}

virtual ~CacheChange_t()
{
if (payload_owner_ != nullptr)
{
payload_owner_->release_payload(*this);
}
assert(payload_owner_ == nullptr);
}
virtual ~CacheChange_t() = default;

/*!
* Get the number of fragments this change is split into.
Expand Down Expand Up @@ -326,22 +321,6 @@ struct FASTDDS_EXPORTED_API CacheChange_t
return is_fully_assembled();
}

IPayloadPool const* payload_owner() const
{
return payload_owner_;
}

IPayloadPool* payload_owner()
{
return payload_owner_;
}

void payload_owner(
IPayloadPool* owner)
{
payload_owner_ = owner;
}

private:

// Fragment size
Expand All @@ -353,9 +332,6 @@ struct FASTDDS_EXPORTED_API CacheChange_t
// First fragment in missing list
uint32_t first_missing_fragment_ = 0;

// Pool that created the payload of this cache change
IPayloadPool* payload_owner_ = nullptr;

uint32_t get_next_missing_fragment(
uint32_t fragment_index)
{
Expand Down
114 changes: 35 additions & 79 deletions include/fastdds/rtps/common/SerializedPayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
#include <cstring>
#include <new>
#include <stdexcept>
#include <cassert>
#include <stdint.h>
#include <stdlib.h>

#include <fastdds/fastdds_dll.hpp>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/IPayloadPool.h>

/*!
* @brief Maximum payload is maximum of UDP packet size minus 536bytes (RTPSMESSAGE_COMMON_RTPS_PAYLOAD_SIZE)
Expand Down Expand Up @@ -68,6 +70,8 @@ struct FASTDDS_EXPORTED_API SerializedPayload_t
uint32_t max_size;
//!Position when reading
uint32_t pos;
//!Pool that created the payload
IPayloadPool* payload_owner = nullptr;

//!Default constructor
SerializedPayload_t()
Expand All @@ -89,107 +93,59 @@ struct FASTDDS_EXPORTED_API SerializedPayload_t
this->reserve(len);
}

~SerializedPayload_t()
//!Copy constructor
SerializedPayload_t(
const SerializedPayload_t& other) = delete;
//!Copy operator
SerializedPayload_t& operator = (
const SerializedPayload_t& other) = delete;

//!Move constructor
SerializedPayload_t(
SerializedPayload_t&& other) noexcept
{
this->empty();
*this = std::move(other);
}

//!Move operator
SerializedPayload_t& operator = (
SerializedPayload_t&& other) noexcept;

/*!
* Destructor
* It is expected to release the payload if the payload owner is not nullptr before destruction
*/
~SerializedPayload_t();

bool operator == (
const SerializedPayload_t& other) const
{
return ((encapsulation == other.encapsulation) &&
(length == other.length) &&
(0 == memcmp(data, other.data, length)));
}
const SerializedPayload_t& other) const;

/*!
* Copy another structure (including allocating new space for the data.)
* Copy another structure (including allocating new space for the data).
* @param[in] serData Pointer to the structure to copy
* @param with_limit if true, the function will fail when providing a payload too big
* @return True if correct
*/
bool copy(
const SerializedPayload_t* serData,
bool with_limit = true)
{
length = serData->length;

if (serData->length > max_size)
{
if (with_limit)
{
return false;
}
else
{
this->reserve(serData->length);
}
}
encapsulation = serData->encapsulation;
if (length == 0)
{
return true;
}
memcpy(data, serData->data, length);
return true;
}
bool with_limit = true);

/*!
* Allocate new space for fragmented data
* @param[in] serData Pointer to the structure to copy
* @return True if correct
*/
bool reserve_fragmented(
SerializedPayload_t* serData)
{
length = serData->length;
max_size = serData->length;
encapsulation = serData->encapsulation;
data = (octet*)calloc(length, sizeof(octet));
return true;
}
SerializedPayload_t* serData);

//! Empty the payload
void empty()
{
length = 0;
encapsulation = CDR_BE;
max_size = 0;
if (data != nullptr)
{
free(data);
}
data = nullptr;
}
/*!
* Empty the payload
* @pre payload_owner must be nullptr
*/
void empty();

void reserve(
uint32_t new_size)
{
if (new_size <= this->max_size)
{
return;
}
if (data == nullptr)
{
data = (octet*)calloc(new_size, sizeof(octet));
if (!data)
{
throw std::bad_alloc();
}
}
else
{
void* old_data = data;
data = (octet*)realloc(data, new_size);
if (!data)
{
free(old_data);
throw std::bad_alloc();
}
memset(data + max_size, 0, (new_size - max_size) * sizeof(octet));
}
max_size = new_size;
}
uint32_t new_size);

};

Expand Down
Loading

0 comments on commit 3780507

Please sign in to comment.