Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually fix a set of Topic QoS, and implement the max-tx-rate #86

Merged
merged 23 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions ddsrecorder/src/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ std::unique_ptr<eprosima::utils::event::FileWatcherHandler> create_filewatcher(
try
{
eprosima::ddsrecorder::yaml::RecorderConfiguration new_configuration(file_path);
// Create new allowed topics list
auto new_allowed_topics = std::make_shared<core::AllowedTopicList>(
new_configuration.allowlist,
new_configuration.blocklist);
recorder->reload_allowed_topics(new_allowed_topics);
recorder->reload_configuration(new_configuration);
}
catch (const std::exception& e)
{
Expand Down Expand Up @@ -102,11 +98,7 @@ std::unique_ptr<eprosima::utils::event::PeriodicEventHandler> create_periodic_ha
try
{
eprosima::ddsrecorder::yaml::RecorderConfiguration new_configuration(file_path);
// Create new allowed topics list
auto new_allowed_topics = std::make_shared<core::AllowedTopicList>(
new_configuration.allowlist,
new_configuration.blocklist);
recorder->reload_allowed_topics(new_allowed_topics);
recorder->reload_configuration(new_configuration);
}
catch (const std::exception& e)
{
Expand Down
17 changes: 5 additions & 12 deletions ddsrecorder/src/cpp/tool/DdsRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ DdsRecorder::DdsRecorder(
const DdsRecorderStateCode& init_state,
const std::string& file_name)
{
// Create allowed topics list
auto allowed_topics = std::make_shared<AllowedTopicList>(
configuration.allowlist,
configuration.blocklist);

// Create Discovery Database
discovery_database_ =
std::make_shared<DiscoveryDatabase>();
Expand Down Expand Up @@ -115,19 +110,17 @@ DdsRecorder::DdsRecorder(

// Create DDS Pipe
pipe_ = std::make_unique<DdsPipe>(
allowed_topics,
configuration.ddspipe_configuration,
discovery_database_,
payload_pool_,
participants_database_,
thread_pool_,
configuration.builtin_topics,
true);
thread_pool_);
}

utils::ReturnCode DdsRecorder::reload_allowed_topics(
const std::shared_ptr<AllowedTopicList>& allowed_topics)
utils::ReturnCode DdsRecorder::reload_configuration(
const yaml::RecorderConfiguration& new_configuration)
{
return pipe_->reload_allowed_topics(allowed_topics);
return pipe_->reload_configuration(new_configuration.ddspipe_configuration);
}

void DdsRecorder::start()
Expand Down
4 changes: 2 additions & 2 deletions ddsrecorder/src/cpp/tool/DdsRecorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class DdsRecorder
* @return \c RETCODE_OK if allowed topics list has been updated correctly
* @return \c RETCODE_NO_DATA if new allowed topics list is the same as the previous one
*/
utils::ReturnCode reload_allowed_topics(
const std::shared_ptr<ddspipe::core::AllowedTopicList>& allowed_topics);
utils::ReturnCode reload_configuration(
const yaml::RecorderConfiguration& new_configuration);

//! Start recorder (\c mcap_handler_)
void start();
Expand Down
4 changes: 2 additions & 2 deletions ddsrecorder/test/blackbox/mcap/McapFileCreationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ std::unique_ptr<DdsRecorder> create_recorder(
YAML::Node yml;

eprosima::ddsrecorder::yaml::RecorderConfiguration configuration(yml);
configuration.downsampling = downsampling;
configuration.topic_qos.downsampling = downsampling;
// Set default value for downsampling
// TODO: Change mechanism setting topic qos' default values from specs
eprosima::ddspipe::core::types::TopicQoS::default_downsampling.store(downsampling);
eprosima::ddspipe::core::types::TopicQoS::default_topic_qos.set_value(configuration.topic_qos);
configuration.event_window = event_window;
eprosima::ddspipe::core::types::DomainId domainId;
domainId.domain_id = test::DOMAIN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class McapReaderParticipant : public ddspipe::core::IParticipant
DDSRECORDER_PARTICIPANTS_DllAPI
bool is_rtps_kind() const noexcept override;

//! Override topic_qos() IParticipant method
DDSRECORDER_PARTICIPANTS_DllAPI
ddspipe::core::types::TopicQoS topic_qos() const noexcept override;

//! Override create_writer_() IParticipant method
DDSRECORDER_PARTICIPANTS_DllAPI
std::shared_ptr<ddspipe::core::IWriter> create_writer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ bool McapReaderParticipant::is_rtps_kind() const noexcept
return false;
}

TopicQoS McapReaderParticipant::topic_qos() const noexcept
{
return configuration_->topic_qos;
}

std::shared_ptr<IWriter> McapReaderParticipant::create_writer(
const ITopic& /* topic */)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <cpp_utils/memory/Heritable.hpp>

#include <ddspipe_core/configuration/DdsPipeConfiguration.hpp>
#include <ddspipe_core/types/dds/TopicQoS.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>
#include <ddspipe_core/types/topic/filter/IFilterTopic.hpp>

Expand Down Expand Up @@ -52,15 +54,13 @@ class DDSRECORDER_YAML_DllAPI RecorderConfiguration
RecorderConfiguration(
const std::string& file_path);

// DDS Pipe Configuration
ddspipe::core::DdsPipeConfiguration ddspipe_configuration;

// Participants configurations
std::shared_ptr<ddspipe::participants::SimpleParticipantConfiguration> simple_configuration;
std::shared_ptr<ddspipe::participants::ParticipantConfiguration> recorder_configuration;

// Topic filtering
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> allowlist{};
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> blocklist{};
std::set<utils::Heritable<ddspipe::core::types::DistributedTopic>> builtin_topics{};

// Output file params
std::string output_filepath = ".";
std::string output_filename = "output";
Expand All @@ -71,8 +71,6 @@ class DDSRECORDER_YAML_DllAPI RecorderConfiguration
unsigned int buffer_size = 100;
unsigned int event_window = 20;
bool log_publish_time = false;
unsigned int downsampling = 1;
float max_reception_rate = 0;
bool only_with_type = false;
mcap::McapWriterOptions mcap_writer_options{"ros2"};
bool record_types = true;
Expand All @@ -86,9 +84,9 @@ class DDSRECORDER_YAML_DllAPI RecorderConfiguration

// Specs
unsigned int n_threads = 12;
unsigned int max_history_depth = 5000;
int max_pending_samples = 5000; // -1 <-> no limit || 0 <-> no pending samples
unsigned int cleanup_period;
ddspipe::core::types::TopicQoS topic_qos{};

protected:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <cpp_utils/time/time_utils.hpp>
#include <cpp_utils/types/Fuzzy.hpp>

#include <ddspipe_core/configuration/DdsPipeConfiguration.hpp>
#include <ddspipe_core/types/dds/TopicQoS.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>
#include <ddspipe_core/types/topic/filter/IFilterTopic.hpp>

Expand Down Expand Up @@ -52,15 +54,13 @@ class DDSRECORDER_YAML_DllAPI ReplayerConfiguration
ReplayerConfiguration(
const std::string& file_path);

// DDS Pipe Configuration
ddspipe::core::DdsPipeConfiguration ddspipe_configuration;

// Participants configurations
std::shared_ptr<ddsrecorder::participants::McapReaderParticipantConfiguration> mcap_reader_configuration;
std::shared_ptr<ddspipe::participants::SimpleParticipantConfiguration> replayer_configuration;

// Topic filtering
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> allowlist{};
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> blocklist{};
std::set<utils::Heritable<ddspipe::core::types::DistributedTopic>> builtin_topics{};

// Replay params
std::string input_file;
utils::Fuzzy<utils::Timestamp> begin_time{};
Expand All @@ -71,7 +71,7 @@ class DDSRECORDER_YAML_DllAPI ReplayerConfiguration

// Specs
unsigned int n_threads = 12;
unsigned int max_history_depth = 5000;
ddspipe::core::types::TopicQoS topic_qos{};

protected:

Expand Down
57 changes: 28 additions & 29 deletions ddsrecorder_yaml/src/cpp/recorder/YamlReaderConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cpp_utils/utils.hpp>

#include <ddspipe_core/types/dynamic_types/types.hpp>
#include <ddspipe_core/types/topic/filter/ManualTopic.hpp>
#include <ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp>
#include <ddspipe_participants/types/address/Address.hpp>

Expand Down Expand Up @@ -115,11 +116,16 @@ void RecorderConfiguration::load_ddsrecorder_configuration_(
WildcardDdsFilterTopic rpc_request_topic, rpc_response_topic;
rpc_request_topic.topic_name.set_value("rq/*");
rpc_response_topic.topic_name.set_value("rr/*");
blocklist.insert(

ddspipe_configuration.blocklist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(rpc_request_topic));
blocklist.insert(

ddspipe_configuration.blocklist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(rpc_response_topic));

// The DDS Pipe should be enabled on start up.
ddspipe_configuration.init_enabled = true;

// Initialize controller domain with the same as the one being recorded
// WARNING: dds tag must have been parsed beforehand
controller_domain = simple_configuration->domain;
Expand Down Expand Up @@ -198,24 +204,6 @@ void RecorderConfiguration::load_recorder_configuration_(
log_publish_time = YamlReader::get<bool>(yml, RECORDER_LOG_PUBLISH_TIME_TAG, version);
}

/////
// Get optional downsampling
if (YamlReader::is_tag_present(yml, DOWNSAMPLING_TAG))
{
downsampling = YamlReader::get_positive_int(yml, DOWNSAMPLING_TAG);
// Set default value for downsampling
TopicQoS::default_downsampling.store(downsampling);
}

/////
// Get optional max reception rate
if (YamlReader::is_tag_present(yml, MAX_RECEPTION_RATE_TAG))
{
// Set default value for max reception rate
TopicQoS::default_max_reception_rate.store(YamlReader::get_nonnegative_float(yml,
MAX_RECEPTION_RATE_TAG));
}

/////
// Get optional only_with_type
if (YamlReader::is_tag_present(yml, RECORDER_ONLY_WITH_TYPE_TAG))
Expand Down Expand Up @@ -290,12 +278,12 @@ void RecorderConfiguration::load_specs_configuration_(
n_threads = YamlReader::get_positive_int(yml, NUMBER_THREADS_TAG);
}

// Get maximum history depth
if (YamlReader::is_tag_present(yml, MAX_HISTORY_DEPTH_TAG))
/////
// Get optional Topic QoS
if (YamlReader::is_tag_present(yml, SPECS_QOS_TAG))
{
max_history_depth = YamlReader::get_positive_int(yml, MAX_HISTORY_DEPTH_TAG);
// Set default value for history
TopicQoS::default_history_depth.store(max_history_depth);
YamlReader::fill<TopicQoS>(topic_qos, YamlReader::get_value_in_tag(yml, SPECS_QOS_TAG), version);
TopicQoS::default_topic_qos.set_value(topic_qos);
}

// Get max pending samples
Expand Down Expand Up @@ -363,28 +351,39 @@ void RecorderConfiguration::load_dds_configuration_(
// Get optional allowlist
if (YamlReader::is_tag_present(yml, ALLOWLIST_TAG))
{
allowlist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, ALLOWLIST_TAG, version);
ddspipe_configuration.allowlist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, ALLOWLIST_TAG,
version);

// Add to allowlist always the type object topic
WildcardDdsFilterTopic internal_topic;
internal_topic.topic_name.set_value(TYPE_OBJECT_TOPIC_NAME);
allowlist.insert(
ddspipe_configuration.allowlist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(internal_topic));
}

/////
// Get optional blocklist
if (YamlReader::is_tag_present(yml, BLOCKLIST_TAG))
{
blocklist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, BLOCKLIST_TAG, version);
ddspipe_configuration.blocklist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, BLOCKLIST_TAG,
version);
}

/////
// Get optional topics
if (YamlReader::is_tag_present(yml, TOPICS_TAG))
{
const auto& manual_topics = YamlReader::get_list<ManualTopic>(yml, TOPICS_TAG, version);
ddspipe_configuration.manual_topics =
std::vector<ManualTopic>(manual_topics.begin(), manual_topics.end());
}

/////
// Get optional builtin topics
if (YamlReader::is_tag_present(yml, BUILTIN_TAG))
{
// WARNING: Parse builtin topics AFTER specs and recorder, as some topic-specific default values are set there
builtin_topics = YamlReader::get_set<utils::Heritable<DistributedTopic>>(yml, BUILTIN_TAG,
ddspipe_configuration.builtin_topics = YamlReader::get_set<utils::Heritable<DistributedTopic>>(yml, BUILTIN_TAG,
version);
}
}
Expand Down
Loading
Loading