From 600014d43f99b85703dfeaf72b28bc93d77731d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=C3=ADaz=20Qu=C3=ADlez?= <33602217+Tempate@users.noreply.github.com> Date: Tue, 7 Nov 2023 16:19:10 +0100 Subject: [PATCH] Manually fix a set of Topic QoS, and implement the max-tx-rate, max-rx-rate, and downsampling (#64) * Max-reception-rate & downsampling in RTPS participants Signed-off-by: tempate * Max-reception-rate & downsampling in DDS participants Signed-off-by: tempate * Rename max-reception-rate to max-rx-rate Signed-off-by: tempate * Basic support for manual topics tag Signed-off-by: tempate * Read all TopicQoS in the manual topics Signed-off-by: tempate * Implement max transmission rate Signed-off-by: tempate * Minor fixes Signed-off-by: tempate * Fix bugs and clean code Signed-off-by: tempate * Fix rebase error Signed-off-by: tempate * Make the DdsPipeConfiguration required Signed-off-by: tempate * Apply manual topics by default to all participants Signed-off-by: tempate * Remove Wildcard Topic without name test Signed-off-by: tempate * Simplify the DdsPipe's constructor Signed-off-by: tempate * Minor fixes Signed-off-by: tempate * Support on_data_available with multiple messages Signed-off-by: tempate * Uncrustify Signed-off-by: tempate * Fix compilation in Ubuntu 20.04 debug Signed-off-by: tempate * Support manual topics in the dds participant Signed-off-by: tempate * Uncrustify Signed-off-by: tempate * Minor fixes Signed-off-by: tempate * Fix bug customizing a topic in the DdsBridge Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Avoid using DdsTopics in the DdsBridge Signed-off-by: tempate * Minor fixes Signed-off-by: tempate * Add Topic QoS to the SchemaParticipant Signed-off-by: tempate * Bugfix copy method Signed-off-by: tempate * Uncrustify & Windows fix Signed-off-by: tempate * Allowed topics test Signed-off-by: tempate * Pack the QoSs in specs on a Topic QoS object Signed-off-by: tempate * Fix default Topic QoS variable Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Rename max-depth to history-depth Signed-off-by: tempate * Uncrustify Signed-off-by: tempate * Remove test that checks parsing QoS in a DdsTopic Signed-off-by: tempate * Fix compilation in Ubuntu 20.04 debug Signed-off-by: tempate * Improve test according to suggestion Signed-off-by: tempate * Apply specs TopicQoS over discovery TopicQoS Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Apply more suggestions Signed-off-by: tempate * Add missing Windows header Signed-off-by: tempate --------- Signed-off-by: tempate --- .../communication/dds/DdsBridge.hpp | 27 ++- .../configuration/DdsPipeConfiguration.hpp | 23 +++ .../include/ddspipe_core/core/DdsPipe.hpp | 61 ++++--- .../ddspipe_core/interface/IParticipant.hpp | 17 +- .../include/ddspipe_core/interface/ITopic.hpp | 5 + .../ddspipe_core/types/dds/TopicQoS.hpp | 146 ++++++++++------ .../ddspipe_core/types/topic/Topic.hpp | 23 ++- .../ddspipe_core/types/topic/dds/DdsTopic.hpp | 22 ++- .../types/topic/filter/ManualTopic.hpp | 31 ++++ .../topic/filter/WildcardDdsFilterTopic.hpp | 7 +- .../src/cpp/communication/dds/DdsBridge.cpp | 41 ++++- .../configuration/DdsPipeConfiguration.cpp | 17 ++ ddspipe_core/src/cpp/core/DdsPipe.cpp | 161 +++++++++++------- ddspipe_core/src/cpp/types/dds/TopicQoS.cpp | 103 +++++++++-- ddspipe_core/src/cpp/types/topic/Topic.cpp | 24 ++- .../src/cpp/types/topic/dds/DdsTopic.cpp | 20 +++ .../test/unittest/core/ddspipe/CMakeLists.txt | 1 + .../unittest/core/ddspipe/DdsPipeTest.cpp | 160 +++++++++++++---- .../ParticipantConfiguration.hpp | 8 +- .../participant/auxiliar/BlankParticipant.hpp | 5 + .../participant/dds/CommonParticipant.hpp | 4 + .../dynamic_types/SchemaParticipant.hpp | 7 + .../participant/rtps/CommonParticipant.hpp | 9 +- .../reader/auxiliar/BaseReader.hpp | 28 ++- .../reader/dds/CommonReader.hpp | 4 + .../reader/rtps/CommonReader.hpp | 12 +- .../testing/entities/mock_entities.hpp | 6 +- .../writer/auxiliar/BaseWriter.hpp | 19 ++- .../participant/auxiliar/BlankParticipant.cpp | 6 + .../cpp/participant/dds/CommonParticipant.cpp | 7 + .../dynamic_types/SchemaParticipant.cpp | 8 +- .../participant/rtps/CommonParticipant.cpp | 8 + .../src/cpp/reader/auxiliar/BaseReader.cpp | 47 ++++- .../src/cpp/reader/dds/CommonReader.cpp | 29 ++-- .../src/cpp/reader/rtps/CommonReader.cpp | 44 ++--- .../cpp/testing/entities/mock_entities.cpp | 6 + ddspipe_participants/src/cpp/utils/utils.cpp | 12 +- .../src/cpp/writer/auxiliar/BaseWriter.cpp | 43 ++++- .../src/cpp/writer/dds/CommonWriter.cpp | 4 +- .../src/cpp/writer/rtps/CommonWriter.cpp | 2 +- .../DdsPipeCommunicationMockTest.cpp | 60 ++++--- .../ParticipantsCreationgTest.cpp | 15 +- .../include/ddspipe_yaml/YamlReader.hpp | 21 ++- .../ddspipe_yaml/yaml_configuration_tags.hpp | 23 ++- .../src/cpp/YamlReader_participants.cpp | 6 + ddspipe_yaml/src/cpp/YamlReader_types.cpp | 128 +++++++++----- .../entities/topic/YamlGetEntityTopicTest.cpp | 45 ----- 47 files changed, 1086 insertions(+), 419 deletions(-) create mode 100644 ddspipe_core/include/ddspipe_core/types/topic/filter/ManualTopic.hpp diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index 6056313a..e1af73ec 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -20,6 +20,8 @@ #include #include #include +#include +#include namespace eprosima { namespace ddspipe { @@ -58,7 +60,8 @@ class DdsBridge : public Bridge const std::shared_ptr& payload_pool, const std::shared_ptr& thread_pool, const RoutesConfiguration& routes_config, - const bool remove_unused_entities); + const bool remove_unused_entities, + const std::vector& manual_topics); DDSPIPE_CORE_DllAPI ~DdsBridge(); @@ -124,8 +127,6 @@ class DdsBridge : public Bridge * If the Participant's IReader doesn't exist, create it. * If the Participant's Track doesn't exist, create it. * - * Thread safe - * * @param writers: The map of ids to writers that are required for the tracks. * * @throw InitializationException in case \c IReaders creation fails. @@ -140,8 +141,6 @@ class DdsBridge : public Bridge * If the Participant's IReader doesn't exist, create it. * If the Participant's Track doesn't exist, create it. * - * Thread safe - * * @param writers: The map of ids to writers that are required for the tracks. * * @throw InitializationException in case \c IReaders creation fails. @@ -150,10 +149,28 @@ class DdsBridge : public Bridge void add_writers_to_tracks_nts_( std::map>& writers); + /** + * @brief Impose the Topic QoS that have been pre-configured for a participant. + * + * First, it imposes the Topic QoS configured at \c manual_topics and then the ones configured at \c participants. + */ + DDSPIPE_CORE_DllAPI + utils::Heritable create_topic_for_participant_nts_( + const std::shared_ptr& participant) noexcept; + + ///////////////////////// + // VARIABLES + ///////////////////////// + + //! Topic associated to the DdsBridge. utils::Heritable topic_; + //! Routes associated to the Topic. RoutesConfiguration::RoutesMap routes_; + //! Topics that explicitally set a QoS attribute for this participant. + std::vector manual_topics_; + /** * Inside \c Tracks * They are indexed by the Id of the participant that is source diff --git a/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp b/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp index c4092238..fe8bfb56 100644 --- a/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp +++ b/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -73,10 +74,29 @@ struct DdsPipeConfiguration : public IConfiguration RoutesConfiguration get_routes_config( const utils::Heritable& topic) const noexcept; + /** + * @brief Select the \c manual_topics for a topic. + * + * @return The manual topics for a specific topic. + */ + DDSPIPE_CORE_DllAPI + std::vector get_manual_topics( + const core::ITopic& topic) const noexcept; + ///////////////////////// // VARIABLES ///////////////////////// + //! Topic lists to build the AllowedTopics + std::set> allowlist{}; + std::set> blocklist{}; + + //! Builtin topics to create at the beggining of the execution + std::set> builtin_topics{}; + + //! Set of manually configured Topic QoS + std::vector manual_topics{}; + //! Configuration of the generic routes. RoutesConfiguration routes{}; @@ -85,6 +105,9 @@ struct DdsPipeConfiguration : public IConfiguration //! Whether entities should be removed when they have no writers connected to them. bool remove_unused_entities = false; + + //! Whether the DDS Pipe should be initialized enabled. + bool init_enabled = false; }; } /* namespace core */ diff --git a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp index 4952bd16..ef61ad51 100644 --- a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp +++ b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp @@ -61,14 +61,11 @@ class DdsPipe */ DDSPIPE_CORE_DllAPI DdsPipe( - const std::shared_ptr& allowed_topics, + const DdsPipeConfiguration& configuration, const std::shared_ptr& discovery_database, const std::shared_ptr& payload_pool, const std::shared_ptr& participants_database, - const std::shared_ptr& thread_pool, - const std::set>& builtin_topics = {}, - bool start_enable = false, - const DdsPipeConfiguration& configuration = {}); + const std::shared_ptr& thread_pool); /** * @brief Destroy the DdsPipe object @@ -85,19 +82,21 @@ class DdsPipe ///////////////////////// /** - * @brief Reload the allowed topic configuration + * @brief Reload the DdsPipe configuration. * - * @param [in] configuration : new configuration + * @param [in] new_configuration : new configuration. * - * @return \c RETCODE_OK if configuration has been updated correctly - * @return \c RETCODE_NO_DATA if new configuration has not changed - * @return \c RETCODE_ERROR if any other error has occurred + * @return \c RETCODE_OK if the configuration has been updated correctly. + * @return \c RETCODE_NO_DATA if the new configuration has not changed. + * @return \c RETCODE_ERROR if any other error has occurred. * - * @throw \c ConfigurationException in case the new yaml is not well-formed + * @note This method checks that the new configuration file is valid and calls \c reload_allowed_topics_ + * + * @throw \c ConfigurationException in case the new yaml is not well-formed. */ DDSPIPE_CORE_DllAPI - utils::ReturnCode reload_allowed_topics( - const std::shared_ptr& allowed_topics); + utils::ReturnCode reload_configuration( + const DdsPipeConfiguration& new_configuration); ///////////////////////// // ENABLING METHODS @@ -129,6 +128,28 @@ class DdsPipe protected: + ///////////////////////// + // INTERACTION METHODS + ///////////////////////// + /** + * @brief Load allowed topics from configuration + * + * @throw \c ConfigurationException in case the yaml inside allowlist is not well-formed + */ + void init_allowed_topics_(); + + /** + * @brief Reload the allowed topics configuration. + * + * @param [in] allowed_topics : new allowed topics. + * + * @return \c RETCODE_OK if the allowed topics have been updated correctly. + * @return \c RETCODE_NO_DATA if the new allowed topics have not changed. + * @return \c RETCODE_ERROR if any other error has occurred. + */ + utils::ReturnCode reload_allowed_topics_( + const std::shared_ptr& allowed_topics); + ///////////////////////// // CALLBACK METHODS ///////////////////////// @@ -308,6 +329,13 @@ class DdsPipe */ void deactivate_all_topics_nts_() noexcept; + ////////////////////////// + // CONFIGURATION VARIABLES + ////////////////////////// + + //! Configuration of the DDS Pipe + DdsPipeConfiguration configuration_; + ///////////////////////// // SHARED DATA STORAGE ///////////////////////// @@ -378,13 +406,6 @@ class DdsPipe * @brief Internal mutex for concurrent calls */ mutable std::mutex mutex_; - - ////////////////////////// - // CONFIGURATION VARIABLES - ////////////////////////// - - //! Configuration of the DDS Pipe - DdsPipeConfiguration configuration_; }; } /* namespace core */ diff --git a/ddspipe_core/include/ddspipe_core/interface/IParticipant.hpp b/ddspipe_core/include/ddspipe_core/interface/IParticipant.hpp index 6e5a5b39..80ca5b68 100644 --- a/ddspipe_core/include/ddspipe_core/interface/IParticipant.hpp +++ b/ddspipe_core/include/ddspipe_core/interface/IParticipant.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -43,24 +44,22 @@ class IParticipant DDSPIPE_CORE_DllAPI virtual ~IParticipant() = default; - /** - * @brief Return the unique identifier of this Participant. - * - * @return This Participant id - */ + //! The Participant's unique identifier. DDSPIPE_CORE_DllAPI virtual types::ParticipantId id() const noexcept = 0; - //! Whether this participant is RTPS + //! Whether the Participant is RTPS. DDSPIPE_CORE_DllAPI virtual bool is_rtps_kind() const noexcept = 0; - /** - * @brief Whether this Participant requires to connect ist own readers with its own writers. - */ + //! Whether the Participant requires to connect its own readers with its own writers. DDSPIPE_CORE_DllAPI virtual bool is_repeater() const noexcept = 0; + //! The Participant's Topic QoS. + DDSPIPE_CORE_DllAPI + virtual types::TopicQoS topic_qos() const noexcept = 0; + /** * @brief Return a new Writer * diff --git a/ddspipe_core/include/ddspipe_core/interface/ITopic.hpp b/ddspipe_core/include/ddspipe_core/interface/ITopic.hpp index ff21b668..64cd70a0 100644 --- a/ddspipe_core/include/ddspipe_core/interface/ITopic.hpp +++ b/ddspipe_core/include/ddspipe_core/interface/ITopic.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -60,6 +61,10 @@ class ITopic DDSPIPE_CORE_DllAPI virtual std::string serialize() const noexcept = 0; + //! Make a copy of the ITopic + DDSPIPE_CORE_DllAPI + virtual utils::Heritable copy() const noexcept = 0; + /** * This refers to an internal used identifier that declares which kind of data type is going to be * transmitted in this Itopic inside the core. diff --git a/ddspipe_core/include/ddspipe_core/types/dds/TopicQoS.hpp b/ddspipe_core/include/ddspipe_core/types/dds/TopicQoS.hpp index 7b139650..8f6634da 100644 --- a/ddspipe_core/include/ddspipe_core/types/dds/TopicQoS.hpp +++ b/ddspipe_core/include/ddspipe_core/types/dds/TopicQoS.hpp @@ -14,6 +14,8 @@ #pragma once +#include + #include #include @@ -37,19 +39,23 @@ using HistoryDepthType = unsigned int; using OwnershipQosPolicyKind = eprosima::fastdds::dds::OwnershipQosPolicyKind; /** - * Collection of QoS related with a Topic. + * The collection of QoS related to a Topic. * - * The QoS associated with Topic are: - * - Reliability - * - Durability - * - Ownership - * - Partitions - * - History Depth (history kind is always KEEP_LAST) + * The Topic QoS are: + * - Durability + * - Reliability + * - Ownership + * - Partitions + * - Keyed + * - History Depth + * - Max Transmission Rate + * - Max Reception Rate + * - Downsampling * - * @warning partitions are considered as a QoS, thus a Topic can only have partitions, or not have any, but cannot - * support empty partition and partitions. + * @warning partitions are considered a Topic QoS. A Topic can then only either have partitions or not have them, but it + * cannot support empty partitions. * - * @todo add keys to Topic QoS + * @todo create a child of TopicQoS called DdsTopicQoS that contains the QoS specific to DDS. */ struct TopicQoS @@ -91,71 +97,109 @@ TopicQoS DDSPIPE_CORE_DllAPI bool has_partitions() const noexcept; - ///////////////////////// - // GLOBAL VARIABLES - ///////////////////////// - /** - * @brief Global value to store the default history depth in this execution. + * @brief Set the Topic QoS that have not been set and are set in \c qos . * - * This value can change along the execution. - * Every new TopicQoS object will use this value as \c history_depth default. + * @note A Topic QoS is considered as set when it has a FuzzyLevel higher than DEFAULT. */ DDSPIPE_CORE_DllAPI - static std::atomic default_history_depth; + void set_qos( + const TopicQoS& qos, + const utils::FuzzyLevelValues& fuzzy_level = utils::FuzzyLevelValues::fuzzy_level_fuzzy) noexcept; /** - * @brief Global value to store the default downsampling factor in this execution. - * - * This value can change along the execution. - * Every new TopicQoS object will use this value as \c downsampling default. + * @brief Set the default Topic QoS. */ DDSPIPE_CORE_DllAPI - static std::atomic default_downsampling; - - /** - * @brief Global value to store the default max reception rate in this execution. - * - * This value can change along the execution. - * Every new TopicQoS object will use this value as \c max_reception_rate default. - */ - DDSPIPE_CORE_DllAPI - static std::atomic default_max_reception_rate; + void set_default_qos( + DurabilityKind durability_qos = DEFAULT_DURABILITY_QOS, + ReliabilityKind reliability_qos = DEFAULT_RELIABILITY_QOS, + OwnershipQosPolicyKind ownership_qos = DEFAULT_OWNERSHIP_QOS, + bool use_partitions = DEFAULT_USE_PARTITIONS, + bool keyed = DEFAULT_KEYED, + HistoryDepthType history_depth = DEFAULT_HISTORY_DEPTH, + float max_tx_rate = DEFAULT_MAX_TX_RATE, + float max_rx_rate = DEFAULT_MAX_RX_RATE, + unsigned int downsampling = DEFAULT_DOWNSAMPLING) noexcept; ///////////////////////// // VARIABLES ///////////////////////// - //! Durability kind (Default = VOLATILE) - DurabilityKind durability_qos = DurabilityKind::VOLATILE; + //! Durability kind + utils::Fuzzy durability_qos; - //! Reliability kind (Default = BEST_EFFORT) - ReliabilityKind reliability_qos = ReliabilityKind::BEST_EFFORT; + //! Reliability kind + utils::Fuzzy reliability_qos; //! Ownership kind of the topic - OwnershipQosPolicyKind ownership_qos = OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS; + utils::Fuzzy ownership_qos; //! Whether the topics uses partitions - bool use_partitions = false; - - /** - * @brief History Qos - * - * @note Default value would be taken from \c default_history_depth in object creation. - * @note It only stores the depth because in pipe it will always be keep last, as RTPS has not resource limits. - */ - HistoryDepthType history_depth = HISTORY_DEPTH_DEFAULT; + utils::Fuzzy use_partitions; //! Whether the topic has key or not - bool keyed = false; + utils::Fuzzy keyed; - //! Downsampling factor: keep 1 out of every *downsampling* samples received (downsampling=1 <=> no downsampling) - unsigned int downsampling = 1; + //! Depth of the history + utils::Fuzzy history_depth; + + //! Discard msgs if less than 1/rate seconds elapsed since the last sample was transmitted [Hz]. Default: 0 (no limit) + utils::Fuzzy max_tx_rate; //! Discard msgs if less than 1/rate seconds elapsed since the last sample was processed [Hz]. Default: 0 (no limit) - float max_reception_rate = 0; + utils::Fuzzy max_rx_rate; + + //! Downsampling factor: keep 1 out of every *downsampling* samples received (downsampling=1 <=> no downsampling) + utils::Fuzzy downsampling; + + ///////////////////////// + // GLOBAL VARIABLES + ///////////////////////// - static constexpr HistoryDepthType HISTORY_DEPTH_DEFAULT = 5000; + //! Global value to store the default Topic QoS in this execution. + DDSPIPE_CORE_DllAPI + static utils::Fuzzy default_topic_qos; + + ///////////////////////// + // DEFAULT VALUES + ///////////////////////// + + //! Durability kind (Default = VOLATILE) + DDSPIPE_CORE_DllAPI + static constexpr const DurabilityKind DEFAULT_DURABILITY_QOS = DurabilityKind::VOLATILE; + + //! Reliability kind (Default = BEST_EFFORT) + DDSPIPE_CORE_DllAPI + static constexpr const ReliabilityKind DEFAULT_RELIABILITY_QOS = ReliabilityKind::BEST_EFFORT; + + //! Ownership kind (Default = SHARED_OWNERSHIP) + DDSPIPE_CORE_DllAPI + static constexpr const OwnershipQosPolicyKind DEFAULT_OWNERSHIP_QOS = OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS; + + //! Whether the topic uses partitions (Default = False) + DDSPIPE_CORE_DllAPI + static constexpr const bool DEFAULT_USE_PARTITIONS = false; + + //! History depth (Default = 5000) + DDSPIPE_CORE_DllAPI + static constexpr const HistoryDepthType DEFAULT_HISTORY_DEPTH = 5000; + + //! Whether the topic has a key (Default = False) + DDSPIPE_CORE_DllAPI + static constexpr const bool DEFAULT_KEYED = false; + + //! Max Tx Rate (Default = 0) + DDSPIPE_CORE_DllAPI + static constexpr const float DEFAULT_MAX_TX_RATE = 0; + + //! Max Rx Rate (Default = 0) + DDSPIPE_CORE_DllAPI + static constexpr const float DEFAULT_MAX_RX_RATE = 0; + + //! Downsampling (Default = 1) + DDSPIPE_CORE_DllAPI + static constexpr const unsigned int DEFAULT_DOWNSAMPLING = 1; }; /** diff --git a/ddspipe_core/include/ddspipe_core/types/topic/Topic.hpp b/ddspipe_core/include/ddspipe_core/types/topic/Topic.hpp index 84c80ef8..d162730f 100644 --- a/ddspipe_core/include/ddspipe_core/types/topic/Topic.hpp +++ b/ddspipe_core/include/ddspipe_core/types/topic/Topic.hpp @@ -18,12 +18,14 @@ #include #include +#include -#include #include +#include +#include +#include #include #include -#include namespace eprosima { namespace ddspipe { @@ -61,6 +63,10 @@ struct Topic : public ITopic, public IConfiguration virtual bool operator <( const ITopic& other) const noexcept override; + DDSPIPE_CORE_DllAPI + virtual Topic& operator =( + const Topic& other) noexcept; + ///////////////////////// // METHODS ///////////////////////// @@ -80,6 +86,10 @@ struct Topic : public ITopic, public IConfiguration virtual bool is_valid( utils::Formatter& error_msg) const noexcept override; + //! Make a copy of the Topic + DDSPIPE_CORE_DllAPI + virtual utils::Heritable copy() const noexcept override; + ///////////////////////// // METHODS TO OVERRIDE ///////////////////////// @@ -115,6 +125,15 @@ struct Topic : public ITopic, public IConfiguration * @brief The id of the participant who discovered the topic. */ ParticipantId m_topic_discoverer {DEFAULT_PARTICIPANT_ID}; + + /** + * @brief The Topic QoS for the Topic. + * + * If the Topic is built-in, they take their default value. + * If the Topic isn't built-in, they take their value by discovery. + * If the Topic has manually configured Topic QoS, the Topic QoS that are manually configured get overriden. + */ + types::TopicQoS topic_qos{}; }; /** diff --git a/ddspipe_core/include/ddspipe_core/types/topic/dds/DdsTopic.hpp b/ddspipe_core/include/ddspipe_core/types/topic/dds/DdsTopic.hpp index 147a113f..a65b2caa 100644 --- a/ddspipe_core/include/ddspipe_core/types/topic/dds/DdsTopic.hpp +++ b/ddspipe_core/include/ddspipe_core/types/topic/dds/DdsTopic.hpp @@ -17,10 +17,10 @@ #include #include +#include #include #include -#include #include namespace eprosima { @@ -42,6 +42,14 @@ struct DdsTopic : public DistributedTopic DDSPIPE_CORE_DllAPI DdsTopic(); + ///////////////////////// + // OPERATORS + ///////////////////////// + + DDSPIPE_CORE_DllAPI + virtual DdsTopic& operator =( + const DdsTopic& other) noexcept; + ///////////////////////// // METHODS ///////////////////////// @@ -56,6 +64,10 @@ struct DdsTopic : public DistributedTopic DDSPIPE_CORE_DllAPI virtual std::string serialize() const noexcept override; + //! Make a copy of the Topic + DDSPIPE_CORE_DllAPI + virtual utils::Heritable copy() const noexcept override; + ///////////////////////// // STATIC METHODS ///////////////////////// @@ -73,14 +85,6 @@ struct DdsTopic : public DistributedTopic //! Topic Type name std::string type_name{}; - - /** - * @brief Topic QoS - * - * @todo this makes few sense here as the qos does not depend on the QoS itself but in the discovery of it. - * This Topic class is a proxy, not an actual Topic Entity of DDS, so it should not have QoS. - */ - types::TopicQoS topic_qos{}; }; } /* namespace types */ diff --git a/ddspipe_core/include/ddspipe_core/types/topic/filter/ManualTopic.hpp b/ddspipe_core/include/ddspipe_core/types/topic/filter/ManualTopic.hpp new file mode 100644 index 00000000..b6affc31 --- /dev/null +++ b/ddspipe_core/include/ddspipe_core/types/topic/filter/ManualTopic.hpp @@ -0,0 +1,31 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License\. + +#pragma once + +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace core { +namespace types { + +using ManualTopic = std::pair, std::set>; + +} /* namespace types */ +} /* namespace core */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_core/include/ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp b/ddspipe_core/include/ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp index c48c5326..d921c54c 100644 --- a/ddspipe_core/include/ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp +++ b/ddspipe_core/include/ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp @@ -20,6 +20,8 @@ #include #include +#include +#include #include #include @@ -69,12 +71,15 @@ struct WildcardDdsFilterTopic : public IFilterTopic // VARIABLES ///////////////////////// - //! Topic name filter + //! Topic name filter. utils::Fuzzy topic_name; //! Type name filter. If not set matches with all. utils::Fuzzy type_name; + //! The subset of manually configured Topic QoS. + utils::Fuzzy topic_qos; + protected: ///////////////////////// diff --git a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp index 9e29a0d0..18b606cb 100644 --- a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp +++ b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp @@ -29,9 +29,11 @@ DdsBridge::DdsBridge( const std::shared_ptr& payload_pool, const std::shared_ptr& thread_pool, const RoutesConfiguration& routes_config, - const bool remove_unused_entities) + const bool remove_unused_entities, + const std::vector& manual_topics) : Bridge(participants_database, payload_pool, thread_pool) , topic_(topic) + , manual_topics_(manual_topics) { logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << "."); @@ -138,7 +140,8 @@ void DdsBridge::create_all_tracks_() for (const auto& id : writers_to_create) { std::shared_ptr participant = participants_->get_participant(id); - writers[id] = participant->create_writer(*topic_); + const auto topic = create_topic_for_participant_nts_(participant); + writers[id] = participant->create_writer(*topic); } // Add the writers to the tracks they have routes for. @@ -154,7 +157,8 @@ void DdsBridge::create_writer( // Create the writer. std::shared_ptr participant = participants_->get_participant(participant_id); - std::shared_ptr writer = participant->create_writer(*topic_); + const auto topic = create_topic_for_participant_nts_(participant); + auto writer = participant->create_writer(*topic); // Add the writer to the tracks it has routes for. add_writer_to_tracks_nts_(participant_id, writer); @@ -259,7 +263,8 @@ void DdsBridge::add_writers_to_tracks_nts_( { // The track doesn't exist. Create it. std::shared_ptr participant = participants_->get_participant(id); - auto reader = participant->create_reader(*topic_); + const auto topic = create_topic_for_participant_nts_(participant); + auto reader = participant->create_reader(*topic); tracks_[id] = std::make_unique( topic_, @@ -277,6 +282,34 @@ void DdsBridge::add_writers_to_tracks_nts_( } } +utils::Heritable DdsBridge::create_topic_for_participant_nts_( + const std::shared_ptr& participant) noexcept +{ + // Make a copy of the Topic to customize it according to the Participant's configured QoS. + utils::Heritable topic = topic_->copy(); + + // Impose the Topic QoS that have been pre-configured for the Bridge's topic. + // set_qos only overwrites the Topic QoS that have been set with a lower FuzzyLevel. + // A Topic QoS set with fuzzy_level_hard (the highest FuzzyLevel) cannot be overwritten. + // Thus, the order matters. In this case, manual_topics[0] > manual_topics[1] > participant. + + // 1. Manually Configured Topic QoS. + for (const auto& manual_topic : manual_topics_) + { + const auto& participant_ids = manual_topic.second; + + if (participant_ids.empty() || participant_ids.count(participant->id())) + { + topic->topic_qos.set_qos(manual_topic.first->topic_qos, utils::FuzzyLevelValues::fuzzy_level_hard); + } + } + + // 2. Participant Topic QoS. + topic->topic_qos.set_qos(participant->topic_qos(), utils::FuzzyLevelValues::fuzzy_level_hard); + + return topic; +} + std::ostream& operator <<( std::ostream& os, const DdsBridge& bridge) diff --git a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp index b666e647..39d30008 100644 --- a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp @@ -60,6 +60,23 @@ RoutesConfiguration DdsPipeConfiguration::get_routes_config( } } +std::vector DdsPipeConfiguration::get_manual_topics( + const core::ITopic& topic) const noexcept +{ + // Filter the manual topics to only return the ones that match with the given topic. + std::vector matching_manual_topics{}; + + for (const auto& manual_topic : manual_topics) + { + if (manual_topic.first->matches(topic)) + { + matching_manual_topics.push_back(manual_topic); + } + } + + return matching_manual_topics; +} + } /* namespace core */ } /* namespace ddspipe */ } /* namespace eprosima */ diff --git a/ddspipe_core/src/cpp/core/DdsPipe.cpp b/ddspipe_core/src/cpp/core/DdsPipe.cpp index 635653c2..20dfc491 100644 --- a/ddspipe_core/src/cpp/core/DdsPipe.cpp +++ b/ddspipe_core/src/cpp/core/DdsPipe.cpp @@ -29,21 +29,17 @@ namespace core { using namespace eprosima::ddspipe::core::types; DdsPipe::DdsPipe( - const std::shared_ptr& allowed_topics, + const DdsPipeConfiguration& configuration, const std::shared_ptr& discovery_database, const std::shared_ptr& payload_pool, const std::shared_ptr& participants_database, - const std::shared_ptr& thread_pool, - const std::set>& builtin_topics, /* = {} */ - bool start_enable, /* = false */ - const DdsPipeConfiguration& configuration /* = {} */) - : allowed_topics_(allowed_topics) + const std::shared_ptr& thread_pool) + : configuration_(configuration) , discovery_database_(discovery_database) , payload_pool_(payload_pool) , participants_database_(participants_database) , thread_pool_(thread_pool) , enabled_(false) - , configuration_(configuration) { logDebug(DDSPIPE, "Creating DDS Pipe."); @@ -56,6 +52,9 @@ DdsPipe::DdsPipe( "Configuration for DDS Pipe is invalid: " << error_msg); } + // Initialize the allowed topics + init_allowed_topics_(); + // Add callback to be called by the discovery database when an Endpoint is discovered discovery_database_->add_endpoint_discovered_callback(std::bind(&DdsPipe::discovered_endpoint_, this, std::placeholders::_1)); @@ -69,13 +68,13 @@ DdsPipe::DdsPipe( std::placeholders::_1)); // Create Bridges for builtin topics - init_bridges_nts_(builtin_topics); + init_bridges_nts_(configuration_.builtin_topics); // Enable thread pool thread_pool_->enable(); // Enable if set - if (start_enable) + if (configuration_.init_enabled) { enable(); } @@ -108,12 +107,93 @@ DdsPipe::~DdsPipe() // Destroy RpcBridges, so Writers and Readers are destroyed before the Databases rpc_bridges_.clear(); - // There is no need to destroy shared ptrs as they will delete itslefs with 0 references + // There is no need to destroy shared pointers. + // They self-destruct when they have 0 references. logDebug(DDSPIPE, "DDS Pipe destroyed."); } -utils::ReturnCode DdsPipe::reload_allowed_topics( +utils::ReturnCode DdsPipe::reload_configuration( + const DdsPipeConfiguration& new_configuration) +{ + // Check that the configuration is correct + utils::Formatter error_msg; + if (!new_configuration.is_valid(error_msg, participants_database_->get_participants_repeater_map())) + { + throw utils::ConfigurationException( + utils::Formatter() << + "Configuration for Reload DDS Pipe is invalid: " << error_msg); + } + + auto allowed_topics = std::make_shared( + new_configuration.allowlist, + new_configuration.blocklist); + + return reload_allowed_topics_(allowed_topics); +} + +utils::ReturnCode DdsPipe::enable() noexcept +{ + std::lock_guard lock(mutex_); + + if (!enabled_) + { + enabled_ = true; + + logInfo(DDSPIPE, "Enabling DDS Pipe."); + + activate_all_topics_nts_(); + + // Enable services discovered while pipe disabled + for (auto it : current_services_) + { + // Enable only allowed services + if (it.second) + { + rpc_bridges_[it.first]->enable(); + } + } + + return utils::ReturnCode::RETCODE_OK; + } + else + { + logInfo(DDSPIPE, "Trying to enable an already enabled DDS Pipe."); + return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET; + } +} + +utils::ReturnCode DdsPipe::disable() noexcept +{ + std::lock_guard lock(mutex_); + + if (enabled_) + { + enabled_ = false; + + logInfo(DDSPIPE, "Disabling DDS Pipe."); + + deactivate_all_topics_nts_(); + + return utils::ReturnCode::RETCODE_OK; + } + else + { + logInfo(DDSPIPE, "Trying to disable a disabled DDS Pipe."); + return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET; + } +} + +void DdsPipe::init_allowed_topics_() +{ + allowed_topics_ = std::make_shared( + configuration_.allowlist, + configuration_.blocklist); + + logInfo(DDSROUTER, "DDS Router configured with allowed topics: " << *allowed_topics_); +} + +utils::ReturnCode DdsPipe::reload_allowed_topics_( const std::shared_ptr& allowed_topics) { std::lock_guard lock(mutex_); @@ -176,58 +256,6 @@ utils::ReturnCode DdsPipe::reload_allowed_topics( return utils::ReturnCode::RETCODE_OK; } -utils::ReturnCode DdsPipe::enable() noexcept -{ - std::lock_guard lock(mutex_); - - if (!enabled_) - { - enabled_ = true; - - logInfo(DDSPIPE, "Enabling DDS Pipe."); - - activate_all_topics_nts_(); - - // Enable services discovered while pipe disabled - for (auto it : current_services_) - { - // Enable only allowed services - if (it.second) - { - rpc_bridges_[it.first]->enable(); - } - } - - return utils::ReturnCode::RETCODE_OK; - } - else - { - logInfo(DDSPIPE, "Trying to enable an already enabled DDS Pipe."); - return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET; - } -} - -utils::ReturnCode DdsPipe::disable() noexcept -{ - std::lock_guard lock(mutex_); - - if (enabled_) - { - enabled_ = false; - - logInfo(DDSPIPE, "Disabling DDS Pipe."); - - deactivate_all_topics_nts_(); - - return utils::ReturnCode::RETCODE_OK; - } - else - { - logInfo(DDSPIPE, "Trying to disable a disabled DDS Pipe."); - return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET; - } -} - void DdsPipe::discovered_endpoint_( const Endpoint& endpoint) noexcept { @@ -381,7 +409,7 @@ void DdsPipe::discovered_topic_nts_( return; } - // Add topic to current_topics as non activated + // Add topic to current_topics as not activated current_topics_.emplace(topic, false); // If Pipe is enabled and topic allowed, activate it @@ -445,8 +473,8 @@ void DdsPipe::create_new_bridge_nts_( try { - auto routes_config = configuration_.get_routes_config(topic); + auto manual_topics = configuration_.get_manual_topics(dynamic_cast(*topic)); // Create bridge instance auto new_bridge = std::make_unique(topic, @@ -454,7 +482,8 @@ void DdsPipe::create_new_bridge_nts_( payload_pool_, thread_pool_, routes_config, - configuration_.remove_unused_entities); + configuration_.remove_unused_entities, + manual_topics); if (enabled) { diff --git a/ddspipe_core/src/cpp/types/dds/TopicQoS.cpp b/ddspipe_core/src/cpp/types/dds/TopicQoS.cpp index 229f7108..41e2f6a8 100644 --- a/ddspipe_core/src/cpp/types/dds/TopicQoS.cpp +++ b/ddspipe_core/src/cpp/types/dds/TopicQoS.cpp @@ -25,32 +25,33 @@ namespace ddspipe { namespace core { namespace types { -std::atomic TopicQoS::default_history_depth{HISTORY_DEPTH_DEFAULT}; -std::atomic TopicQoS::default_downsampling{1}; -std::atomic TopicQoS::default_max_reception_rate{0}; +utils::Fuzzy TopicQoS::default_topic_qos{}; TopicQoS::TopicQoS() { - // Set history by default - history_depth = default_history_depth; - // Set downsampling by default - downsampling = default_downsampling; - // Set max reception rate by default - max_reception_rate = default_max_reception_rate; + set_default_qos(); + + // This check must be done. If not, the constructor of the default Topic QoS would enter into a loop. + if (default_topic_qos.is_set()) + { + // The FuzzyLevel must be set so that specs overwrites the discovery values. + set_qos(default_topic_qos, utils::FuzzyLevelValues::fuzzy_level_set); + } } bool TopicQoS::operator ==( const TopicQoS& other) const noexcept { return - this->reliability_qos == other.reliability_qos && this->durability_qos == other.durability_qos && - this->history_depth == other.history_depth && + this->reliability_qos == other.reliability_qos && this->ownership_qos == other.ownership_qos && this->use_partitions == other.use_partitions && + this->history_depth == other.history_depth && this->keyed == other.keyed && - this->downsampling == other.downsampling && - this->max_reception_rate == other.max_reception_rate; + this->max_tx_rate == other.max_tx_rate && + this->max_rx_rate == other.max_rx_rate && + this->downsampling == other.downsampling; } bool TopicQoS::is_reliable() const noexcept @@ -73,6 +74,79 @@ bool TopicQoS::has_partitions() const noexcept return use_partitions; } +void TopicQoS::set_qos( + const TopicQoS& qos, + const utils::FuzzyLevelValues& fuzzy_level /*= utils::FuzzyLevelValues::fuzzy_level_fuzzy*/) noexcept +{ + if (durability_qos.get_level() < fuzzy_level && qos.durability_qos.is_set()) + { + durability_qos.set_value(qos.durability_qos.get_value(), fuzzy_level); + } + + if (reliability_qos.get_level() < fuzzy_level && qos.reliability_qos.is_set()) + { + reliability_qos.set_value(qos.reliability_qos.get_value(), fuzzy_level); + } + + if (ownership_qos.get_level() < fuzzy_level && qos.ownership_qos.is_set()) + { + ownership_qos.set_value(qos.ownership_qos.get_value(), fuzzy_level); + } + + if (use_partitions.get_level() < fuzzy_level && qos.use_partitions.is_set()) + { + use_partitions.set_value(qos.use_partitions.get_value(), fuzzy_level); + } + + if (history_depth.get_level() < fuzzy_level && qos.history_depth.is_set()) + { + history_depth.set_value(qos.history_depth.get_value(), fuzzy_level); + } + + if (keyed.get_level() < fuzzy_level && qos.keyed.is_set()) + { + keyed.set_value(qos.keyed.get_value(), fuzzy_level); + } + + if (max_tx_rate.get_level() < fuzzy_level && qos.max_tx_rate.is_set()) + { + max_tx_rate.set_value(qos.max_tx_rate.get_value(), fuzzy_level); + } + + if (max_rx_rate.get_level() < fuzzy_level && qos.max_rx_rate.is_set()) + { + max_rx_rate.set_value(qos.max_rx_rate.get_value(), fuzzy_level); + } + + if (downsampling.get_level() < fuzzy_level && qos.downsampling.is_set()) + { + downsampling.set_value(qos.downsampling.get_value(), fuzzy_level); + } +} + +void TopicQoS::set_default_qos( + DurabilityKind durability_qos /*= DEFAULT_DURABILITY_QOS */, + ReliabilityKind reliability_qos /*= DEFAULT_RELIABILITY_QOS */, + OwnershipQosPolicyKind ownership_qos /*= DEFAULT_OWNERSHIP_QOS */, + bool use_partitions /*= DEFAULT_USE_PARTITIONS */, + bool keyed /*= DEFAULT_KEYED */, + HistoryDepthType history_depth /*= DEFAULT_HISTORY_DEPTH */, + float max_tx_rate /*= DEFAULT_MAX_TX_RATE */, + float max_rx_rate /*= DEFAULT_MAX_RX_RATE */, + unsigned int downsampling /*= DEFAULT_DOWNSAMPLING */) noexcept +{ + // The default values must be received as arguments. Otherwise, Ubuntu 20.04 Debug does not compile. + this->durability_qos.set_value(durability_qos, utils::FuzzyLevelValues::fuzzy_level_default); + this->reliability_qos.set_value(reliability_qos, utils::FuzzyLevelValues::fuzzy_level_default); + this->ownership_qos.set_value(ownership_qos, utils::FuzzyLevelValues::fuzzy_level_default); + this->use_partitions.set_value(use_partitions, utils::FuzzyLevelValues::fuzzy_level_default); + this->history_depth.set_value(history_depth, utils::FuzzyLevelValues::fuzzy_level_default); + this->keyed.set_value(keyed, utils::FuzzyLevelValues::fuzzy_level_default); + this->max_tx_rate.set_value(max_tx_rate, utils::FuzzyLevelValues::fuzzy_level_default); + this->max_rx_rate.set_value(max_rx_rate, utils::FuzzyLevelValues::fuzzy_level_default); + this->downsampling.set_value(downsampling, utils::FuzzyLevelValues::fuzzy_level_default); +} + std::ostream& operator <<( std::ostream& os, const DurabilityKind& kind) @@ -158,8 +232,9 @@ std::ostream& operator <<( (qos.has_partitions() ? ";partitions" : "") << (qos.keyed ? ";keyed" : "") << ";depth(" << qos.history_depth << ")" << + ";max_tx_rate(" << qos.max_tx_rate << ")" << + ";max_rx_rate(" << qos.max_rx_rate << ")" << ";downsampling(" << qos.downsampling << ")" << - ";max_reception_rate(" << qos.max_reception_rate << ")" << "}"; return os; diff --git a/ddspipe_core/src/cpp/types/topic/Topic.cpp b/ddspipe_core/src/cpp/types/topic/Topic.cpp index 0034e11a..1d9ad8cb 100644 --- a/ddspipe_core/src/cpp/types/topic/Topic.cpp +++ b/ddspipe_core/src/cpp/types/topic/Topic.cpp @@ -31,8 +31,7 @@ namespace types { bool Topic::operator ==( const ITopic& other) const noexcept { - return - topic_unique_name() == other.topic_unique_name(); + return topic_unique_name() == other.topic_unique_name(); } bool Topic::operator <( @@ -85,6 +84,12 @@ bool Topic::is_valid( return true; } +utils::Heritable Topic::copy() const noexcept +{ + Topic topic = *this; + return utils::Heritable::make_heritable(topic); +} + ///////////////////////// // METHODS TO OVERRIDE ///////////////////////// @@ -107,6 +112,21 @@ std::ostream& operator <<( return os; } +///////////////////////// +// OPERATORS +///////////////////////// + +Topic& Topic::operator = ( + const Topic& other) noexcept +{ + this->m_topic_name = other.m_topic_name; + this->m_internal_type_discriminator = other.m_internal_type_discriminator; + this->m_topic_discoverer = other.m_topic_discoverer; + this->topic_qos = other.topic_qos; + + return *this; +} + } /* namespace types */ } /* namespace core */ } /* namespace ddspipe */ diff --git a/ddspipe_core/src/cpp/types/topic/dds/DdsTopic.cpp b/ddspipe_core/src/cpp/types/topic/dds/DdsTopic.cpp index 72b33c35..b30b404f 100644 --- a/ddspipe_core/src/cpp/types/topic/dds/DdsTopic.cpp +++ b/ddspipe_core/src/cpp/types/topic/dds/DdsTopic.cpp @@ -61,6 +61,12 @@ std::string DdsTopic::serialize() const noexcept return ss.str(); } +utils::Heritable DdsTopic::copy() const noexcept +{ + DdsTopic topic = *this; + return utils::Heritable::make_heritable(topic); +} + ///////////////////////// // STATIC METHODS ///////////////////////// @@ -105,6 +111,20 @@ bool DdsTopic::is_valid_dds_topic( return true; } +///////////////////////// +// OPERATORS +///////////////////////// + +DdsTopic& DdsTopic::operator = ( + const DdsTopic& other) noexcept +{ + Topic::operator =(other); + + this->type_name = other.type_name; + + return *this; +} + } /* namespace types */ } /* namespace core */ } /* namespace ddspipe */ diff --git a/ddspipe_core/test/unittest/core/ddspipe/CMakeLists.txt b/ddspipe_core/test/unittest/core/ddspipe/CMakeLists.txt index a7836598..b5e24914 100644 --- a/ddspipe_core/test/unittest/core/ddspipe/CMakeLists.txt +++ b/ddspipe_core/test/unittest/core/ddspipe/CMakeLists.txt @@ -22,6 +22,7 @@ all_library_sources("${TEST_SOURCES}") set(TEST_LIST default_initialization enable_disable + allowed_blocked_topics ) set(TEST_EXTRA_LIBRARIES diff --git a/ddspipe_core/test/unittest/core/ddspipe/DdsPipeTest.cpp b/ddspipe_core/test/unittest/core/ddspipe/DdsPipeTest.cpp index 29a1db48..59a71b37 100644 --- a/ddspipe_core/test/unittest/core/ddspipe/DdsPipeTest.cpp +++ b/ddspipe_core/test/unittest/core/ddspipe/DdsPipeTest.cpp @@ -17,8 +17,10 @@ #include +#include #include #include +#include using namespace eprosima::ddspipe::core; @@ -105,8 +107,10 @@ TEST(DdsPipeTest, default_initialization) { // default { + DdsPipeConfiguration ddspipe_configuration; + test::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), std::make_shared(), @@ -118,14 +122,15 @@ TEST(DdsPipeTest, default_initialization) // enable { + DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.init_enabled = true; + test::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), std::make_shared(), - std::make_shared(test::N_THREADS), - {}, - true + std::make_shared(test::N_THREADS) ); ASSERT_TRUE(ddspipe.is_enabled()); @@ -139,15 +144,15 @@ TEST(DdsPipeTest, default_initialization) eprosima::utils::Heritable htopic_1 = eprosima::utils::Heritable::make_heritable(topic_1); + DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics.insert(htopic_1); + test::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), std::make_shared(), - std::make_shared(test::N_THREADS), - { - htopic_1 - } + std::make_shared(test::N_THREADS) ); ASSERT_FALSE(ddspipe.is_enabled()); @@ -165,16 +170,16 @@ TEST(DdsPipeTest, default_initialization) eprosima::utils::Heritable htopic_1 = eprosima::utils::Heritable::make_heritable(topic_1); + DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics.insert(htopic_1); + ddspipe_configuration.init_enabled = true; + test::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), std::make_shared(), - std::make_shared(test::N_THREADS), - { - htopic_1 - }, - true + std::make_shared(test::N_THREADS) ); ASSERT_TRUE(ddspipe.is_enabled()); @@ -196,9 +201,12 @@ TEST(DdsPipeTest, enable_disable) { // enable discovered topic { + DdsPipeConfiguration ddspipe_configuration; + auto discovery_database = std::make_shared(); + test::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, discovery_database, std::make_shared(), std::make_shared(), @@ -249,14 +257,15 @@ TEST(DdsPipeTest, enable_disable) eprosima::utils::Heritable htopic_1 = eprosima::utils::Heritable::make_heritable(topic_1); - auto discovery_database = std::make_shared(); + DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics.insert(htopic_1); + test::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), std::make_shared(), - std::make_shared(test::N_THREADS), - {htopic_1} + std::make_shared(test::N_THREADS) ); ASSERT_TRUE(ddspipe.is_topic_discovered(htopic_1)); @@ -278,19 +287,110 @@ TEST(DdsPipeTest, enable_disable) } /** - * TODO + * Test the DDS Pipe's allowlist and blocklist + * + * CASES: + * - check that a topic is allowed when the allowed topics are empty. + * - check that a topic is allowed when its in the allowlist. + * - check that a topic is blocked when its in the blocklist. + * - check that a topic is allowed when it gets removed from the blocklist. */ TEST(DdsPipeTest, allowed_blocked_topics) { - // TODO -} + { + DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.init_enabled = true; -/** - * TODO - */ -TEST(DdsPipeTest, reload) -{ - // TODO + auto discovery_database = std::make_shared(); + + test::DdsPipe ddspipe( + ddspipe_configuration, + discovery_database, + std::make_shared(), + std::make_shared(), + std::make_shared(test::N_THREADS) + ); + + types::DdsTopic topic_1; + topic_1.m_topic_name = "topic1"; + topic_1.type_name = "type1"; + eprosima::utils::Heritable htopic_1 = + eprosima::utils::Heritable::make_heritable(topic_1); + + ASSERT_FALSE(ddspipe.is_topic_discovered(htopic_1)); + ASSERT_FALSE(ddspipe.is_topic_active(htopic_1)); + ASSERT_FALSE(ddspipe.is_bridge_created(htopic_1)); + + // Create a wildcard topic + types::WildcardDdsFilterTopic topic_2; + topic_2.topic_name.set_value("t*"); + + eprosima::utils::Heritable htopic_2 = + eprosima::utils::Heritable::make_heritable(topic_2); + + // Add the Topic to the blocklist + DdsPipeConfiguration new_ddspipe_configuration; + new_ddspipe_configuration.blocklist.insert(htopic_2); + ddspipe.reload_configuration(new_ddspipe_configuration); + + types::Endpoint endpoint_1; + endpoint_1.kind = types::EndpointKind::reader; + endpoint_1.topic = topic_1; + + discovery_database->add_endpoint(endpoint_1); + + // Wait a bit for callback to arrive + eprosima::utils::sleep_for(10u); + + ASSERT_TRUE(ddspipe.is_topic_discovered(htopic_1)); + ASSERT_FALSE(ddspipe.is_topic_active(htopic_1)); + ASSERT_FALSE(ddspipe.is_bridge_created(htopic_1)); + + // Remove the Topic from the blocklist + new_ddspipe_configuration.blocklist.erase(htopic_2); + ddspipe.reload_configuration(new_ddspipe_configuration); + + ASSERT_TRUE(ddspipe.is_topic_discovered(htopic_1)); + ASSERT_TRUE(ddspipe.is_topic_active(htopic_1)); + ASSERT_TRUE(ddspipe.is_bridge_created(htopic_1)); + + // Create two Wildcard Topics + types::WildcardDdsFilterTopic topic_3; + topic_3.topic_name.set_value("topic*"); + + eprosima::utils::Heritable htopic_3 = + eprosima::utils::Heritable::make_heritable(topic_3); + + types::WildcardDdsFilterTopic topic_4; + topic_4.topic_name.set_value("top*"); + + eprosima::utils::Heritable htopic_4 = + eprosima::utils::Heritable::make_heritable(topic_4); + + // Add the Topic to the allowlist + new_ddspipe_configuration.allowlist.insert(htopic_3); + ddspipe.reload_configuration(new_ddspipe_configuration); + + ASSERT_TRUE(ddspipe.is_topic_discovered(htopic_1)); + ASSERT_TRUE(ddspipe.is_topic_active(htopic_1)); + ASSERT_TRUE(ddspipe.is_bridge_created(htopic_1)); + + // Add the Topic to the blocklist + new_ddspipe_configuration.blocklist.insert(htopic_4); + ddspipe.reload_configuration(new_ddspipe_configuration); + + ASSERT_TRUE(ddspipe.is_topic_discovered(htopic_1)); + ASSERT_FALSE(ddspipe.is_topic_active(htopic_1)); + ASSERT_TRUE(ddspipe.is_bridge_created(htopic_1)); + + // Remove the Topic from the blocklist + new_ddspipe_configuration.blocklist.erase(htopic_4); + ddspipe.reload_configuration(new_ddspipe_configuration); + + ASSERT_TRUE(ddspipe.is_topic_discovered(htopic_1)); + ASSERT_TRUE(ddspipe.is_topic_active(htopic_1)); + ASSERT_TRUE(ddspipe.is_bridge_created(htopic_1)); + } } int main( diff --git a/ddspipe_participants/include/ddspipe_participants/configuration/ParticipantConfiguration.hpp b/ddspipe_participants/include/ddspipe_participants/configuration/ParticipantConfiguration.hpp index fc95a3a4..bcc671d6 100644 --- a/ddspipe_participants/include/ddspipe_participants/configuration/ParticipantConfiguration.hpp +++ b/ddspipe_participants/include/ddspipe_participants/configuration/ParticipantConfiguration.hpp @@ -14,9 +14,10 @@ #pragma once +#include +#include #include -#include #include namespace eprosima { @@ -42,11 +43,14 @@ struct ParticipantConfiguration : public core::IConfiguration virtual bool is_valid( utils::Formatter& error_msg) const noexcept override; - //! Participant Id associated with this configuration + //! Participant Id associated with this configuration. core::types::ParticipantId id {}; //! Whether this Participant should connect its readers with its writers. bool is_repeater {false}; + + //! The Topic QoS that have been manually configured for the Participant. + core::types::TopicQoS topic_qos{}; }; } /* namespace participants */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/auxiliar/BlankParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/auxiliar/BlankParticipant.hpp index 4e62b19e..ad8922c3 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/auxiliar/BlankParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/auxiliar/BlankParticipant.hpp @@ -15,6 +15,7 @@ #pragma once #include +#include #include @@ -50,6 +51,10 @@ class BlankParticipant : public core::IParticipant DDSPIPE_PARTICIPANTS_DllAPI bool is_rtps_kind() const noexcept override; + //! Override topic_qos() IParticipant method + DDSPIPE_PARTICIPANTS_DllAPI + core::types::TopicQoS topic_qos() const noexcept override; + //! Override create_writer() IParticipant method DDSPIPE_PARTICIPANTS_DllAPI std::shared_ptr create_writer( diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index 5d02ba38..5db52154 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -80,6 +81,9 @@ class CommonParticipant : public core::IParticipant, public fastdds::dds::Domain DDSPIPE_PARTICIPANTS_DllAPI virtual bool is_repeater() const noexcept override; + DDSPIPE_PARTICIPANTS_DllAPI + core::types::TopicQoS topic_qos() const noexcept override; + /** * @brief Create a writer object * diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/SchemaParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/SchemaParticipant.hpp index 0e7eca88..621bfadb 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/SchemaParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/SchemaParticipant.hpp @@ -52,6 +52,10 @@ class SchemaParticipant : public core::IParticipant DDSPIPE_PARTICIPANTS_DllAPI bool is_rtps_kind() const noexcept override; + //! Override topic_qos() IParticipant method + DDSPIPE_PARTICIPANTS_DllAPI + core::types::TopicQoS topic_qos() const noexcept override; + //! Override create_writer_() IParticipant method DDSPIPE_PARTICIPANTS_DllAPI std::shared_ptr create_writer( @@ -64,6 +68,9 @@ class SchemaParticipant : public core::IParticipant protected: + //! Participant Configuration + std::shared_ptr configuration_; + //! Participant Id core::types::ParticipantId id_; diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index a4143bee..f39d0332 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -14,6 +14,8 @@ #pragma once +#include + #include #include #include @@ -22,11 +24,12 @@ #include #include - #include #include #include #include +#include +#include #include #include @@ -92,6 +95,10 @@ class CommonParticipant DDSPIPE_PARTICIPANTS_DllAPI virtual bool is_rtps_kind() const noexcept override; + //! Implement parent method \c topic_qos . + DDSPIPE_PARTICIPANTS_DllAPI + core::types::TopicQoS topic_qos() const noexcept override; + /** * @brief Create a writer object * diff --git a/ddspipe_participants/include/ddspipe_participants/reader/auxiliar/BaseReader.hpp b/ddspipe_participants/include/ddspipe_participants/reader/auxiliar/BaseReader.hpp index 47c203e5..3754f499 100644 --- a/ddspipe_participants/include/ddspipe_participants/reader/auxiliar/BaseReader.hpp +++ b/ddspipe_participants/include/ddspipe_participants/reader/auxiliar/BaseReader.hpp @@ -17,6 +17,8 @@ #include #include +#include + #include #include #include @@ -156,7 +158,9 @@ class BaseReader : public core::IReader * @param participant_id parent participant id */ BaseReader( - const core::types::ParticipantId& participant_id); + const core::types::ParticipantId& participant_id, + const float max_rx_rate = 0, + const unsigned int downsampling = 1); ///////////////////////// // PROTECTED METHODS @@ -197,6 +201,13 @@ class BaseReader : public core::IReader virtual utils::ReturnCode take_nts_( std::unique_ptr& data) noexcept = 0; + /** + * @brief Check the \c max_rx_rate and the \c downsampling to decide whether a sample should be processed. + * + * Implement this method in every inherited Reader class with take functionality. + */ + virtual bool should_accept_sample_() noexcept; + ///////////////////////// // INTERNAL VARIABLES ///////////////////////// @@ -204,6 +215,12 @@ class BaseReader : public core::IReader //! Participant parent ID const core::types::ParticipantId participant_id_; + //! Max reception rate + float max_rx_rate_; + + //! Downsampling value + unsigned int downsampling_; + //! Lambda to call the callback whenever a new data arrives std::function on_data_available_lambda_; @@ -216,6 +233,15 @@ class BaseReader : public core::IReader //! Mutex that guards every access to the Reader mutable std::recursive_mutex mutex_; + //! Counter used to keep only 1 sample of every N received, with N being the topic's downsampling factor. + unsigned int downsampling_idx_ = 0; + + //! Reception timestamp of the last received (and processed) message, upon which max reception rate can be applied. + utils::Timestamp last_received_ts_ = utils::the_beginning_of_time(); + + //! Minimum time [ns] between received samples required to be processed (0 <=> no restriction). + std::chrono::nanoseconds min_intersample_period_ = std::chrono::nanoseconds(0); + //! Default callback. It shows a warning that callback is not set static const std::function DEFAULT_ON_DATA_AVAILABLE_CALLBACK; diff --git a/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp b/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp index 41b928c3..f1e33445 100644 --- a/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp +++ b/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp @@ -136,6 +136,10 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener fastdds::dds::DataReaderQos reckon_reader_qos_() const; + //! Whether a sample received should be processed + virtual bool should_accept_sample_( + const fastdds::dds::SampleInfo& info) noexcept; + virtual void fill_received_data_( const fastdds::dds::SampleInfo& info, core::types::RtpsPayloadData& data_to_fill) const noexcept; diff --git a/ddspipe_participants/include/ddspipe_participants/reader/rtps/CommonReader.hpp b/ddspipe_participants/include/ddspipe_participants/reader/rtps/CommonReader.hpp index a0ca6c62..478bae10 100644 --- a/ddspipe_participants/include/ddspipe_participants/reader/rtps/CommonReader.hpp +++ b/ddspipe_participants/include/ddspipe_participants/reader/rtps/CommonReader.hpp @@ -271,8 +271,7 @@ class CommonReader : public BaseReader, public fastrtps::rtps::ReaderListener // CommonReader specific methods //! Whether a change received should be processed - DDSPIPE_PARTICIPANTS_DllAPI - virtual bool accept_change_( + virtual bool should_accept_change_( const fastrtps::rtps::CacheChange_t* change) noexcept; //! Whether a change received is from this Participant (to avoid auto-feedback) @@ -317,15 +316,6 @@ class CommonReader : public BaseReader, public fastrtps::rtps::ReaderListener //! Reader QoS to create the internal RTPS Reader. fastrtps::ReaderQos reader_qos_; - - //! Counter used to keep only 1 sample of every N received, with N being the topic's downsampling factor. - unsigned int downsampling_idx_ = 0; - - // ! Reception timestamp of the last received (and processed) message, upon which max reception rate can be applied. - utils::Timestamp last_received_ts_ = utils::the_beginning_of_time(); - - //! Minimum time [ns] between received samples required to be processed (0 <=> no restriction). - std::chrono::nanoseconds min_intersample_period_ = std::chrono::nanoseconds(0); }; } /* namespace rtps */ diff --git a/ddspipe_participants/include/ddspipe_participants/testing/entities/mock_entities.hpp b/ddspipe_participants/include/ddspipe_participants/testing/entities/mock_entities.hpp index e9abb193..cd6283ba 100644 --- a/ddspipe_participants/include/ddspipe_participants/testing/entities/mock_entities.hpp +++ b/ddspipe_participants/include/ddspipe_participants/testing/entities/mock_entities.hpp @@ -17,8 +17,9 @@ #include #include -#include +#include #include +#include #include #include @@ -152,6 +153,9 @@ class MockTopic : public core::types::DistributedTopic DDSPIPE_PARTICIPANTS_DllAPI core::types::TopicInternalTypeDiscriminator internal_type_discriminator() const noexcept override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual utils::Heritable copy() const noexcept override; }; class MockFilterAllTopic : public core::types::IFilterTopic diff --git a/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/BaseWriter.hpp b/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/BaseWriter.hpp index e0706a2b..03f4bb65 100644 --- a/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/BaseWriter.hpp +++ b/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/BaseWriter.hpp @@ -17,6 +17,8 @@ #include #include +#include + #include #include #include @@ -100,7 +102,8 @@ class BaseWriter : public core::IWriter */ DDSPIPE_PARTICIPANTS_DllAPI BaseWriter( - const core::types::ParticipantId& participant_id); + const core::types::ParticipantId& participant_id, + const float max_tx_rate = 0); ///////////////////////// // METHODS TO IMPLEMENT BY SUBCLASSES @@ -130,6 +133,11 @@ class BaseWriter : public core::IWriter virtual utils::ReturnCode write_nts_( core::IRoutingData& data) noexcept = 0; + /** + * @brief Check the \c max_tx_rate to decide whether a sample should be sent. + */ + bool should_send_sample_() noexcept; + ///////////////////////// // INTERNAL VARIABLES ///////////////////////// @@ -137,12 +145,21 @@ class BaseWriter : public core::IWriter //! Participant parent ID const core::types::ParticipantId participant_id_; + // Max transmission rate + float max_tx_rate_; + //! Whether the Writer is currently enabled std::atomic enabled_; //! Mutex that guards every access to the Writer mutable std::recursive_mutex mutex_; + //! Timestamp of the last sent message. + utils::Timestamp last_sent_ts_ = utils::the_beginning_of_time(); + + //! Minimum time [ns] between sent samples required to be processed (0 <=> no restriction). + std::chrono::nanoseconds min_intersample_period_ = std::chrono::nanoseconds(0); + // Allow operator << to use private variables friend std::ostream& operator <<( std::ostream&, diff --git a/ddspipe_participants/src/cpp/participant/auxiliar/BlankParticipant.cpp b/ddspipe_participants/src/cpp/participant/auxiliar/BlankParticipant.cpp index 4d6539f7..8027d5e0 100644 --- a/ddspipe_participants/src/cpp/participant/auxiliar/BlankParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/auxiliar/BlankParticipant.cpp @@ -42,6 +42,12 @@ bool BlankParticipant::is_rtps_kind() const noexcept return false; } +core::types::TopicQoS BlankParticipant::topic_qos() const noexcept +{ + core::types::TopicQoS m_topic_qos; + return m_topic_qos; +} + std::shared_ptr BlankParticipant::create_writer( const core::ITopic& topic) { diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index ffb60061..a4c725de 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -111,6 +111,11 @@ bool CommonParticipant::is_repeater() const noexcept return false; } +core::types::TopicQoS CommonParticipant::topic_qos() const noexcept +{ + return configuration_->topic_qos; +} + std::shared_ptr CommonParticipant::create_writer( const core::ITopic& topic) { @@ -162,11 +167,13 @@ std::shared_ptr CommonParticipant::create_reader( { // Can only create DDS Topics const core::types::DdsTopic* topic_ptr = dynamic_cast(&topic); + if (!topic_ptr) { logDebug(DDSPIPE_DDS_PARTICIPANT, "Not creating Reader for topic " << topic.topic_name()); return std::make_shared(); } + const core::types::DdsTopic& dds_topic = *topic_ptr; // Check that it is RTPS topic diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp index c637c410..26b0d6c2 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp @@ -41,7 +41,8 @@ SchemaParticipant::SchemaParticipant( std::shared_ptr payload_pool, std::shared_ptr discovery_database, std::shared_ptr schema_handler) - : id_(participant_configuration->id) + : configuration_(participant_configuration) + , id_(participant_configuration->id) , payload_pool_(payload_pool) , discovery_database_(discovery_database) , schema_handler_(schema_handler) @@ -82,6 +83,11 @@ bool SchemaParticipant::is_rtps_kind() const noexcept return false; } +core::types::TopicQoS SchemaParticipant::topic_qos() const noexcept +{ + return configuration_->topic_qos; +} + std::shared_ptr SchemaParticipant::create_writer( const ITopic& topic) { diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index 92b5b41b..7a3fb963 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -324,6 +324,11 @@ bool CommonParticipant::is_rtps_kind() const noexcept return true; } +core::types::TopicQoS CommonParticipant::topic_qos() const noexcept +{ + return configuration_->topic_qos; +} + void CommonParticipant::create_participant_( const core::types::DomainId& domain, const fastrtps::rtps::RTPSParticipantAttributes& participant_attributes) @@ -360,6 +365,7 @@ std::shared_ptr CommonParticipant::create_writer( logDebug(DDSPIPE_RTPS_PARTICIPANT, "Not creating Writer for topic " << topic.topic_name()); return std::make_shared(); } + const core::types::DdsTopic& dds_topic = *dds_topic_ptr; if (topic.internal_type_discriminator() == core::types::INTERNAL_TOPIC_TYPE_RPC) @@ -413,11 +419,13 @@ std::shared_ptr CommonParticipant::create_reader( { // Can only create DDS Topics const core::types::DdsTopic* dds_topic_ptr = dynamic_cast(&topic); + if (!dds_topic_ptr) { logDebug(DDSPIPE_RTPS_PARTICIPANT, "Not creating Reader for topic " << topic.topic_name()); return std::make_shared(); } + const core::types::DdsTopic& dds_topic = *dds_topic_ptr; if (topic.internal_type_discriminator() == core::types::INTERNAL_TOPIC_TYPE_RPC) diff --git a/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp b/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp index 4f3b41a9..2a4fbbf4 100644 --- a/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp +++ b/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -29,13 +30,25 @@ const std::function BaseReader::DEFAULT_ON_DATA_AVAILABLE_CALLBACK = }; BaseReader::BaseReader( - const core::types::ParticipantId& participant_id) + const core::types::ParticipantId& participant_id, + const float max_rx_rate /* = 0 */, + const unsigned int downsampling /* = 1 */) : participant_id_(participant_id) + , max_rx_rate_(max_rx_rate) + , downsampling_(downsampling) , on_data_available_lambda_(DEFAULT_ON_DATA_AVAILABLE_CALLBACK) , on_data_available_lambda_set_(false) , enabled_(false) { logDebug(DDSPIPE_BASEREADER, "Creating Reader " << *this << "."); + + // Calculate min_intersample_period_ from topic's max_rx_rate only once to lighten hot path + assert(max_rx_rate_ >= 0); + + if (max_rx_rate_ > 0) + { + min_intersample_period_ = std::chrono::nanoseconds((unsigned int)(1e9 / max_rx_rate_)); + } } void BaseReader::enable() noexcept @@ -117,6 +130,38 @@ core::types::ParticipantId BaseReader::participant_id() const noexcept return participant_id_; } +bool BaseReader::should_accept_sample_() noexcept +{ + // Get reception timestamp + auto now = utils::now(); + + // Max Reception Rate + if (max_rx_rate_ > 0) + { + auto threshold = last_received_ts_ + min_intersample_period_; + if (now < threshold) + { + return false; + } + } + + // Downsampling (keep 1 out of every \c downsampling samples) + // NOTE: Downsampling is applied to messages that already passed previous filters + auto prev_downsampling_idx = downsampling_idx_; + + downsampling_idx_ = utils::fast_module(downsampling_idx_ + 1, downsampling_); + + if (prev_downsampling_idx != 0) + { + return false; + } + + // All filters passed -> Update last received timestamp with this sample's reception timestamp + last_received_ts_ = now; + + return true; +} + void BaseReader::on_data_available_() const noexcept { if (on_data_available_lambda_set_) diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index 4f184c64..70c281a1 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -91,6 +91,7 @@ void CommonReader::on_data_available( fastdds::dds::DataReader* /* reader */) { logInfo(DDSPIPE_DDS_READER, "On data available in reader in " << participant_id_ << " for topic " << topic_ << "."); + on_data_available_(); } @@ -100,7 +101,7 @@ CommonReader::CommonReader( const std::shared_ptr& payload_pool, fastdds::dds::DomainParticipant* participant, fastdds::dds::Topic* topic_entity) - : BaseReader(participant_id) + : BaseReader(participant_id, topic.topic_qos.max_rx_rate, topic.topic_qos.downsampling) , dds_participant_(participant) , dds_topic_(topic_entity) , payload_pool_(payload_pool) @@ -143,14 +144,8 @@ utils::ReturnCode CommonReader::take_nts_( return ret; } - // Check if it comes from same participant. If so, discard and continue - if (detail::come_from_same_participant_( - detail::guid_from_instance_handle(info.publication_handle), - this->dds_participant_->guid())) - { - continue; - } - else + // Check if the sample is acceptable + if (should_accept_sample_(info)) { break; } @@ -213,7 +208,7 @@ fastdds::dds::DataReaderQos CommonReader::reckon_reader_qos_() const ? fastdds::dds::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS : fastdds::dds::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS; - if (topic_.topic_qos.history_depth == 0) + if (topic_.topic_qos.history_depth == 0U) { qos.history().kind = eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS; } @@ -226,6 +221,20 @@ fastdds::dds::DataReaderQos CommonReader::reckon_reader_qos_() const return qos; } +bool CommonReader::should_accept_sample_( + const fastdds::dds::SampleInfo& info) noexcept +{ + // Reject samples sent by a Writer from the same Participant this Reader belongs to + if (detail::come_from_same_participant_( + detail::guid_from_instance_handle(info.publication_handle), + this->dds_participant_->guid())) + { + return false; + } + + return BaseReader::should_accept_sample_(); +} + void CommonReader::fill_received_data_( const fastdds::dds::SampleInfo& info, core::types::RtpsPayloadData& data_to_fill) const noexcept diff --git a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp index 899df5ff..53f3c68d 100644 --- a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp @@ -17,7 +17,6 @@ #include #include -#include #include #include @@ -41,7 +40,7 @@ CommonReader::CommonReader( const fastrtps::rtps::ReaderAttributes& reader_attributes, const fastrtps::TopicAttributes& topic_attributes, const fastrtps::ReaderQos& reader_qos) - : BaseReader(participant_id) + : BaseReader(participant_id, topic.topic_qos.max_rx_rate, topic.topic_qos.downsampling) , rtps_participant_(rtps_participant) , payload_pool_(payload_pool) , topic_(topic) @@ -52,9 +51,7 @@ CommonReader::CommonReader( , topic_attributes_(topic_attributes) , reader_qos_(reader_qos) { - // Calculate min_intersample_period_ from topic's max_reception_rate only once to lighten hot path - assert(topic_.topic_qos.max_reception_rate >= 0); - min_intersample_period_ = std::chrono::nanoseconds((unsigned int)(1e9 / topic_.topic_qos.max_reception_rate)); + // Do nothing. } CommonReader::~CommonReader() @@ -255,41 +252,16 @@ void CommonReader::enable_nts_() noexcept } } -bool CommonReader::accept_change_( +bool CommonReader::should_accept_change_( const fastrtps::rtps::CacheChange_t* change) noexcept { - // Get reception timestamp - auto now = utils::now(); - // Reject samples sent by a Writer from the same Participant this Reader belongs to if (come_from_this_participant_(change)) { return false; } - // Max Reception Rate - if (topic_.topic_qos.max_reception_rate > 0) - { - auto threshold = last_received_ts_ + min_intersample_period_; - if (now < threshold) - { - return false; - } - } - - // Downsampling (keep 1 out of every \c downsampling samples) - // NOTE: Downsampling is applied to messages that already passed previous filters - auto prev_downsampling_idx = downsampling_idx_; - downsampling_idx_ = utils::fast_module(downsampling_idx_ + 1, topic_.topic_qos.downsampling); - if (prev_downsampling_idx != 0) - { - return false; - } - - // All filters passed -> Update last received timestamp with this sample's reception timestamp - last_received_ts_ = now; - - return true; + return should_accept_sample_(); } bool CommonReader::come_from_this_participant_( @@ -404,7 +376,7 @@ void CommonReader::onNewCacheChangeAdded( fastrtps::rtps::RTPSReader* reader, const fastrtps::rtps::CacheChange_t* const change) noexcept { - if (accept_change_(change)) + if (should_accept_change_(change)) { // Do not remove previous received changes so they can be read when the reader is enabled if (enabled_) @@ -433,7 +405,11 @@ void CommonReader::onNewCacheChangeAdded( "Rejected received data in reader " << *this << "."); // Change rejected, do not send it forward and remove it - // TODO: do this more elegant + // TODO: do this more elegantly + + // WARNING: Removing an unacceptable change here is valid given that Fast-DDS internal reader's mutex is locked. + // If the mutex wasn't locked, the track's transmit thread could take an unacceptable sample before it gets + // deleted here. reader->getHistory()->remove_change((fastrtps::rtps::CacheChange_t*)change); } } diff --git a/ddspipe_participants/src/cpp/testing/entities/mock_entities.cpp b/ddspipe_participants/src/cpp/testing/entities/mock_entities.cpp index 75c1314e..6340c685 100644 --- a/ddspipe_participants/src/cpp/testing/entities/mock_entities.cpp +++ b/ddspipe_participants/src/cpp/testing/entities/mock_entities.cpp @@ -207,6 +207,12 @@ core::types::TopicInternalTypeDiscriminator MockTopic::internal_type_discriminat return INTERNAL_TOPIC_TYPE_MOCK_TEST; } +utils::Heritable MockTopic::copy() const noexcept +{ + MockTopic topic = *this; + return utils::Heritable::make_heritable(topic); +} + core::types::TopicInternalTypeDiscriminator MockRoutingData::internal_type_discriminator() const noexcept { return INTERNAL_TOPIC_TYPE_MOCK_TEST; diff --git a/ddspipe_participants/src/cpp/utils/utils.cpp b/ddspipe_participants/src/cpp/utils/utils.cpp index c2ab62c0..b163f0ce 100644 --- a/ddspipe_participants/src/cpp/utils/utils.cpp +++ b/ddspipe_participants/src/cpp/utils/utils.cpp @@ -38,15 +38,15 @@ core::types::Endpoint create_common_endpoint_from_info_( // Parse TopicQoS // Durability - endpoint.topic.topic_qos.durability_qos = info.info.m_qos.m_durability.durabilityKind(); + endpoint.topic.topic_qos.durability_qos.set_value(info.info.m_qos.m_durability.durabilityKind()); // Reliability if (info.info.m_qos.m_reliability.kind == fastdds::dds::BEST_EFFORT_RELIABILITY_QOS) { - endpoint.topic.topic_qos.reliability_qos = fastrtps::rtps::BEST_EFFORT; + endpoint.topic.topic_qos.reliability_qos.set_value(fastrtps::rtps::BEST_EFFORT); } else if (info.info.m_qos.m_reliability.kind == fastdds::dds::RELIABLE_RELIABILITY_QOS) { - endpoint.topic.topic_qos.reliability_qos = fastrtps::rtps::RELIABLE; + endpoint.topic.topic_qos.reliability_qos.set_value(fastrtps::rtps::RELIABLE); } else { @@ -55,11 +55,11 @@ core::types::Endpoint create_common_endpoint_from_info_( "Invalid ReliabilityQoS value found while parsing DiscoveryInfo for Endpoint creation."); } // Set Topic with Partitions - endpoint.topic.topic_qos.use_partitions = !info.info.m_qos.m_partition.empty(); + endpoint.topic.topic_qos.use_partitions.set_value(!info.info.m_qos.m_partition.empty()); // Set Topic with ownership - endpoint.topic.topic_qos.ownership_qos = info.info.m_qos.m_ownership.kind; + endpoint.topic.topic_qos.ownership_qos.set_value(info.info.m_qos.m_ownership.kind); // Set Topic key - endpoint.topic.topic_qos.keyed = info.info.topicKind() == eprosima::fastrtps::rtps::TopicKind_t::WITH_KEY; + endpoint.topic.topic_qos.keyed.set_value(info.info.topicKind() == eprosima::fastrtps::rtps::TopicKind_t::WITH_KEY); // Parse Topic core::types::DdsTopic info_topic; diff --git a/ddspipe_participants/src/cpp/writer/auxiliar/BaseWriter.cpp b/ddspipe_participants/src/cpp/writer/auxiliar/BaseWriter.cpp index dbc8823f..842dfb0e 100644 --- a/ddspipe_participants/src/cpp/writer/auxiliar/BaseWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/auxiliar/BaseWriter.cpp @@ -21,11 +21,21 @@ namespace ddspipe { namespace participants { BaseWriter::BaseWriter( - const core::types::ParticipantId& participant_id) + const core::types::ParticipantId& participant_id, + const float max_tx_rate /* = 0 */) : participant_id_(participant_id) + , max_tx_rate_(max_tx_rate) , enabled_(false) { logDebug(DDSPIPE_BASEWRITER, "Creating Writer " << *this << "."); + + // Calculate min_intersample_period_ from topic's max_tx_rate only once to lighten hot path + assert(max_tx_rate_ >= 0); + + if (max_tx_rate_ > 0) + { + min_intersample_period_ = std::chrono::nanoseconds((unsigned int)(1e9 / max_tx_rate_)); + } } void BaseWriter::enable() noexcept @@ -63,7 +73,15 @@ utils::ReturnCode BaseWriter::write( if (enabled_.load()) { - return write_nts_(data); + if (!should_send_sample_()) + { + return utils::ReturnCode::RETCODE_OK; + } + else + { + return write_nts_(data); + } + } else { @@ -83,6 +101,27 @@ void BaseWriter::disable_() noexcept // It does nothing. Override this method so it has functionality. } +bool BaseWriter::should_send_sample_() noexcept +{ + // Get transmission timestamp + auto now = utils::now(); + + // Max Transmission Rate + if (max_tx_rate_ > 0) + { + auto threshold = last_sent_ts_ + min_intersample_period_; + if (now < threshold) + { + return false; + } + } + + // All filters passed -> Update last sent timestamp with this sample's transmission timestamp + last_sent_ts_ = now; + + return true; +} + std::ostream& operator <<( std::ostream& os, const BaseWriter& writer) diff --git a/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp b/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp index dbc87a17..96d34a84 100644 --- a/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp @@ -88,7 +88,7 @@ CommonWriter::CommonWriter( const std::shared_ptr& payload_pool, fastdds::dds::DomainParticipant* participant, fastdds::dds::Topic* topic_entity) - : BaseWriter(participant_id) + : BaseWriter(participant_id, topic.topic_qos.max_tx_rate) , dds_participant_(participant) , dds_topic_(topic_entity) , payload_pool_(payload_pool) @@ -160,7 +160,7 @@ fastdds::dds::DataWriterQos CommonWriter::reckon_writer_qos_() const noexcept ? fastdds::dds::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS : fastdds::dds::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS; - if (topic_.topic_qos.history_depth == 0) + if (topic_.topic_qos.history_depth == 0U) { qos.history().kind = eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS; } diff --git a/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp b/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp index 9ae3e01e..64e32ac0 100644 --- a/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp @@ -48,7 +48,7 @@ CommonWriter::CommonWriter( const fastrtps::TopicAttributes& topic_attributes, const fastrtps::WriterQos& writer_qos, const utils::PoolConfiguration& pool_configuration) - : BaseWriter(participant_id) + : BaseWriter(participant_id, topic.topic_qos.max_tx_rate) , rtps_participant_(rtps_participant) , repeater_(repeater) , topic_(topic) diff --git a/ddspipe_participants/test/blackbox/mock_core/DdsPipeCommunicationMockTest.cpp b/ddspipe_participants/test/blackbox/mock_core/DdsPipeCommunicationMockTest.cpp index 7cd75cf8..7fe876c9 100644 --- a/ddspipe_participants/test/blackbox/mock_core/DdsPipeCommunicationMockTest.cpp +++ b/ddspipe_participants/test/blackbox/mock_core/DdsPipeCommunicationMockTest.cpp @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -69,14 +70,16 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_trivial) part_db->add_participant(part_2_id, part_2); // Create DDS Pipe + core::DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics.insert(htopic_1); + ddspipe_configuration.init_enabled = true; + core::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), part_db, - std::make_shared(test::N_THREADS), - {htopic_1}, - true + std::make_shared(test::N_THREADS) ); // Look for the reader in participant 1 and writer in participant 2 @@ -137,13 +140,15 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_before_enabling) part_db->add_participant(part_2_id, part_2); // Create DDS Pipe + core::DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics.insert(htopic_1); + core::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), part_db, - std::make_shared(test::N_THREADS), - {htopic_1} + std::make_shared(test::N_THREADS) ); // Look for the reader in participant 1 and writer in participant 2 @@ -236,8 +241,10 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_topic_discovery) part_db->add_participant(part_2_id, part_2); // Create DDS Pipe + core::DdsPipeConfiguration ddspipe_configuration; + core::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, disc_db, std::make_shared(), part_db, @@ -317,22 +324,19 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_topic_allow) // Blocks all topics utils::Heritable filter_topic = utils::Heritable::make_heritable(); - std::shared_ptr atl(new core::AllowedTopicList({}, {filter_topic})); - // TODO for education sake, check whit this not compile - // auto atl = std::make_shared( - // {}, - // {filter_topic} - // ); // Create DDS Pipe + core::DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.blocklist.insert(filter_topic); + ddspipe_configuration.builtin_topics.insert(htopic_1); + ddspipe_configuration.init_enabled = true; + core::DdsPipe ddspipe( - atl, + ddspipe_configuration, std::make_shared(), std::make_shared(), part_db, - std::make_shared(test::N_THREADS), - {htopic_1}, - true + std::make_shared(test::N_THREADS) ); // Look for the reader in participant 1 and writer in participant 2 @@ -352,7 +356,11 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_topic_allow) ASSERT_EQ(writer_2->n_to_send_data(), 0u); // Allow topic (empty allowed list allows everything) - ddspipe.reload_allowed_topics(std::make_shared()); + core::DdsPipeConfiguration new_ddspipe_configuration; + new_ddspipe_configuration.allowlist.clear(); + new_ddspipe_configuration.blocklist.clear(); + + ddspipe.reload_configuration(new_ddspipe_configuration); // Wait for all messages for (unsigned int i = 0; i < test::N_MESSAGES; i++) @@ -362,7 +370,9 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_topic_allow) } // Block topic - ddspipe.reload_allowed_topics(atl); + new_ddspipe_configuration.blocklist.insert(filter_topic); + + ddspipe.reload_configuration(new_ddspipe_configuration); // Simulate N messages for (unsigned int i = 0; i < test::N_MESSAGES; i++) @@ -406,14 +416,16 @@ TEST(DdsPipeCommunicationMockTest, mock_communication_multiple_participant_topic } // Create DDS Pipe + core::DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics = builtin; + ddspipe_configuration.init_enabled = true; + core::DdsPipe ddspipe( - std::make_shared(), + ddspipe_configuration, std::make_shared(), std::make_shared(), part_db, - std::make_shared(test::N_THREADS), - builtin, - true + std::make_shared(test::N_THREADS) ); // For every reader in every participant, send N data diff --git a/ddspipe_participants/test/blackbox/participants_creation/ParticipantsCreationgTest.cpp b/ddspipe_participants/test/blackbox/participants_creation/ParticipantsCreationgTest.cpp index d5da6fa6..a49c3b17 100644 --- a/ddspipe_participants/test/blackbox/participants_creation/ParticipantsCreationgTest.cpp +++ b/ddspipe_participants/test/blackbox/participants_creation/ParticipantsCreationgTest.cpp @@ -17,6 +17,7 @@ #include +#include #include #include @@ -120,7 +121,6 @@ TEST(ParticipantsCreationgTest, creation_trivial) TEST(ParticipantsCreationgTest, ddspipe_all_creation_builtin_topic) { // Auxiliar objects - std::shared_ptr atl(new core::AllowedTopicList()); std::shared_ptr discovery_database(new core::DiscoveryDatabase()); std::shared_ptr payload_pool(new core::FastPayloadPool()); std::shared_ptr part_db(new core::ParticipantsDatabase()); @@ -210,17 +210,20 @@ TEST(ParticipantsCreationgTest, ddspipe_all_creation_builtin_topic) eprosima::utils::Heritable::make_heritable(topic_2); // Create DDS Pipe + core::DdsPipeConfiguration ddspipe_configuration; + ddspipe_configuration.builtin_topics.insert(htopic_1); + ddspipe_configuration.builtin_topics.insert(htopic_2); + ddspipe_configuration.init_enabled = true; + core::DdsPipe ddspipe( - atl, + ddspipe_configuration, discovery_database, payload_pool, part_db, - thread_pool, - {htopic_1, htopic_2}, - true + thread_pool ); - // Let everything destroy by itself + // Let everything destroy itself } int main( diff --git a/ddspipe_yaml/include/ddspipe_yaml/YamlReader.hpp b/ddspipe_yaml/include/ddspipe_yaml/YamlReader.hpp index d32787af..3a7b94f7 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/YamlReader.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/YamlReader.hpp @@ -56,7 +56,7 @@ enum YamlReaderVersion V_1_0, /** - * @brief Version 2.0 + * @brief Version 2.0 * * @version 0.3.0 * @@ -68,7 +68,7 @@ enum YamlReaderVersion V_2_0, /** - * @brief Version 3.0 + * @brief Version 3.0 * * @version 0.4.0 * @@ -78,7 +78,7 @@ enum YamlReaderVersion V_3_0, /** - * @brief Latest version. + * @brief Version 3.1. * * @version 0.5.0 * @@ -87,6 +87,21 @@ enum YamlReaderVersion */ V_3_1, + /** + * @brief Latest version. + * + * @version 0.6.0 + * + * - Forwarding Routes. + * - Remove Unused Entities. + * - Manual Topics. + * - Max Transmission Rate. + * - Max Reception Rate. + * - Downsampling. + * - Rename the `max-depth` under the `specs` tag to `history-depth`. + */ + V_4_0, + /** * @brief Main version. * diff --git a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp index 772b9a34..29702841 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp @@ -27,30 +27,37 @@ constexpr const char* VERSION_TAG_V_1_0("v1.0"); //! Version v1.0 constexpr const char* VERSION_TAG_V_2_0("v2.0"); //! Version v2.0 constexpr const char* VERSION_TAG_V_3_0("v3.0"); //! Version v3.0 constexpr const char* VERSION_TAG_V_3_1("v3.1"); //! Version v3.1 +constexpr const char* VERSION_TAG_V_4_0("v4.0"); //! Version v4.0 // Topics related tags constexpr const char* ALLOWLIST_TAG("allowlist"); //! List of allowed topics constexpr const char* BLOCKLIST_TAG("blocklist"); //! List of blocked topics -constexpr const char* BUILTIN_TAG("builtin-topics"); //! List of builtin topics -constexpr const char* TOPIC_NAME_TAG("name"); //! Name of a topic -constexpr const char* TOPIC_TYPE_NAME_TAG("type"); //! Type name of a topic -constexpr const char* TOPIC_QOS_TAG("qos"); //! QoS of a topic + +constexpr const char* BUILTIN_TAG("builtin-topics"); //! List of builtin topics + +constexpr const char* TOPICS_TAG("topics"); //! List of manual topics to configure +constexpr const char* TOPIC_NAME_TAG("name"); //! Name of a topic +constexpr const char* TOPIC_TYPE_NAME_TAG("type"); //! Type name of a topic +constexpr const char* TOPIC_QOS_TAG("qos"); //! QoS of a topic +constexpr const char* TOPIC_PARTICIPANTS_TAG("participants"); //! List of participants of a topic // QoS related tags constexpr const char* QOS_RELIABLE_TAG("reliability"); //! The Endpoints of that topic will be configured as RELIABLE constexpr const char* QOS_TRANSIENT_TAG("durability"); //! The Endpoints of that topic will be configured as TRANSIENT_LOCAL -constexpr const char* QOS_HISTORY_DEPTH_TAG("depth"); //! The Endpoints of that topic will be configured as this History Depth +constexpr const char* QOS_HISTORY_DEPTH_TAG("history-depth"); //! The Endpoints of that topic will be configured as this History Depth constexpr const char* QOS_PARTITION_TAG("partitions"); //! The Endpoints of that topic will be configured with partitions constexpr const char* QOS_OWNERSHIP_TAG("ownership"); //! The Endpoints of that topic will be configured with partitions constexpr const char* QOS_KEYED_TAG("keyed"); //! Kind of a topic (with or without key) +constexpr const char* QOS_MAX_TX_RATE_TAG("max-tx-rate"); //! Topic specific max transmission rate +constexpr const char* QOS_MAX_RX_RATE_TAG("max-rx-rate"); //! Topic specific max reception rate constexpr const char* QOS_DOWNSAMPLING_TAG("downsampling"); //! Topic specific downsampling factor -constexpr const char* QOS_MAX_RECEPTION_RATE_TAG("max-reception-rate"); //! Topic specific max reception rate // Participant related tags constexpr const char* PARTICIPANT_KIND_TAG("kind"); //! Participant Kind constexpr const char* PARTICIPANT_NAME_TAG("name"); //! Participant Name constexpr const char* COLLECTION_PARTICIPANTS_TAG("participants"); //! TODO: add comment constexpr const char* IS_REPEATER_TAG("repeater"); //! Is participant a repeater +constexpr const char* PARTICIPANT_QOS_TAG("qos"); //! Participant Topic QoS // Echo related tags constexpr const char* ECHO_DATA_TAG("data"); //! Echo Data received @@ -130,10 +137,8 @@ constexpr const char* TOPIC_ROUTES_TAG("topic-routes"); // Topic specific forwar // Advanced configuration constexpr const char* SPECS_TAG("specs"); //! Specs options for DDS Router configuration +constexpr const char* SPECS_QOS_TAG("qos"); //! Global Topic QoS constexpr const char* NUMBER_THREADS_TAG("threads"); //! Number of threads to configure the thread pool -constexpr const char* MAX_HISTORY_DEPTH_TAG("max-depth"); //! Maximum size (number of stored cache changes) for RTPS History instances -constexpr const char* DOWNSAMPLING_TAG("downsampling"); //! Keep 1 out of every *downsampling* samples received -constexpr const char* MAX_RECEPTION_RATE_TAG("max-reception-rate"); //! Process up to *max_reception_rate* samples in a 1 second bin constexpr const char* WAIT_ALL_ACKED_TIMEOUT_TAG("wait-all-acked-timeout"); //! Wait for a maximum of *wait-all-acked-timeout* ms until all msgs sent by reliable writers are acknowledged by their matched readers constexpr const char* REMOVE_UNUSED_ENTITIES_TAG("remove-unused-entities"); //! Dynamically create and delete entities and tracks. diff --git a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp index 6f4e22e6..e1ae6e67 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp @@ -160,6 +160,12 @@ void YamlReader::fill( { object.ignore_participant_flags = core::types::IgnoreParticipantFlags::no_filter; } + + // Optional Praticipant Topic QoS + if (YamlReader::is_tag_present(yml, PARTICIPANT_QOS_TAG)) + { + fill(object.topic_qos, get_value_in_tag(yml, PARTICIPANT_QOS_TAG), version); + } } template <> diff --git a/ddspipe_yaml/src/cpp/YamlReader_types.cpp b/ddspipe_yaml/src/cpp/YamlReader_types.cpp index 434fd5aa..9e9286b9 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_types.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_types.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -95,6 +96,7 @@ YamlReaderVersion YamlReader::get( {VERSION_TAG_V_2_0, YamlReaderVersion::V_2_0}, {VERSION_TAG_V_3_0, YamlReaderVersion::V_3_0}, {VERSION_TAG_V_3_1, YamlReaderVersion::V_3_1}, + {VERSION_TAG_V_4_0, YamlReaderVersion::V_4_0}, }); } @@ -349,73 +351,80 @@ void YamlReader::fill( const Yaml& yml, const YamlReaderVersion version) { - // Reliability optional + // Durability optional + if (is_tag_present(yml, QOS_TRANSIENT_TAG)) + { + if (get(yml, QOS_TRANSIENT_TAG, version)) + { + object.durability_qos.set_value(eprosima::ddspipe::core::types::DurabilityKind::TRANSIENT_LOCAL); + } + else + { + object.durability_qos.set_value(eprosima::ddspipe::core::types::DurabilityKind::VOLATILE); + } + } + + // Optional Reliability Tag if (is_tag_present(yml, QOS_RELIABLE_TAG)) { if (get(yml, QOS_RELIABLE_TAG, version)) { - object.reliability_qos = eprosima::ddspipe::core::types::ReliabilityKind::RELIABLE; + object.reliability_qos.set_value(eprosima::ddspipe::core::types::ReliabilityKind::RELIABLE); } else { - object.reliability_qos = eprosima::ddspipe::core::types::ReliabilityKind::BEST_EFFORT; + object.reliability_qos.set_value(eprosima::ddspipe::core::types::ReliabilityKind::BEST_EFFORT); } } - // Durability optional - if (is_tag_present(yml, QOS_TRANSIENT_TAG)) + // Ownership optional + if (is_tag_present(yml, QOS_OWNERSHIP_TAG)) { - if (get(yml, QOS_TRANSIENT_TAG, version)) + if (get(yml, QOS_OWNERSHIP_TAG, version)) { - object.durability_qos = eprosima::ddspipe::core::types::DurabilityKind::TRANSIENT_LOCAL; + object.ownership_qos.set_value( + eprosima::ddspipe::core::types::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS); } else { - object.durability_qos = eprosima::ddspipe::core::types::DurabilityKind::VOLATILE; + object.ownership_qos.set_value(eprosima::ddspipe::core::types::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS); } } - // History depth optional - if (is_tag_present(yml, QOS_HISTORY_DEPTH_TAG)) + // Use partitions optional + if (is_tag_present(yml, QOS_PARTITION_TAG)) { - object.history_depth = get(yml, QOS_HISTORY_DEPTH_TAG, version); + object.use_partitions.set_value(get(yml, QOS_PARTITION_TAG, version)); } - // Durability optional - if (is_tag_present(yml, QOS_PARTITION_TAG)) + // History depth optional + if (is_tag_present(yml, QOS_HISTORY_DEPTH_TAG)) { - object.use_partitions = get(yml, QOS_PARTITION_TAG, version); + object.history_depth.set_value(get(yml, QOS_HISTORY_DEPTH_TAG, version)); } - // Optional keyed + // Keyed optional if (is_tag_present(yml, QOS_KEYED_TAG)) { - object.keyed = get(yml, QOS_KEYED_TAG, version); + object.keyed.set_value(get(yml, QOS_KEYED_TAG, version)); } - // Ownership optional - if (is_tag_present(yml, QOS_OWNERSHIP_TAG)) + // Max Transmission Rate optional + if (is_tag_present(yml, QOS_MAX_TX_RATE_TAG)) { - if (get(yml, QOS_OWNERSHIP_TAG, version)) - { - object.ownership_qos = eprosima::ddspipe::core::types::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS; - } - else - { - object.ownership_qos = eprosima::ddspipe::core::types::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS; - } + object.max_tx_rate.set_value(get_nonnegative_float(yml, QOS_MAX_TX_RATE_TAG)); } - // Downsampling optional - if (is_tag_present(yml, QOS_DOWNSAMPLING_TAG)) + // Max Reception Rate optional + if (is_tag_present(yml, QOS_MAX_RX_RATE_TAG)) { - object.downsampling = get_positive_int(yml, QOS_DOWNSAMPLING_TAG); + object.max_rx_rate.set_value(get_nonnegative_float(yml, QOS_MAX_RX_RATE_TAG)); } - // Max Reception Rate optional - if (is_tag_present(yml, QOS_MAX_RECEPTION_RATE_TAG)) + // Downsampling optional + if (is_tag_present(yml, QOS_DOWNSAMPLING_TAG)) { - object.max_reception_rate = get(yml, QOS_MAX_RECEPTION_RATE_TAG, version); + object.downsampling.set_value(get_positive_int(yml, QOS_DOWNSAMPLING_TAG)); } } @@ -435,12 +444,6 @@ void YamlReader::fill( // Data Type required object.type_name = get(yml, TOPIC_TYPE_NAME_TAG, version); - - // Optional QoS - if (is_tag_present(yml, TOPIC_QOS_TAG)) - { - fill(object.topic_qos, get_value_in_tag(yml, TOPIC_QOS_TAG), version); - } } template <> @@ -461,11 +464,8 @@ void YamlReader::fill( const Yaml& yml, const YamlReaderVersion version) { - // Optional name - if (is_tag_present(yml, TOPIC_NAME_TAG)) - { - object.topic_name.set_value(get(yml, TOPIC_NAME_TAG, version)); - } + // Name required + object.topic_name = get(yml, TOPIC_NAME_TAG, version); // Optional data type if (is_tag_present(yml, TOPIC_TYPE_NAME_TAG)) @@ -473,7 +473,11 @@ void YamlReader::fill( object.type_name.set_value(get(yml, TOPIC_TYPE_NAME_TAG, version)); } - // TODO: decide whether we want to use QoS as filtering + // Optional QoS + if (is_tag_present(yml, TOPIC_QOS_TAG)) + { + fill(object.topic_qos, get_value_in_tag(yml, TOPIC_QOS_TAG), version); + } } template <> @@ -487,6 +491,36 @@ WildcardDdsFilterTopic YamlReader::get( return object; } +template <> +DDSPIPE_YAML_DllAPI +void YamlReader::fill( + ManualTopic& object, + const Yaml& yml, + const YamlReaderVersion version) +{ + auto manual_topic = YamlReader::get(yml, version); + object.first = utils::Heritable::make_heritable(manual_topic); + + // Optional participants tag + if (is_tag_present(yml, TOPIC_PARTICIPANTS_TAG)) + { + object.second = get_set(yml, TOPIC_PARTICIPANTS_TAG, version); + } +} + +template <> +DDSPIPE_YAML_DllAPI +ManualTopic YamlReader::get( + const Yaml& yml, + const YamlReaderVersion version) +{ + auto topic = utils::Heritable::make_heritable(); + std::set participants; + ManualTopic object = std::make_pair(topic, participants); + fill(object, yml, version); + return object; +} + template <> DDSPIPE_YAML_DllAPI utils::Heritable YamlReader::get( @@ -621,10 +655,14 @@ std::ostream& operator <<( break; case V_3_1: - case LATEST: os << VERSION_TAG_V_3_1; break; + case V_4_0: + case LATEST: + os << VERSION_TAG_V_4_0; + break; + default: utils::tsnh(STR_ENTRY << "Value of YamlReaderVersion out of enumeration."); break; diff --git a/ddspipe_yaml/test/unittest/entities/topic/YamlGetEntityTopicTest.cpp b/ddspipe_yaml/test/unittest/entities/topic/YamlGetEntityTopicTest.cpp index 164db031..2e8a1447 100644 --- a/ddspipe_yaml/test/unittest/entities/topic/YamlGetEntityTopicTest.cpp +++ b/ddspipe_yaml/test/unittest/entities/topic/YamlGetEntityTopicTest.cpp @@ -109,30 +109,6 @@ TEST(YamlGetEntityTopicTest, get_real_topic) ASSERT_EQ(topic, real_topic); } - - // Checks that a topic yaml object has been parsed correctly with the topic reliable tag set to true. - // A topic configured as reliable creates RELIABLE-TRANSIENT_LOCAL RTPS Readers in order to ensure - // that no data is lost in the information relay. - // TODO: extend for other QoS - { - core::types::DdsTopic real_topic; - real_topic.m_topic_name = test::TOPIC_NAME; - real_topic.type_name = test::TOPIC_TYPE; - real_topic.topic_qos.reliability_qos = core::types::ReliabilityKind::RELIABLE; - - Yaml yml_topic; - real_topic_to_yaml( - yml_topic, - real_topic); - - Yaml yml; - yml["topic"] = yml_topic; - - core::types::DdsTopic topic = YamlReader::get(yml, "topic", LATEST); - - ASSERT_EQ(topic, real_topic); - ASSERT_TRUE(topic.topic_qos.is_reliable()); - } } /** @@ -209,27 +185,6 @@ TEST(YamlGetEntityTopicTest, get_wildcard_topic) ASSERT_EQ(topic, w_topic); } - // Topic without name - { - core::types::WildcardDdsFilterTopic w_topic; - w_topic.type_name.set_value(test::TOPIC_TYPE); - - Yaml yml_topic; - filter_topic_to_yaml( - yml_topic, - w_topic); - - Yaml yml; - yml["topic"] = yml_topic; - - core::types::WildcardDdsFilterTopic topic = YamlReader::get(yml, "topic", - LATEST); - - ASSERT_EQ(topic, w_topic); - ASSERT_TRUE(topic.type_name.is_set()); - ASSERT_FALSE(topic.topic_name.is_set()); - } - // Topic without type { core::types::WildcardDdsFilterTopic w_topic;