From ab56fa8576124a0d308e24e859929ff6ed651ead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Poderoso?= <120394830+JesusPoderoso@users.noreply.github.com> Date: Wed, 27 Sep 2023 15:38:40 +0200 Subject: [PATCH] Expose payload pool in DDS layer example (#3598) * Refs #18913: Custom Payload pool PoC Signed-off-by: JesusPoderoso * Refs #19491: Apply rev suggestions Signed-off-by: JesusPoderoso * Refs #19491: Apply latest rev suggestions Signed-off-by: JesusPoderoso * Refs #19491: Please linters Signed-off-by: JesusPoderoso * Refs #19491: Fix public to private heritance * Set payload owner to be allocating PayloadPool Signed-off-by: Juan Lopez Fernandez * Refs #19491: Revert partial change introduced in previous commit b59b76d10 to force payload owner to 'this' always Signed-off-by: JesusPoderoso * Refs #19491: Fix Windows installer build Signed-off-by: JesusPoderoso --------- Signed-off-by: JesusPoderoso Signed-off-by: Juan Lopez Fernandez Co-authored-by: Juan Lopez Fernandez --- examples/cpp/dds/CMakeLists.txt | 1 + .../CustomPayloadPoolExample/CMakeLists.txt | 48 +++ .../CustomPayloadPool.hpp | 101 ++++++ .../CustomPayloadPoolData.cxx | 164 +++++++++ .../CustomPayloadPoolData.h | 184 ++++++++++ .../CustomPayloadPoolData.idl | 5 + .../CustomPayloadPoolDataCdrAux.hpp | 45 +++ .../CustomPayloadPoolDataCdrAux.ipp | 126 +++++++ .../CustomPayloadPoolDataPubSubTypes.cxx | 185 ++++++++++ .../CustomPayloadPoolDataPubSubTypes.h | 130 +++++++ .../CustomPayloadPoolDataPublisher.cpp | 227 +++++++++++++ .../CustomPayloadPoolDataPublisher.h | 115 +++++++ .../CustomPayloadPoolDataSubscriber.cpp | 191 +++++++++++ .../CustomPayloadPoolDataSubscriber.h | 95 ++++++ .../CustomPayloadPool_main.cpp | 318 ++++++++++++++++++ .../dds/CustomPayloadPoolExample/README.md | 13 + 16 files changed, 1948 insertions(+) create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CMakeLists.txt create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool.hpp create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.cxx create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.h create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.idl create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.hpp create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.ipp create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.cxx create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.h create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.cpp create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.h create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.cpp create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.h create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool_main.cpp create mode 100644 examples/cpp/dds/CustomPayloadPoolExample/README.md diff --git a/examples/cpp/dds/CMakeLists.txt b/examples/cpp/dds/CMakeLists.txt index 64e94a66b32..421eb164dcb 100644 --- a/examples/cpp/dds/CMakeLists.txt +++ b/examples/cpp/dds/CMakeLists.txt @@ -17,6 +17,7 @@ add_subdirectory(BasicConfigurationExample) add_subdirectory(Configurability) add_subdirectory(ContentFilteredTopicExample) add_subdirectory(CustomListenerExample) +add_subdirectory(CustomPayloadPoolExample) add_subdirectory(DeadlineQoSExample) add_subdirectory(DisablePositiveACKs) add_subdirectory(DiscoveryServerExample) diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CMakeLists.txt b/examples/cpp/dds/CustomPayloadPoolExample/CMakeLists.txt new file mode 100644 index 00000000000..59fb071f42d --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CMakeLists.txt @@ -0,0 +1,48 @@ +# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.16.3) + +project(CustomPayloadPoolExample VERSION 1 LANGUAGES CXX) + +# Find requirements +if(NOT fastcdr_FOUND) + find_package(fastcdr REQUIRED) +endif() + +if(NOT fastrtps_FOUND) + find_package(fastrtps REQUIRED) +endif() + +#Check C++11 +include(CheckCXXCompilerFlag) +if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") + check_cxx_compiler_flag(-std=c++11 SUPPORTS_CXX11) + if(NOT SUPPORTS_CXX11) + message(FATAL_ERROR "Compiler doesn't support C++11") + endif() +endif() + +message(STATUS "Configuring custom payload pool example...") +file(GLOB CUSTOM_PAYLOAD_POOL_DATA_EXAMPLE_SOURCES_CXX "*.cxx") +file(GLOB CUSTOM_PAYLOAD_POOL_DATA_EXAMPLE_SOURCES_CPP "*.cpp") + +add_executable(CustomPayloadPoolExample ${CUSTOM_PAYLOAD_POOL_DATA_EXAMPLE_SOURCES_CXX} ${CUSTOM_PAYLOAD_POOL_DATA_EXAMPLE_SOURCES_CPP}) +target_compile_definitions(CustomPayloadPoolExample PRIVATE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. +) +target_link_libraries(CustomPayloadPoolExample fastrtps fastcdr fastdds::optionparser) +install(TARGETS CustomPayloadPoolExample + RUNTIME DESTINATION examples/cpp/dds/CustomPayloadPoolExample/${BIN_INSTALL_DIR}) diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool.hpp b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool.hpp new file mode 100644 index 00000000000..ab8fca60ad6 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool.hpp @@ -0,0 +1,101 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPool.hpp + */ + +#ifndef DDS_CUSTOM_PAYLOAD_POOL_DATA_HPP +#define DDS_CUSTOM_PAYLOAD_POOL_DATA_HPP + +#include +#include +#include + +#include +#include + +class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool +{ +public: + + ~CustomPayloadPool() = default; + + bool get_payload( + unsigned int size, + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Reserve new memory for the payload buffer + unsigned char* payload = new unsigned char[size]; + + // Assign the payload buffer to the CacheChange and update sizes + cache_change.serializedPayload.data = payload; + cache_change.serializedPayload.length = size; + cache_change.serializedPayload.max_size = size; + + // Tell the CacheChange who needs to release its payload + cache_change.payload_owner(this); + + return true; + } + + bool get_payload( + eprosima::fastrtps::rtps::SerializedPayload_t& data, + eprosima::fastrtps::rtps::IPayloadPool*& /*data_owner*/, + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Reserve new memory for the payload buffer + unsigned char* payload = new unsigned char[data.length]; + + // Copy the data + memcpy(payload, data.data, data.length); + + // Tell the CacheChange who needs to release its payload + cache_change.payload_owner(this); + + // Assign the payload buffer to the CacheChange and update sizes + cache_change.serializedPayload.data = payload; + cache_change.serializedPayload.length = data.length; + cache_change.serializedPayload.max_size = data.length; + + return true; + } + + bool release_payload( + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Ensure precondition + if (this != cache_change.payload_owner()) + { + std::cerr << "Trying to release a payload buffer allocated by a different PayloadPool." << std::endl; + return false; + } + + // Dealloc the buffer of the payload + delete[] cache_change.serializedPayload.data; + + // Reset sizes and pointers + cache_change.serializedPayload.data = nullptr; + cache_change.serializedPayload.length = 0; + cache_change.serializedPayload.max_size = 0; + + // Reset the owner of the payload + cache_change.payload_owner(nullptr); + + return true; + } + +}; + +#endif // DDS_CUSTOM_PAYLOAD_POOL_DATA_HPP diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.cxx b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.cxx new file mode 100644 index 00000000000..b6ae6afe3c4 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.cxx @@ -0,0 +1,164 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file CustomPayloadPoolData.cpp + * This source file contains the implementation of the described types in the IDL file. + * + * This file was generated by the tool fastddsgen. + */ + +#ifdef _WIN32 +// Remove linker warning LNK4221 on Visual Studio +namespace { +char dummy; +} // namespace +#endif // _WIN32 + +#include "CustomPayloadPoolData.h" +#include + + +#include +using namespace eprosima::fastcdr::exception; + +#include + + +CustomPayloadPoolData::CustomPayloadPoolData() +{ + +} + +CustomPayloadPoolData::~CustomPayloadPoolData() +{ +} + +CustomPayloadPoolData::CustomPayloadPoolData( + const CustomPayloadPoolData& x) +{ + m_index = x.m_index; + m_message = x.m_message; +} + +CustomPayloadPoolData::CustomPayloadPoolData( + CustomPayloadPoolData&& x) noexcept +{ + m_index = x.m_index; + m_message = std::move(x.m_message); +} + +CustomPayloadPoolData& CustomPayloadPoolData::operator =( + const CustomPayloadPoolData& x) +{ + + m_index = x.m_index; + m_message = x.m_message; + + return *this; +} + +CustomPayloadPoolData& CustomPayloadPoolData::operator =( + CustomPayloadPoolData&& x) noexcept +{ + + m_index = x.m_index; + m_message = std::move(x.m_message); + + return *this; +} + +bool CustomPayloadPoolData::operator ==( + const CustomPayloadPoolData& x) const +{ + return (m_index == x.m_index && + m_message == x.m_message); +} + +bool CustomPayloadPoolData::operator !=( + const CustomPayloadPoolData& x) const +{ + return !(*this == x); +} + +/*! + * @brief This function sets a value in member index + * @param _index New value for member index + */ +void CustomPayloadPoolData::index( + uint32_t _index) +{ + m_index = _index; +} + +/*! + * @brief This function returns the value of member index + * @return Value of member index + */ +uint32_t CustomPayloadPoolData::index() const +{ + return m_index; +} + +/*! + * @brief This function returns a reference to member index + * @return Reference to member index + */ +uint32_t& CustomPayloadPoolData::index() +{ + return m_index; +} + + +/*! + * @brief This function copies the value in member message + * @param _message New value to be copied in member message + */ +void CustomPayloadPoolData::message( + const std::string& _message) +{ + m_message = _message; +} + +/*! + * @brief This function moves the value in member message + * @param _message New value to be moved in member message + */ +void CustomPayloadPoolData::message( + std::string&& _message) +{ + m_message = std::move(_message); +} + +/*! + * @brief This function returns a constant reference to member message + * @return Constant reference to member message + */ +const std::string& CustomPayloadPoolData::message() const +{ + return m_message; +} + +/*! + * @brief This function returns a reference to member message + * @return Reference to member message + */ +std::string& CustomPayloadPoolData::message() +{ + return m_message; +} + + +// Include auxiliary functions like for serializing/deserializing. +#include "CustomPayloadPoolDataCdrAux.ipp" diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.h b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.h new file mode 100644 index 00000000000..95ed42fe447 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.h @@ -0,0 +1,184 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file CustomPayloadPoolData.h + * This header file contains the declaration of the described types in the IDL file. + * + * This file was generated by the tool fastddsgen. + */ + +#ifndef _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATA_H_ +#define _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATA_H_ + +#include +#include +#include +#include +#include +#include + +#include +#include + + + +#if defined(_WIN32) +#if defined(EPROSIMA_USER_DLL_EXPORT) +#define eProsima_user_DllExport __declspec( dllexport ) +#else +#define eProsima_user_DllExport +#endif // EPROSIMA_USER_DLL_EXPORT +#else +#define eProsima_user_DllExport +#endif // _WIN32 + +#if defined(_WIN32) +#if defined(EPROSIMA_USER_DLL_EXPORT) +#if defined(CUSTOMPAYLOADPOOLDATA_SOURCE) +#define CUSTOMPAYLOADPOOLDATA_DllAPI __declspec( dllexport ) +#else +#define CUSTOMPAYLOADPOOLDATA_DllAPI __declspec( dllimport ) +#endif // CUSTOMPAYLOADPOOLDATA_SOURCE +#else +#define CUSTOMPAYLOADPOOLDATA_DllAPI +#endif // EPROSIMA_USER_DLL_EXPORT +#else +#define CUSTOMPAYLOADPOOLDATA_DllAPI +#endif // _WIN32 + +namespace eprosima { +namespace fastcdr { +class Cdr; +class CdrSizeCalculator; +} // namespace fastcdr +} // namespace eprosima + + + +/*! + * @brief This class represents the structure CustomPayloadPoolData defined by the user in the IDL file. + * @ingroup CustomPayloadPoolData + */ +class CustomPayloadPoolData +{ +public: + + /*! + * @brief Default constructor. + */ + eProsima_user_DllExport CustomPayloadPoolData(); + + /*! + * @brief Default destructor. + */ + eProsima_user_DllExport ~CustomPayloadPoolData(); + + /*! + * @brief Copy constructor. + * @param x Reference to the object CustomPayloadPoolData that will be copied. + */ + eProsima_user_DllExport CustomPayloadPoolData( + const CustomPayloadPoolData& x); + + /*! + * @brief Move constructor. + * @param x Reference to the object CustomPayloadPoolData that will be copied. + */ + eProsima_user_DllExport CustomPayloadPoolData( + CustomPayloadPoolData&& x) noexcept; + + /*! + * @brief Copy assignment. + * @param x Reference to the object CustomPayloadPoolData that will be copied. + */ + eProsima_user_DllExport CustomPayloadPoolData& operator =( + const CustomPayloadPoolData& x); + + /*! + * @brief Move assignment. + * @param x Reference to the object CustomPayloadPoolData that will be copied. + */ + eProsima_user_DllExport CustomPayloadPoolData& operator =( + CustomPayloadPoolData&& x) noexcept; + + /*! + * @brief Comparison operator. + * @param x CustomPayloadPoolData object to compare. + */ + eProsima_user_DllExport bool operator ==( + const CustomPayloadPoolData& x) const; + + /*! + * @brief Comparison operator. + * @param x CustomPayloadPoolData object to compare. + */ + eProsima_user_DllExport bool operator !=( + const CustomPayloadPoolData& x) const; + + /*! + * @brief This function sets a value in member index + * @param _index New value for member index + */ + eProsima_user_DllExport void index( + uint32_t _index); + + /*! + * @brief This function returns the value of member index + * @return Value of member index + */ + eProsima_user_DllExport uint32_t index() const; + + /*! + * @brief This function returns a reference to member index + * @return Reference to member index + */ + eProsima_user_DllExport uint32_t& index(); + + + /*! + * @brief This function copies the value in member message + * @param _message New value to be copied in member message + */ + eProsima_user_DllExport void message( + const std::string& _message); + + /*! + * @brief This function moves the value in member message + * @param _message New value to be moved in member message + */ + eProsima_user_DllExport void message( + std::string&& _message); + + /*! + * @brief This function returns a constant reference to member message + * @return Constant reference to member message + */ + eProsima_user_DllExport const std::string& message() const; + + /*! + * @brief This function returns a reference to member message + * @return Reference to member message + */ + eProsima_user_DllExport std::string& message(); + +private: + + uint32_t m_index{0}; + std::string m_message; + +}; + +#endif // _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATA_H_ + diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.idl b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.idl new file mode 100644 index 00000000000..f1357789414 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolData.idl @@ -0,0 +1,5 @@ +struct CustomPayloadPoolData +{ + unsigned long index; + string message; +}; diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.hpp b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.hpp new file mode 100644 index 00000000000..b25f49dbaf7 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.hpp @@ -0,0 +1,45 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file CustomPayloadPoolDataCdrAux.hpp + * This source file contains some definitions of CDR related functions. + * + * This file was generated by the tool fastddsgen. + */ + +#ifndef _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATACDRAUX_HPP_ +#define _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATACDRAUX_HPP_ + +#include "CustomPayloadPoolData.h" + +constexpr uint32_t CustomPayloadPoolData_max_cdr_typesize {268UL}; +constexpr uint32_t CustomPayloadPoolData_max_key_cdr_typesize {0UL}; + + +namespace eprosima { +namespace fastcdr { + +class Cdr; +class CdrSizeCalculator; + +eProsima_user_DllExport void serialize_key( + eprosima::fastcdr::Cdr& scdr, + const CustomPayloadPoolData& data); + + +} // namespace fastcdr +} // namespace eprosima + +#endif // _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATACDRAUX_HPP_ \ No newline at end of file diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.ipp b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.ipp new file mode 100644 index 00000000000..04dd887bd59 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataCdrAux.ipp @@ -0,0 +1,126 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file CustomPayloadPoolDataCdrAux.ipp + * This source file contains some declarations of CDR related functions. + * + * This file was generated by the tool fastddsgen. + */ + +#ifndef _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATACDRAUX_IPP_ +#define _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATACDRAUX_IPP_ + +#include "CustomPayloadPoolDataCdrAux.hpp" + +#include +#include + + +#include +using namespace eprosima::fastcdr::exception; + +namespace eprosima { +namespace fastcdr { + +template<> +eProsima_user_DllExport size_t calculate_serialized_size( + eprosima::fastcdr::CdrSizeCalculator& calculator, + const CustomPayloadPoolData& data, + size_t& current_alignment) +{ + static_cast(data); + + eprosima::fastcdr::EncodingAlgorithmFlag previous_encoding = calculator.get_encoding(); + size_t calculated_size {calculator.begin_calculate_type_serialized_size( + eprosima::fastcdr::CdrVersion::XCDRv2 == calculator.get_cdr_version() ? + eprosima::fastcdr::EncodingAlgorithmFlag::DELIMIT_CDR2 : + eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR, + current_alignment)}; + + + calculated_size += calculator.calculate_member_serialized_size(eprosima::fastcdr::MemberId(0), + data.index(), current_alignment); + + calculated_size += calculator.calculate_member_serialized_size(eprosima::fastcdr::MemberId(1), + data.message(), current_alignment); + + + calculated_size += calculator.end_calculate_type_serialized_size(previous_encoding, current_alignment); + + return calculated_size; +} + +template<> +eProsima_user_DllExport void serialize( + eprosima::fastcdr::Cdr& scdr, + const CustomPayloadPoolData& data) +{ + eprosima::fastcdr::Cdr::state current_state(scdr); + scdr.begin_serialize_type(current_state, + eprosima::fastcdr::CdrVersion::XCDRv2 == scdr.get_cdr_version() ? + eprosima::fastcdr::EncodingAlgorithmFlag::DELIMIT_CDR2 : + eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR); + + scdr + << eprosima::fastcdr::MemberId(0) << data.index() + << eprosima::fastcdr::MemberId(1) << data.message() +; + + scdr.end_serialize_type(current_state); +} + +template<> +eProsima_user_DllExport void deserialize( + eprosima::fastcdr::Cdr& cdr, + CustomPayloadPoolData& data) +{ + cdr.deserialize_type(eprosima::fastcdr::CdrVersion::XCDRv2 == cdr.get_cdr_version() ? + eprosima::fastcdr::EncodingAlgorithmFlag::DELIMIT_CDR2 : + eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR, + [&data](eprosima::fastcdr::Cdr& dcdr, const eprosima::fastcdr::MemberId& mid) -> bool + { + bool ret_value = true; + switch (mid.id) + { + case 0: + dcdr >> data.index(); + break; + + case 1: + dcdr >> data.message(); + break; + + default: + ret_value = false; + break; + } + return ret_value; + }); +} + +void serialize_key( + eprosima::fastcdr::Cdr& scdr, + const CustomPayloadPoolData& data) +{ + static_cast(scdr); + static_cast(data); +} + + + +} // namespace fastcdr +} // namespace eprosima + +#endif // _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATACDRAUX_IPP_ \ No newline at end of file diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.cxx b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.cxx new file mode 100644 index 00000000000..14861477dab --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.cxx @@ -0,0 +1,185 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file CustomPayloadPoolDataPubSubTypes.cpp + * This header file contains the implementation of the serialization functions. + * + * This file was generated by the tool fastddsgen. + */ + + +#include +#include +#include + +#include "CustomPayloadPoolDataPubSubTypes.h" +#include "CustomPayloadPoolDataCdrAux.hpp" + +using SerializedPayload_t = eprosima::fastrtps::rtps::SerializedPayload_t; +using InstanceHandle_t = eprosima::fastrtps::rtps::InstanceHandle_t; +using DataRepresentationId_t = eprosima::fastdds::dds::DataRepresentationId_t; + +CustomPayloadPoolDataPubSubType::CustomPayloadPoolDataPubSubType() +{ + setName("CustomPayloadPoolData"); + uint32_t type_size = CustomPayloadPoolData_max_cdr_typesize; + type_size += static_cast(eprosima::fastcdr::Cdr::alignment(type_size, 4)); /* possible submessage alignment */ + m_typeSize = type_size + 4; /*encapsulation*/ + m_isGetKeyDefined = false; + uint32_t keyLength = CustomPayloadPoolData_max_key_cdr_typesize > 16 ? CustomPayloadPoolData_max_key_cdr_typesize : 16; + m_keyBuffer = reinterpret_cast(malloc(keyLength)); + memset(m_keyBuffer, 0, keyLength); +} + +CustomPayloadPoolDataPubSubType::~CustomPayloadPoolDataPubSubType() +{ + if (m_keyBuffer != nullptr) + { + free(m_keyBuffer); + } +} + +bool CustomPayloadPoolDataPubSubType::serialize( + void* data, + SerializedPayload_t* payload, + DataRepresentationId_t data_representation) +{ + CustomPayloadPoolData* p_type = static_cast(data); + + // Object that manages the raw buffer. + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(payload->data), payload->max_size); + // Object that serializes the data. + eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + data_representation == DataRepresentationId_t::XCDR_DATA_REPRESENTATION ? + eprosima::fastcdr::CdrVersion::XCDRv1 : eprosima::fastcdr::CdrVersion::XCDRv2); + payload->encapsulation = ser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; + ser.set_encoding_flag( + data_representation == DataRepresentationId_t::XCDR_DATA_REPRESENTATION ? + eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR : + eprosima::fastcdr::EncodingAlgorithmFlag::DELIMIT_CDR2); + + try + { + // Serialize encapsulation + ser.serialize_encapsulation(); + // Serialize the object. + ser << *p_type; + } + catch (eprosima::fastcdr::exception::Exception& /*exception*/) + { + return false; + } + + // Get the serialized length + payload->length = static_cast(ser.get_serialized_data_length()); + return true; +} + +bool CustomPayloadPoolDataPubSubType::deserialize( + SerializedPayload_t* payload, + void* data) +{ + try + { + // Convert DATA to pointer of your type + CustomPayloadPoolData* p_type = static_cast(data); + + // Object that manages the raw buffer. + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(payload->data), payload->length); + + // Object that deserializes the data. + eprosima::fastcdr::Cdr deser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN); + + // Deserialize encapsulation. + deser.read_encapsulation(); + payload->encapsulation = deser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; + + // Deserialize the object. + deser >> *p_type; + } + catch (eprosima::fastcdr::exception::Exception& /*exception*/) + { + return false; + } + + return true; +} + +std::function CustomPayloadPoolDataPubSubType::getSerializedSizeProvider( + void* data, + DataRepresentationId_t data_representation) +{ + return [data, data_representation]() -> uint32_t + { + eprosima::fastcdr::CdrSizeCalculator calculator( + data_representation == DataRepresentationId_t::XCDR_DATA_REPRESENTATION ? + eprosima::fastcdr::CdrVersion::XCDRv1 :eprosima::fastcdr::CdrVersion::XCDRv2); + size_t current_alignment {0}; + return static_cast(calculator.calculate_serialized_size( + *static_cast(data), current_alignment)) + + 4u /*encapsulation*/; + }; +} + +void* CustomPayloadPoolDataPubSubType::createData() +{ + return reinterpret_cast(new CustomPayloadPoolData()); +} + +void CustomPayloadPoolDataPubSubType::deleteData( + void* data) +{ + delete(reinterpret_cast(data)); +} + +bool CustomPayloadPoolDataPubSubType::getKey( + void* data, + InstanceHandle_t* handle, + bool force_md5) +{ + if (!m_isGetKeyDefined) + { + return false; + } + + CustomPayloadPoolData* p_type = static_cast(data); + + // Object that manages the raw buffer. + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(m_keyBuffer), + CustomPayloadPoolData_max_key_cdr_typesize); + + // Object that serializes the data. + eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::BIG_ENDIANNESS); + eprosima::fastcdr::serialize_key(ser, *p_type); + if (force_md5 || CustomPayloadPoolData_max_key_cdr_typesize > 16) + { + m_md5.init(); + m_md5.update(m_keyBuffer, static_cast(ser.get_serialized_data_length())); + m_md5.finalize(); + for (uint8_t i = 0; i < 16; ++i) + { + handle->value[i] = m_md5.digest[i]; + } + } + else + { + for (uint8_t i = 0; i < 16; ++i) + { + handle->value[i] = m_keyBuffer[i]; + } + } + return true; +} + diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.h b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.h new file mode 100644 index 00000000000..770cfba4c62 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPubSubTypes.h @@ -0,0 +1,130 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file CustomPayloadPoolDataPubSubTypes.h + * This header file contains the declaration of the serialization functions. + * + * This file was generated by the tool fastddsgen. + */ + + +#ifndef _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATA_PUBSUBTYPES_H_ +#define _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATA_PUBSUBTYPES_H_ + +#include +#include +#include +#include +#include + +#include "CustomPayloadPoolData.h" + + +#if !defined(GEN_API_VER) || (GEN_API_VER != 2) +#error \ + Generated CustomPayloadPoolData is not compatible with current installed Fast DDS. Please, regenerate it with fastddsgen. +#endif // GEN_API_VER + + +/*! + * @brief This class represents the TopicDataType of the type CustomPayloadPoolData defined by the user in the IDL file. + * @ingroup CustomPayloadPoolData + */ +class CustomPayloadPoolDataPubSubType : public eprosima::fastdds::dds::TopicDataType +{ +public: + + typedef CustomPayloadPoolData type; + + eProsima_user_DllExport CustomPayloadPoolDataPubSubType(); + + eProsima_user_DllExport ~CustomPayloadPoolDataPubSubType() override; + + eProsima_user_DllExport bool serialize( + void* data, + eprosima::fastrtps::rtps::SerializedPayload_t* payload) override + { + return serialize(data, payload, eprosima::fastdds::dds::DEFAULT_DATA_REPRESENTATION); + } + + eProsima_user_DllExport bool serialize( + void* data, + eprosima::fastrtps::rtps::SerializedPayload_t* payload, + eprosima::fastdds::dds::DataRepresentationId_t data_representation) override; + + eProsima_user_DllExport bool deserialize( + eprosima::fastrtps::rtps::SerializedPayload_t* payload, + void* data) override; + + eProsima_user_DllExport std::function getSerializedSizeProvider( + void* data) override + { + return getSerializedSizeProvider(data, eprosima::fastdds::dds::DEFAULT_DATA_REPRESENTATION); + } + + eProsima_user_DllExport std::function getSerializedSizeProvider( + void* data, + eprosima::fastdds::dds::DataRepresentationId_t data_representation) override; + + eProsima_user_DllExport bool getKey( + void* data, + eprosima::fastrtps::rtps::InstanceHandle_t* ihandle, + bool force_md5 = false) override; + + eProsima_user_DllExport void* createData() override; + + eProsima_user_DllExport void deleteData( + void* data) override; + +#ifdef TOPIC_DATA_TYPE_API_HAS_IS_BOUNDED + eProsima_user_DllExport inline bool is_bounded() const override + { + return false; + } + +#endif // TOPIC_DATA_TYPE_API_HAS_IS_BOUNDED + +#ifdef TOPIC_DATA_TYPE_API_HAS_IS_PLAIN + eProsima_user_DllExport inline bool is_plain() const override + { + return false; + } + + eProsima_user_DllExport inline bool is_plain( + eprosima::fastdds::dds::DataRepresentationId_t data_representation) const override + { + static_cast(data_representation); + return false; + } + +#endif // TOPIC_DATA_TYPE_API_HAS_IS_PLAIN + +#ifdef TOPIC_DATA_TYPE_API_HAS_CONSTRUCT_SAMPLE + eProsima_user_DllExport inline bool construct_sample( + void* memory) const override + { + static_cast(memory); + return false; + } + +#endif // TOPIC_DATA_TYPE_API_HAS_CONSTRUCT_SAMPLE + + MD5 m_md5; + unsigned char* m_keyBuffer; + +}; + +#endif // _FAST_DDS_GENERATED_CUSTOMPAYLOADPOOLDATA_PUBSUBTYPES_H_ + diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.cpp b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.cpp new file mode 100644 index 00000000000..866a7364bc4 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.cpp @@ -0,0 +1,227 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPoolDataPublisher.cpp + * + */ + +#include +#include + +#include "CustomPayloadPoolDataPublisher.h" + +#include +#include +#include +#include +#include +#include +#include + + +using namespace eprosima::fastdds::dds; +using namespace eprosima::fastdds::rtps; + +std::atomic CustomPayloadPoolDataPublisher::stop_(false); +std::mutex CustomPayloadPoolDataPublisher::wait_matched_cv_mtx_; +std::condition_variable CustomPayloadPoolDataPublisher::wait_matched_cv_; + +CustomPayloadPoolDataPublisher::CustomPayloadPoolDataPublisher( + std::shared_ptr payload_pool) + : payload_pool_(payload_pool) + , participant_(nullptr) + , publisher_(nullptr) + , topic_(nullptr) + , writer_(nullptr) + , type_(new CustomPayloadPoolDataPubSubType()) + , matched_(0) + , has_stopped_for_unexpected_error_(false) +{ +} + +bool CustomPayloadPoolDataPublisher::is_stopped() +{ + return stop_; +} + +void CustomPayloadPoolDataPublisher::stop() +{ + stop_ = true; + awake(); +} + +bool CustomPayloadPoolDataPublisher::init() +{ + hello_.index(0); + hello_.message("CustomPayloadPool"); + DomainParticipantQos pqos = PARTICIPANT_QOS_DEFAULT; + pqos.name("CustomPayloadPoolDataPublisher"); + auto factory = DomainParticipantFactory::get_instance(); + + participant_ = factory->create_participant(0, pqos); + + if (participant_ == nullptr) + { + return false; + } + + /* Register the type */ + type_.register_type(participant_); + + /* Create the publisher */ + publisher_ = participant_->create_publisher( + PUBLISHER_QOS_DEFAULT, + nullptr); + + if (publisher_ == nullptr) + { + return false; + } + + /* Create the topic */ + topic_ = participant_->create_topic( + "CustomPayloadPoolTopic", + type_.get_type_name(), + TOPIC_QOS_DEFAULT); + + if (topic_ == nullptr) + { + return false; + } + + /* Create the writer */ + writer_ = publisher_->create_datawriter( + topic_, + DATAWRITER_QOS_DEFAULT, + this, + StatusMask::all(), + payload_pool_); + + if (writer_ == nullptr) + { + return false; + } + + // Register SIGINT signal handler to stop thread execution + signal(SIGINT, [](int /*signum*/) + { + std::cout << "SIGINT received, stopping Publisher execution." << std::endl; + CustomPayloadPoolDataPublisher::stop(); + }); + + return true; +} + +CustomPayloadPoolDataPublisher::~CustomPayloadPoolDataPublisher() +{ + if (participant_ != nullptr) + { + participant_->delete_contained_entities(); + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } +} + +void CustomPayloadPoolDataPublisher::on_publication_matched( + eprosima::fastdds::dds::DataWriter*, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) +{ + if (info.current_count_change == 1) + { + matched_ = info.current_count; + std::cout << "Publisher matched." << std::endl; + if (matched_ > 0) + { + awake(); + } + } + else if (info.current_count_change == -1) + { + matched_ = info.current_count; + std::cout << "Publisher unmatched." << std::endl; + } + else + { + std::cout << info.current_count_change + << " is not a valid value for PublicationMatchedStatus current count change" << std::endl; + } +} + +void CustomPayloadPoolDataPublisher::wait() +{ + std::unique_lock lck(wait_matched_cv_mtx_); + wait_matched_cv_.wait(lck, [this] + { + return matched_ > 0 || is_stopped(); + }); +} + +void CustomPayloadPoolDataPublisher::awake() +{ + wait_matched_cv_.notify_all(); +} + +void CustomPayloadPoolDataPublisher::run_thread( + uint32_t samples, + uint32_t sleep) +{ + while (!is_stopped() && (samples == 0 || hello_.index() < samples)) + { + if (matched_ > 0) + { + if (publish()) + { + std::cout << "Message: " << hello_.message().data() << " with index: " << hello_.index() + << " SENT" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep)); + } + // something went wrong writing + else + { + has_stopped_for_unexpected_error_ = true; + CustomPayloadPoolDataPublisher::stop(); + } + } + else + { + wait(); + } + } +} + +bool CustomPayloadPoolDataPublisher::run( + uint32_t samples, + uint32_t sleep) +{ + stop_ = false; + std::thread thread(&CustomPayloadPoolDataPublisher::run_thread, this, samples, sleep); + if (samples == 0) + { + std::cout << "Publisher running. Please press CTRL+C to stop the Publisher at any time." << std::endl; + } + else + { + std::cout << "Publisher running " << samples << " samples." << std::endl; + } + + thread.join(); + return has_stopped_for_unexpected_error_; +} + +bool CustomPayloadPoolDataPublisher::publish() +{ + hello_.index(hello_.index() + 1); + stop_ = !writer_->write(&hello_); + return !stop_; +} diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.h b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.h new file mode 100644 index 00000000000..9e4c0d8026f --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataPublisher.h @@ -0,0 +1,115 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPoolDataPublisher.h + * + */ + +#ifndef CUSTOM_PAYLOAD_POOL_DATA_PUBLISHER_H_ +#define CUSTOM_PAYLOAD_POOL_DATA_PUBLISHER_H_ + +#include +#include + +#include "CustomPayloadPool.hpp" +#include "CustomPayloadPoolDataPubSubTypes.h" + +#include +#include +#include + +/** + * Class used to group into a single working unit a Publisher with a DataWriter, its listener, and a TypeSupport member + * corresponding to the HelloWorld datatype + */ +class CustomPayloadPoolDataPublisher : private eprosima::fastdds::dds::DataWriterListener +{ +public: + + CustomPayloadPoolDataPublisher( + std::shared_ptr payload_pool); + + virtual ~CustomPayloadPoolDataPublisher(); + + //! Initialize the publisher + bool init(); + + //! Run for number samples, publish every sleep seconds + bool run( + uint32_t number, + uint32_t sleep); + +private: + + //! Publish a sample + bool publish(); + + //! Run thread for number samples, publish every sleep seconds + void run_thread( + uint32_t number, + uint32_t sleep); + + //! Return the current state of execution + static bool is_stopped(); + + //! Trigger the end of execution + static void stop(); + + //! Callback executed when a DataReader is matched or unmatched + void on_publication_matched( + eprosima::fastdds::dds::DataWriter* writer, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) override; + + //! Return true if there are at least 1 matched DataReaders + bool enough_matched(); + + //! Block the thread until enough DataReaders are matched + void wait(); + + //! Unblock the thread so publication of samples begins/resumes + static void awake(); + + CustomPayloadPoolData hello_; + + std::shared_ptr payload_pool_; + + eprosima::fastdds::dds::DomainParticipant* participant_; + + eprosima::fastdds::dds::Publisher* publisher_; + + eprosima::fastdds::dds::Topic* topic_; + + eprosima::fastdds::dds::DataWriter* writer_; + + eprosima::fastdds::dds::TypeSupport type_; + + //! Number of DataReaders matched to the associated DataWriter + std::atomic matched_; + + //! Member used for control flow purposes + std::atomic has_stopped_for_unexpected_error_; + + //! Member used for control flow purposes + static std::atomic stop_; + + //! Protects wait_matched condition variable + static std::mutex wait_matched_cv_mtx_; + + //! Waits until enough DataReaders are matched + static std::condition_variable wait_matched_cv_; +}; + + +#endif /* CUSTOM_PAYLOAD_POOL_DATA_PUBLISHER_H_ */ diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.cpp b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.cpp new file mode 100644 index 00000000000..5dc610964ac --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.cpp @@ -0,0 +1,191 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPoolDataSubscriber.cpp + * + */ + +#include + +#include "CustomPayloadPoolDataSubscriber.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace eprosima::fastdds::dds; + +std::atomic CustomPayloadPoolDataSubscriber::stop_(false); +std::mutex CustomPayloadPoolDataSubscriber::terminate_cv_mtx_; +std::condition_variable CustomPayloadPoolDataSubscriber::terminate_cv_; + +CustomPayloadPoolDataSubscriber::CustomPayloadPoolDataSubscriber( + std::shared_ptr payload_pool) + : payload_pool_(payload_pool) + , participant_(nullptr) + , subscriber_(nullptr) + , topic_(nullptr) + , reader_(nullptr) + , type_(new CustomPayloadPoolDataPubSubType()) + , matched_(0) + , samples_(0) + , max_samples_(0) +{ + +} + +bool CustomPayloadPoolDataSubscriber::is_stopped() +{ + return stop_; +} + +void CustomPayloadPoolDataSubscriber::stop() +{ + stop_ = true; + terminate_cv_.notify_all(); +} + +bool CustomPayloadPoolDataSubscriber::init() +{ + DomainParticipantQos pqos = PARTICIPANT_QOS_DEFAULT; + pqos.name("CustomPayloadPoolDataSubscriber"); + auto factory = DomainParticipantFactory::get_instance(); + + participant_ = factory->create_participant(0, pqos); + + if (participant_ == nullptr) + { + return false; + } + + /* Register the type */ + type_.register_type(participant_); + + /* Create the subscriber */ + subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr); + + if (subscriber_ == nullptr) + { + return false; + } + + /* Create the topic */ + topic_ = participant_->create_topic( + "CustomPayloadPoolTopic", + type_.get_type_name(), + TOPIC_QOS_DEFAULT); + + if (topic_ == nullptr) + { + return false; + } + + /* Create the reader */ + DataReaderQos rqos = DATAREADER_QOS_DEFAULT; + rqos.reliability().kind = RELIABLE_RELIABILITY_QOS; + + reader_ = subscriber_->create_datareader(topic_, rqos, this, StatusMask::all(), payload_pool_); + + if (reader_ == nullptr) + { + return false; + } + + // Register SIGINT signal handler to stop thread execution + signal(SIGINT, [](int /*signum*/) + { + std::cout << "SIGINT received, stopping subscriber execution." << std::endl; + CustomPayloadPoolDataSubscriber::stop(); + }); + + return true; +} + +CustomPayloadPoolDataSubscriber::~CustomPayloadPoolDataSubscriber() +{ + if (participant_ != nullptr) + { + participant_->delete_contained_entities(); + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } +} + +void CustomPayloadPoolDataSubscriber::on_subscription_matched( + DataReader*, + const SubscriptionMatchedStatus& info) +{ + if (info.current_count_change == 1) + { + matched_ = info.total_count; + std::cout << "Subscriber matched." << std::endl; + } + else if (info.current_count_change == -1) + { + matched_ = info.total_count; + std::cout << "Subscriber unmatched." << std::endl; + } + else + { + std::cout << info.current_count_change + << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl; + } +} + +void CustomPayloadPoolDataSubscriber::on_data_available( + DataReader* reader) +{ + SampleInfo info; + if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK) + { + if (info.instance_state == ALIVE_INSTANCE_STATE) + { + samples_++; + // Print your structure data here. + std::cout << "Message [" << samples_ << "] of " << hello_.message() << " " << hello_.index() + << " RECEIVED" << std::endl; + + if (max_samples_ > 0 && (samples_ >= max_samples_)) + { + stop(); + } + } + } +} + +bool CustomPayloadPoolDataSubscriber::run( + uint32_t samples) +{ + max_samples_ = samples; + stop_ = false; + if (samples == 0) + { + std::cout << "Subscriber running. Please press Ctrl+C to stop the Subscriber at any time." << std::endl; + } + else + { + std::cout << "Subscriber running until " << samples << " samples have been received" << std::endl; + } + + std::unique_lock lck(terminate_cv_mtx_); + terminate_cv_.wait(lck, [] + { + return is_stopped(); + }); + return is_stopped(); +} diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.h b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.h new file mode 100644 index 00000000000..ab1ab59039d --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPoolDataSubscriber.h @@ -0,0 +1,95 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPoolDataSubscriber.h + * + */ + +#ifndef CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_H_ +#define CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_H_ + +#include +#include + +#include "CustomPayloadPoolDataPubSubTypes.h" +#include "CustomPayloadPool.hpp" + +#include +#include +#include +#include + +class CustomPayloadPoolDataSubscriber : private eprosima::fastdds::dds::DataReaderListener +{ +public: + + CustomPayloadPoolDataSubscriber( + std::shared_ptr payload_pool); + + virtual ~CustomPayloadPoolDataSubscriber(); + + //!Initialize the subscriber + bool init(); + + //!Run the subscriber until all samples have been received. + bool run( + uint32_t samples); + +private: + + void on_data_available( + eprosima::fastdds::dds::DataReader* reader) override; + + void on_subscription_matched( + eprosima::fastdds::dds::DataReader* reader, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override; + + //! Return the current state of execution + static bool is_stopped(); + + //! Trigger the end of execution + static void stop(); + + CustomPayloadPoolData hello_; + + std::shared_ptr payload_pool_; + + eprosima::fastdds::dds::DomainParticipant* participant_; + + eprosima::fastdds::dds::Subscriber* subscriber_; + + eprosima::fastdds::dds::Topic* topic_; + + eprosima::fastdds::dds::DataReader* reader_; + + eprosima::fastdds::dds::TypeSupport type_; + + int32_t matched_; + + uint32_t samples_; + + uint32_t max_samples_; + + //! Member used for control flow purposes + static std::atomic stop_; + + //! Protects terminate condition variable + static std::mutex terminate_cv_mtx_; + + //! Waits during execution until SIGINT or max_messages_ samples are received + static std::condition_variable terminate_cv_; +}; + +#endif /* CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_H_ */ diff --git a/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool_main.cpp b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool_main.cpp new file mode 100644 index 00000000000..da78c8a224c --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool_main.cpp @@ -0,0 +1,318 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPool_main.cpp + * + */ + +#include +#include + +#include "CustomPayloadPoolDataPublisher.h" +#include "CustomPayloadPoolDataSubscriber.h" + +#include +#include + +#include + +using eprosima::fastdds::dds::Log; + +namespace option = eprosima::option; + +enum ApplicationRole : uint8_t +{ + PUBLISHER, + SUBSCRIBER +}; + +struct Arg : public option::Arg +{ + static void print_error( + const char* msg1, + const option::Option& opt, + const char* msg2) + { + fprintf(stderr, "%s", msg1); + fwrite(opt.name, opt.namelen, 1, stderr); + fprintf(stderr, "%s", msg2); + } + + static option::ArgStatus Unknown( + const option::Option& option, + bool msg) + { + if (msg) + { + print_error("Unknown option '", option, "'\n"); + } + return option::ARG_ILLEGAL; + } + + static option::ArgStatus Required( + const option::Option& option, + bool msg) + { + if (option.arg != 0 && option.arg[0] != 0) + { + return option::ARG_OK; + } + + if (msg) + { + print_error("Option '", option, "' requires an argument\n"); + } + return option::ARG_ILLEGAL; + } + + static option::ArgStatus Numeric( + const option::Option& option, + bool msg) + { + char* endptr = 0; + if ( option.arg != nullptr ) + { + strtol(option.arg, &endptr, 10); + if (endptr != option.arg && *endptr == 0) + { + return option::ARG_OK; + } + } + + if (msg) + { + print_error("Option '", option, "' requires a numeric argument\n"); + } + return option::ARG_ILLEGAL; + } + + template::max()> + static option::ArgStatus NumericRange( + const option::Option& option, + bool msg) + { + static_assert(min <= max, "NumericRange: invalid range provided."); + + char* endptr = 0; + if ( option.arg != nullptr ) + { + long value = strtol(option.arg, &endptr, 10); + if ( endptr != option.arg && *endptr == 0 && + value >= min && value <= max) + { + return option::ARG_OK; + } + } + + if (msg) + { + std::ostringstream os; + os << "' requires a numeric argument in range [" + << min << ", " << max << "]" << std::endl; + print_error("Option '", option, os.str().c_str()); + } + + return option::ARG_ILLEGAL; + } + + static option::ArgStatus String( + const option::Option& option, + bool msg) + { + if (option.arg != 0) + { + return option::ARG_OK; + } + if (msg) + { + print_error("Option '", option, "' requires an argument\n"); + } + return option::ARG_ILLEGAL; + } + +}; + +enum optionIndex +{ + UNKNOWN_OPT, + HELP, + SAMPLES, + INTERVAL +}; + +const option::Descriptor usage[] = { + { UNKNOWN_OPT, 0, "", "", Arg::None, + "Usage: CustomPayloadPoolExample \n\nGeneral options:" }, + { HELP, 0, "h", "help", Arg::None, " -h \t--help \tProduce help message." }, + { SAMPLES, 0, "s", "samples", Arg::NumericRange<>, + " -s , \t--samples= \tNumber of samples (0, default, infinite)." }, + { UNKNOWN_OPT, 0, "", "", Arg::None, "\nPublisher options:"}, + { INTERVAL, 0, "i", "interval", Arg::NumericRange<>, + " -i , \t--interval= \tTime between samples in milliseconds (Default: 100)." }, + { 0, 0, 0, 0, 0, 0 } +}; + +int main( + int argc, + char** argv) +{ + int columns; + +#if defined(_WIN32) + char* buf = nullptr; + size_t sz = 0; + if (_dupenv_s(&buf, &sz, "COLUMNS") == 0 && buf != nullptr) + { + columns = strtol(buf, nullptr, 10); + free(buf); + } + else + { + columns = 80; + } +#else + columns = getenv("COLUMNS") ? atoi(getenv("COLUMNS")) : 80; +#endif // if defined(_WIN32) + + std::cout << "Starting " << std::endl; + + int type = ApplicationRole::PUBLISHER; + uint32_t count = 0; + uint32_t sleep = 100; + + argc -= (argc > 0); + argv += (argc > 0); // skip program name argv[0] if present + option::Stats stats(true, usage, argc, argv); + std::vector options(stats.options_max); + std::vector buffer(stats.buffer_max); + option::Parser parse(true, usage, argc, argv, &options[0], &buffer[0]); + + try + { + if (parse.error()) + { + throw 1; + } + + if (options[HELP] || options[UNKNOWN_OPT]) + { + throw 1; + } + + if (parse.nonOptionsCount() < 1) + { + throw 2; + } + + // Decide between publisher or subscriber + const char* type_name = parse.nonOption(0); + + // make sure is the first option. + // type_name and buffer[0].name reference the original command line char array + // type_name must precede any other arguments in the array. + // Note buffer[0].arg may be null for non-valued options and is not reliable for + // testing purposes. + if (parse.optionsCount() && type_name >= buffer[0].name) + { + throw 2; + } + + if (strcmp(type_name, "publisher") == 0) + { + type = ApplicationRole::PUBLISHER; + } + else if (strcmp(type_name, "subscriber") == 0) + { + type = ApplicationRole::SUBSCRIBER; + } + else + { + throw 2; + } + } + catch (int error) + { + if ( error == 2 ) + { + std::cerr << "ERROR: first argument must be " << std::endl; + } + option::printUsage(fwrite, stdout, usage, columns); + return error; + } + + // Decide between the old and new syntax + if (parse.nonOptionsCount() > 1) + { + // old syntax, only affects publishers + // old and new syntax cannot be mixed + if (type != ApplicationRole::PUBLISHER || parse.optionsCount() >= 0) + { + option::printUsage(fwrite, stdout, usage, columns); + return 1; + } + } + else + { + // new syntax + option::Option* opt = options[SAMPLES]; + if (opt) + { + count = strtol(opt->arg, nullptr, 10); + } + + opt = options[INTERVAL]; + if (opt) + { + sleep = strtol(opt->arg, nullptr, 10); + } + } + + // Create custom payload pool + std::shared_ptr payload_pool = std::make_shared(); + + bool execution_status = false; + + switch (type) + { + case ApplicationRole::PUBLISHER: + { + CustomPayloadPoolDataPublisher mypub(payload_pool); + if (mypub.init()) + { + execution_status = !mypub.run(count, sleep); + } + else + { + return 1; + } + break; + } + case ApplicationRole::SUBSCRIBER: + { + CustomPayloadPoolDataSubscriber mysub(payload_pool); + if (mysub.init()) + { + execution_status = !mysub.run(count); + } + else + { + return 1; + } + break; + } + } + Log::Reset(); + return execution_status; +} diff --git a/examples/cpp/dds/CustomPayloadPoolExample/README.md b/examples/cpp/dds/CustomPayloadPoolExample/README.md new file mode 100644 index 00000000000..ee32ace9a46 --- /dev/null +++ b/examples/cpp/dds/CustomPayloadPoolExample/README.md @@ -0,0 +1,13 @@ +# Objetive + +This example has been developed to allow setting the endpoints' payload pool from the DDS layer. +In that way, a custom payload pool is mandatory to be implemented, and passed to the endpoints in the publisher and subscriber implementations. + +# Launch + +To launch this test open two different consoles: + +In the first one launch: `./CustomPayloadPoolExample publisher` (or `CustomPayloadPoolExample.exe publisher` on Windows). +In the second one: `./CustomPayloadPoolExample subscriber` (or `CustomPayloadPoolExample.exe subscriber` on Windows). + +The endpoints will match and communicate with the custom payload pool.