Skip to content

Commit

Permalink
Only create readers, writers, and tracks when they are going to be us…
Browse files Browse the repository at this point in the history
…ed (#57)

* Save the discoverer's id when creating a DdsBridge

Signed-off-by: tempate <[email protected]>

* Only create the necessary readers and writers

Signed-off-by: tempate <[email protected]>

* Improve readability

Signed-off-by: tempate <[email protected]>

* Add a new subscriber to the tracks in a bridge

Signed-off-by: tempate <[email protected]>

* Remove a dropped subscriber from its bridge

Signed-off-by: tempate <[email protected]>

* Remove unnecessary removed_topic_nts function

Signed-off-by: tempate <[email protected]>

* Delete the tracks that don't have any writers

Signed-off-by: tempate <[email protected]>

* Improved readability

Signed-off-by: tempate <[email protected]>

* Uncrustify

Signed-off-by: tempate <[email protected]>

* Improved readability

Signed-off-by: tempate <[email protected]>

* Allow participants to have multiple subscribers

Signed-off-by: tempate <[email protected]>

* Doxygen, Windows support, and improved readability

Signed-off-by: tempate <[email protected]>

* Uncrustify

Signed-off-by: tempate <[email protected]>

* Make the deletion of unused entities configurable

Signed-off-by: tempate <[email protected]>

* Don't delete unused entities by default

Signed-off-by: tempate <[email protected]>

* Make dynamic tracks configurable

Signed-off-by: tempate <[email protected]>

* Three small patches to pass the tests

Signed-off-by: tempate <[email protected]>

* Move dynamic_tracks's init  to avoid warning

Signed-off-by: tempate <[email protected]>

* Fix bug in forwarding routes

Signed-off-by: tempate <[email protected]>

* Fix bug by removing debugging comments

Signed-off-by: tempate <[email protected]>

* Rename dynamic tracks to remove unused entities

Signed-off-by: tempate <[email protected]>

* Fix segfault when removing elements from tracks

Signed-off-by: tempate <[email protected]>

* Rebase fix

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Remove commented code

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Get endpoints with a filter

Signed-off-by: tempate <[email protected]>

* Set builtin participants' discoverer

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply NIT suggestions

Signed-off-by: tempate <[email protected]>

* Uncrustify

Signed-off-by: tempate <[email protected]>

* Check that the conf. is valid inside the DdsPipe

Signed-off-by: tempate <[email protected]>

* Fix compilation error

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

---------

Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate authored Oct 9, 2023
1 parent 36af952 commit 47856ff
Show file tree
Hide file tree
Showing 24 changed files with 780 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class DdsBridge : public Bridge
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config);
const RoutesConfiguration& routes_config,
const bool remove_unused_entities);

DDSPIPE_CORE_DllAPI
~DdsBridge();
Expand All @@ -80,10 +81,79 @@ class DdsBridge : public Bridge
DDSPIPE_CORE_DllAPI
void disable() noexcept override;

/**
* Build the IReaders and IWriters inside the bridge for the new participant,
* and add them to the Tracks.
*
* Thread safe
*
* @param participant_id: The id of the participant who is creating the writer.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void create_writer(
const types::ParticipantId& participant_id);

/**
* Remove the IWriter from all the Tracks in the bridge.
* Remove the IReaders and Tracks that don't have any IWriters.
*
* Thread safe
*
* @param participant_id: The id of the participant who is removing the writer.
*/
DDSPIPE_CORE_DllAPI
void remove_writer(
const types::ParticipantId& participant_id) noexcept;

protected:

/**
* Create the readers, writers, and tracks that are required by the routes.
*
* Thread safe
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void create_all_tracks_();

/**
* Add each Participant's IWriters to its Track.
* If the Participant's IReader doesn't exist, create it.
* If the Participant's Track doesn't exist, create it.
*
* Thread safe
*
* @param writers: The map of ids to writers that are required for the tracks.
*
* @throw InitializationException in case \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void add_writer_to_tracks_nts_(
const types::ParticipantId& participant_id,
std::shared_ptr<IWriter>& writer);

/**
* Add each Participant's IWriters to its Track.
* If the Participant's IReader doesn't exist, create it.
* If the Participant's Track doesn't exist, create it.
*
* Thread safe
*
* @param writers: The map of ids to writers that are required for the tracks.
*
* @throw InitializationException in case \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void add_writers_to_tracks_nts_(
std::map<types::ParticipantId, std::shared_ptr<IWriter>>& writers);

utils::Heritable<types::DistributedTopic> topic_;

RoutesConfiguration::RoutesMap routes_;

/**
* Inside \c Tracks
* They are indexed by the Id of the participant that is source
Expand Down
38 changes: 38 additions & 0 deletions ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,44 @@ class Track
DDSPIPE_CORE_DllAPI
void disable() noexcept;

/**
* Add a writer to the track.
* It doesn't do anything if the writer is already in it.
*
* Tread safe
*/
DDSPIPE_CORE_DllAPI
void add_writer(
const types::ParticipantId& id,
const std::shared_ptr<IWriter>& writer) noexcept;

/**
* Remove a writer from the track.
* It doesn't do anything if the writer isn't in the track.
*
* Tread safe
*/
DDSPIPE_CORE_DllAPI
void remove_writer(
const types::ParticipantId& id) noexcept;

/**
* Check if a writer is inside the track.
*
* Tread safe
*/
DDSPIPE_CORE_DllAPI
bool has_writer(
const types::ParticipantId& id) noexcept;

/**
* Check if a track has at least one writer.
*
* Tread safe
*/
DDSPIPE_CORE_DllAPI
bool has_writers() noexcept;

protected:

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <map>
#include <set>

#include <cpp_utils/Formatter.hpp>

#include <ddspipe_core/configuration/IConfiguration.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
#include <ddspipe_core/configuration/TopicRoutesConfiguration.hpp>
#include <ddspipe_core/types/participant/ParticipantId.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>

#include <ddspipe_core/library/library_dll.h>

namespace eprosima {
namespace ddspipe {
namespace core {

/**
* Configuration structure encapsulating the configuration of a \c DdsPipe instance.
*/
struct DdsPipeConfiguration : public IConfiguration
{
/////////////////////////
// CONSTRUCTORS
/////////////////////////

DDSPIPE_CORE_DllAPI DdsPipeConfiguration() = default;

/////////////////////////
// METHODS
/////////////////////////

/**
* @brief Override \c is_valid method.
*/
DDSPIPE_CORE_DllAPI
virtual bool is_valid(
utils::Formatter& error_msg) const noexcept override;

/**
* @brief Check if a configuration is valid given a list of participants.
*
* It calls its own \c is_valid method plus the \c is_valid method of the
* encapsulated configurations.
*/
DDSPIPE_CORE_DllAPI
bool is_valid(
utils::Formatter& error_msg,
const std::map<types::ParticipantId, bool>& participants) const noexcept;

/**
* @brief Select the \c RoutesConfiguration for a topic.
*
* @return The route configuration for a specific topic.
*/
DDSPIPE_CORE_DllAPI
RoutesConfiguration get_routes_config(
const utils::Heritable<types::DistributedTopic>& topic) const noexcept;

/////////////////////////
// VARIABLES
/////////////////////////

//! Configuration of the generic routes.
RoutesConfiguration routes{};

//! Configuration of the routes specific to a topic.
TopicRoutesConfiguration topic_routes{};

//! Whether entities should be removed when they have no writers connected to them.
bool remove_unused_entities = false;
};

} /* namespace core */
} /* namespace ddspipe */
} /* namespace eprosima */
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct RoutesConfiguration : public IConfiguration

DDSPIPE_CORE_DllAPI bool is_valid(
utils::Formatter& error_msg,
std::map<types::ParticipantId, bool> participant_ids) const noexcept;
const std::map<types::ParticipantId, bool>& participants) const noexcept;

/////////////////////////
// OPERATORS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct TopicRoutesConfiguration : public IConfiguration

DDSPIPE_CORE_DllAPI bool is_valid(
utils::Formatter& error_msg,
std::map<types::ParticipantId, bool> participant_ids) const noexcept;
const std::map<types::ParticipantId, bool>& participant_ids) const noexcept;

/////////////////////////
// OPERATORS
Expand Down
73 changes: 58 additions & 15 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

#include <ddspipe_core/communication/dds/DdsBridge.hpp>
#include <ddspipe_core/communication/rpc/RpcBridge.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
#include <ddspipe_core/configuration/TopicRoutesConfiguration.hpp>
#include <ddspipe_core/configuration/DdsPipeConfiguration.hpp>
#include <ddspipe_core/dynamic/AllowedTopicList.hpp>
#include <ddspipe_core/dynamic/DiscoveryDatabase.hpp>
#include <ddspipe_core/dynamic/ParticipantsDatabase.hpp>
Expand Down Expand Up @@ -69,8 +68,7 @@ class DdsPipe
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const std::set<utils::Heritable<types::DistributedTopic>>& builtin_topics = {},
bool start_enable = false,
const RoutesConfiguration& routes_config = {},
const TopicRoutesConfiguration& topic_routes_config = {});
const DdsPipeConfiguration& configuration = {});

/**
* @brief Destroy the DdsPipe object
Expand Down Expand Up @@ -136,23 +134,71 @@ class DdsPipe
/////////////////////////

/**
* @brief Method called every time a new endpoint has been discovered/updated
* @brief Method called every time a new endpoint has been discovered
*
* This method calls \c discovered_topic_ with the topic of \c endpoint as parameter.
* This method calls \c discovered_endpoint_nts_ with a lock on the mutex to make it thread safe.
*
* @param [in] endpoint : endpoint discovered
*/
void discovered_endpoint_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time a new endpoint has been updated
*
* This method calls \c updated_endpoint_nts_ with a lock on the mutex to make it thread safe.
*
* @param [in] endpoint : endpoint updated
*/
void updated_endpoint_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time an endpoint has been removed/dropped
*
* This method calls \c removed_endpoint_nts_ with a lock on the mutex to make it thread safe.
*
* @param [in] endpoint : endpoint removed/dropped
*/
void removed_endpoint_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time a new endpoint has been discovered
*
* This method calls \c discovered_topic_ with the topic of \c endpoint as parameter.
*
* @param [in] endpoint : endpoint discovered
*/
void discovered_endpoint_nts_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time an endpoint has been removed/dropped
*
* @param [in] endpoint : endpoint removed/dropped
*/
void removed_endpoint_nts_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time a new endpoint has been updated
*
* This method calls \c discovered_endpoint_nts_ or \c removed_endpoint_nts_.
*
* @param [in] endpoint : endpoint updated
*/
void updated_endpoint_nts_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time a new endpoint has been discovered, removed, or updated.
*
* @param [in] endpoint : endpoint discovered, removed, or updated.
*/
bool is_endpoint_relevant_(
const types::Endpoint& endpoint) noexcept;

/////////////////////////
// INTERNAL CTOR METHODS
/////////////////////////
Expand Down Expand Up @@ -321,9 +367,9 @@ class DdsPipe
*/
std::map<types::RpcTopic, bool> current_services_;

/////////////////////////
/////////////////////
// AUXILIAR VARIABLES
/////////////////////////
/////////////////////

//! Whether the DdsPipe is currently communicating data or not
bool enabled_;
Expand All @@ -333,15 +379,12 @@ class DdsPipe
*/
mutable std::mutex mutex_;

/////////////////////////
//////////////////////////
// CONFIGURATION VARIABLES
/////////////////////////

//! Custom forwarding routes
RoutesConfiguration routes_config_;
//////////////////////////

//! Custom forwarding routes per topic
TopicRoutesConfiguration topic_routes_config_;
//! Configuration of the DDS Pipe
DdsPipeConfiguration configuration_;
};

} /* namespace core */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ class DiscoveryDatabase
types::Endpoint get_endpoint(
const types::Guid& endpoint_guid) const;

/**
* @brief Get the endpoints that pass the given filter
*
* @return A map with the endpoints that pass the filter
*/
DDSPIPE_CORE_DllAPI
std::map<types::Guid, types::Endpoint> get_endpoints(
std::function<bool(const types::Endpoint&)> is_valid_endpoint) const noexcept;

/**
* @brief Add callback to be called when discovering an Endpoint
*
Expand Down
Loading

0 comments on commit 47856ff

Please sign in to comment.