diff --git a/test/performance/video/CMakeLists.txt b/test/performance/video/CMakeLists.txt index 6c974fc9a16..186df42e449 100644 --- a/test/performance/video/CMakeLists.txt +++ b/test/performance/video/CMakeLists.txt @@ -112,6 +112,7 @@ if(GST_FOUND) fastcdr foonathan_memory fastdds::optionparser + GTest::gtest ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS} ${GST_LIBRARIES} @@ -128,6 +129,7 @@ if(GST_FOUND) fastcdr foonathan_memory fastdds::optionparser + GTest::gtest ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS} ${GST_LIBRARIES} diff --git a/test/performance/video/VideoTestPublisher.cpp b/test/performance/video/VideoTestPublisher.cpp index 52eadbcecb5..7f2456e7252 100644 --- a/test/performance/video/VideoTestPublisher.cpp +++ b/test/performance/video/VideoTestPublisher.cpp @@ -23,15 +23,18 @@ #include #include +#include +#include #include #include #include +#include #define TIME_LIMIT_US 10000 using namespace eprosima; using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; +using namespace eprosima::fastdds::dds; using std::cout; using std::endl; @@ -90,17 +93,56 @@ VideoTestPublisher::~VideoTestPublisher() pipeline = nullptr; } - Domain::removeParticipant(mp_participant); + if (mp_participant != nullptr) + { + if (mp_commandsub) + { + if (mp_dr) + { + mp_commandsub->delete_datareader(mp_dr); + } + mp_participant->delete_subscriber(mp_commandsub); + } + if (mp_datapub) + { + if (mp_data_dw) + { + mp_datapub->delete_datawriter(mp_data_dw); + } + mp_participant->delete_publisher(mp_datapub); + } + if (mp_commandpub) + { + if (mp_command_dw) + { + mp_commandpub->delete_datawriter(mp_command_dw); + } + mp_participant->delete_publisher(mp_commandpub); + } + if (mp_command_sub_topic) + { + mp_participant->delete_topic(mp_command_sub_topic); + } + if (mp_video_topic) + { + mp_participant->delete_topic(mp_video_topic); + } + if (mp_command_pub_topic) + { + mp_participant->delete_topic(mp_command_pub_topic); + } + DomainParticipantFactory::get_instance()->delete_participant(mp_participant); + } } -bool VideoTestPublisher::init( +void VideoTestPublisher::init( int n_sub, int n_sam, bool reliable, uint32_t pid, bool hostname, - const PropertyPolicy& part_property_policy, - const PropertyPolicy& property_policy, + const eprosima::fastrtps::rtps::PropertyPolicy& part_property_policy, + const eprosima::fastrtps::rtps::PropertyPolicy& property_policy, bool large_data, const std::string& sXMLConfigFile, int test_time, @@ -127,146 +169,169 @@ bool VideoTestPublisher::init( // GSTREAMER PIPELINE INITIALIZATION. InitGStreamer(); - // Create RTPSParticipant + // Create Participant std::string participant_profile_name = "pub_participant_profile"; - ParticipantAttributes PParam; - if (m_forcedDomain >= 0) - { - PParam.domainId = m_forcedDomain; - } - else - { - PParam.domainId = pid % 230; - } - PParam.rtps.properties = part_property_policy; - PParam.rtps.setName("video_test_publisher"); + DomainParticipantQos participant_qos; + + participant_qos.properties(part_property_policy); + + participant_qos.name("video_test_publisher"); if (m_sXMLConfigFile.length() > 0) { if (m_forcedDomain >= 0) { - ParticipantAttributes participant_att; - if (eprosima::fastrtps::xmlparser::XMLP_ret::XML_OK == - eprosima::fastrtps::xmlparser::XMLProfileManager::fillParticipantAttributes(participant_profile_name, - participant_att)) - { - participant_att.domainId = m_forcedDomain; - mp_participant = Domain::createParticipant(participant_att); - } + mp_participant = DomainParticipantFactory::get_instance()->create_participant_with_profile(m_forcedDomain, + participant_profile_name); } else { - mp_participant = Domain::createParticipant(participant_profile_name); + mp_participant = DomainParticipantFactory::get_instance()->create_participant_with_profile( + participant_profile_name); } } else { - mp_participant = Domain::createParticipant(PParam); + if (m_forcedDomain >= 0) + { + mp_participant = DomainParticipantFactory::get_instance()->create_participant( + m_forcedDomain, participant_qos); + } + else + { + mp_participant = DomainParticipantFactory::get_instance()->create_participant( + pid % 230, participant_qos); + } } - if (mp_participant == nullptr) - { - return false; - } + ASSERT_NE(mp_participant, nullptr); // Register the type - Domain::registerType(mp_participant, (TopicDataType*)&video_t); - Domain::registerType(mp_participant, (TopicDataType*)&command_t); - + TypeSupport type_video; + TypeSupport type_command; + type_video.reset(new VideoDataType()); + type_command.reset(new TestCommandDataType()); + ASSERT_EQ(mp_participant->register_type(type_video), ReturnCode_t::RETCODE_OK); + ASSERT_EQ(mp_participant->register_type(type_command), ReturnCode_t::RETCODE_OK); // Create Data Publisher std::string profile_name = "publisher_profile"; - PublisherAttributes PubDataparam; - if (!reliable) + if (m_sXMLConfigFile.length() > 0) { - PubDataparam.qos.m_reliability.kind = BEST_EFFORT_RELIABILITY_QOS; + mp_datapub = mp_participant->create_publisher_with_profile(profile_name); } - PubDataparam.properties = property_policy; - if (large_data) + else { - PubDataparam.historyMemoryPolicy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - PubDataparam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + mp_datapub = mp_participant->create_publisher(PUBLISHER_QOS_DEFAULT); } + ASSERT_NE(mp_datapub, nullptr); + ASSERT_TRUE(mp_datapub->is_enabled()); - if (m_sXMLConfigFile.length() > 0) + // Create topic + std::ostringstream video_topic_name; + video_topic_name << "VideoTest_"; + if (hostname) { - eprosima::fastrtps::xmlparser::XMLProfileManager::fillPublisherAttributes(profile_name, PubDataparam); + video_topic_name << asio::ip::host_name() << "_"; } + video_topic_name << pid << "_PUB2SUB"; + mp_video_topic = mp_participant->create_topic(video_topic_name.str(), + "VideoType", TOPIC_QOS_DEFAULT); + ASSERT_NE(mp_video_topic, nullptr); + ASSERT_TRUE(mp_video_topic->is_enabled()); - PubDataparam.topic.topicDataType = "VideoType"; - PubDataparam.topic.topicKind = NO_KEY; - std::ostringstream pt; - pt << "VideoTest_"; - if (hostname) + // Create Data DataWriter + if (reliable_) { - pt << asio::ip::host_name() << "_"; + datawriter_qos_data.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; } - pt << pid << "_PUB2SUB"; - PubDataparam.topic.topicName = pt.str(); - PubDataparam.times.heartbeatPeriod.seconds = 0; - PubDataparam.times.heartbeatPeriod.nanosec = 100000000; + else + { + datawriter_qos_data.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + } + + datawriter_qos_data.properties(property_policy); - mp_datapub = Domain::createPublisher(mp_participant, PubDataparam, (PublisherListener*)&this->m_datapublistener); - if (mp_datapub == nullptr) + if (large_data) { - return false; + datawriter_qos_data.endpoint().history_memory_policy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + datawriter_qos_data.publish_mode().kind = PublishModeQosPolicyKind::ASYNCHRONOUS_PUBLISH_MODE; } + datawriter_qos_data.reliable_writer_qos().times.heartbeatPeriod.seconds = 0; + datawriter_qos_data.reliable_writer_qos().times.heartbeatPeriod.nanosec = 100000000; + + mp_data_dw = mp_datapub->create_datawriter(mp_video_topic, datawriter_qos_data, &this->m_datapublistener); + ASSERT_NE(mp_data_dw, nullptr); + ASSERT_TRUE(mp_data_dw->is_enabled()); + // Create Command Publisher - PublisherAttributes PubCommandParam; - PubCommandParam.topic.topicDataType = "TestCommandType"; - PubCommandParam.topic.topicKind = NO_KEY; - std::ostringstream pct; - pct << "VideoTest_Command_"; + mp_commandpub = mp_participant->create_publisher(PUBLISHER_QOS_DEFAULT); + + // Create topic + std::ostringstream command_topic_name; + command_topic_name << "VideoTest_Command_"; if (hostname) { - pct << asio::ip::host_name() << "_"; - } - pct << pid << "_PUB2SUB"; - PubCommandParam.topic.topicName = pct.str(); - PubCommandParam.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS; - PubCommandParam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - PubCommandParam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - PubCommandParam.qos.m_publishMode.kind = eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE; - mp_commandpub = Domain::createPublisher(mp_participant, PubCommandParam, &this->m_commandpublistener); - - if (mp_commandpub == nullptr) - { - return false; + command_topic_name << asio::ip::host_name() << "_"; } + command_topic_name << pid << "_PUB2SUB"; + + mp_command_pub_topic = mp_participant->create_topic(command_topic_name.str(), + "TestCommandType", TOPIC_QOS_DEFAULT); + ASSERT_NE(mp_command_pub_topic, nullptr); + ASSERT_TRUE(mp_command_pub_topic->is_enabled()); + + //Create Command DataWriter + datawriter_qos_cmd.history().kind = eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS; + datawriter_qos_cmd.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datawriter_qos_cmd.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS; + datawriter_qos_cmd.publish_mode().kind = PublishModeQosPolicyKind::SYNCHRONOUS_PUBLISH_MODE; + + mp_command_dw = mp_commandpub->create_datawriter(mp_command_pub_topic, datawriter_qos_cmd, + &this->m_commandpublistener); + ASSERT_NE(mp_command_dw, nullptr); + ASSERT_TRUE(mp_command_dw->is_enabled()); + - SubscriberAttributes SubCommandParam; - SubCommandParam.topic.topicDataType = "TestCommandType"; - SubCommandParam.topic.topicKind = NO_KEY; - std::ostringstream sct; - sct << "VideoTest_Command_"; + // Create subscriber + mp_commandsub = mp_participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(mp_commandsub, nullptr); + ASSERT_TRUE(mp_commandsub->is_enabled()); + + // Create topic + std::ostringstream sub_topic_name; + sub_topic_name << "VideoTest_Command_"; if (hostname) { - sct << asio::ip::host_name() << "_"; - } - sct << pid << "_SUB2PUB"; - SubCommandParam.topic.topicName = sct.str(); - SubCommandParam.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS; - SubCommandParam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - SubCommandParam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - - mp_commandsub = Domain::createSubscriber(mp_participant, SubCommandParam, &this->m_commandsublistener); - if (mp_commandsub == nullptr) - { - return false; + sub_topic_name << asio::ip::host_name() << "_"; } + sub_topic_name << pid << "_SUB2PUB"; + + mp_command_sub_topic = mp_participant->create_topic(sub_topic_name.str(), + "TestCommandType", eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); + ASSERT_NE(mp_command_sub_topic, nullptr); + ASSERT_TRUE(mp_command_sub_topic->is_enabled()); + + // Create DataReader + datareader_qos.history().kind = eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS; + datareader_qos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datareader_qos.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS; + mp_dr = mp_commandsub->create_datareader(mp_command_sub_topic, datareader_qos, &this->m_commandsublistener); + ASSERT_NE(mp_dr, nullptr); + ASSERT_TRUE(mp_dr->is_enabled()); - return true; } -void VideoTestPublisher::DataPubListener::onPublicationMatched( - Publisher* /*pub*/, - MatchingInfo& info) +void VideoTestPublisher::DataPubListener::on_publication_matched( + eprosima::fastdds::dds::DataWriter* /*datawriter*/, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) { std::unique_lock lock(mp_up->mutex_); - if (info.status == MATCHED_MATCHING) + if (info.current_count_change > 0) { cout << C_MAGENTA << "Data Pub Matched " << C_DEF << endl; @@ -288,13 +353,13 @@ void VideoTestPublisher::DataPubListener::onPublicationMatched( mp_up->disc_cond_.notify_one(); } -void VideoTestPublisher::CommandPubListener::onPublicationMatched( - Publisher* /*pub*/, - MatchingInfo& info) +void VideoTestPublisher::CommandPubListener::on_publication_matched( + eprosima::fastdds::dds::DataWriter* /*datawriter*/, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) { std::unique_lock lock(mp_up->mutex_); - if (info.status == MATCHED_MATCHING) + if (info.current_count_change > 0) { cout << C_MAGENTA << "Command Pub Matched " << C_DEF << endl; @@ -316,12 +381,12 @@ void VideoTestPublisher::CommandPubListener::onPublicationMatched( mp_up->disc_cond_.notify_one(); } -void VideoTestPublisher::CommandSubListener::onSubscriptionMatched( - Subscriber* /*sub*/, - MatchingInfo& info) +void VideoTestPublisher::CommandSubListener::on_subscription_matched( + eprosima::fastdds::dds::DataReader* /*datareader*/, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) { std::unique_lock lock(mp_up->mutex_); - if (info.status == MATCHED_MATCHING) + if (info.current_count_change > 0) { cout << C_MAGENTA << "Command Sub Matched " << C_DEF << endl; @@ -343,20 +408,19 @@ void VideoTestPublisher::CommandSubListener::onSubscriptionMatched( mp_up->disc_cond_.notify_one(); } -void VideoTestPublisher::CommandSubListener::onNewDataMessage( - Subscriber* subscriber) +void VideoTestPublisher::CommandSubListener::on_data_available( + eprosima::fastdds::dds::DataReader* datareader) { + ASSERT_NE(datareader, nullptr); + TestCommandType command; - SampleInfo_t info; - // cout << "COMMAND RECEIVED"<takeNextData((void*)&command, &info)) + eprosima::fastdds::dds::SampleInfo info; + if (ReturnCode_t::RETCODE_OK == datareader->take_next_sample((void*)&command, &info)) { - if (info.sampleKind == ALIVE) + if (info.valid_data) { - //cout << "ALIVE "<mutex_.lock(); ++mp_up->comm_count_; mp_up->mutex_.unlock(); @@ -384,17 +448,6 @@ void VideoTestPublisher::run() cout << C_B_MAGENTA << "DISCOVERY COMPLETE " << C_DEF << endl; this->test(0); - - cout << "REMOVING PUBLISHER" << endl; - Domain::removePublisher(this->mp_commandpub); - cout << "REMOVING SUBSCRIBER" << endl; - Domain::removeSubscriber(mp_commandsub); - - std::string str_reliable = "besteffort"; - if (reliable_) - { - str_reliable = "reliable"; - } } bool VideoTestPublisher::test( @@ -408,7 +461,7 @@ bool VideoTestPublisher::test( // Send READY command TestCommandType command; command.m_command = READY; - mp_commandpub->write(&command); + mp_command_dw->write(&command); std::unique_lock lock(mutex_); // Wait for all the subscribers @@ -438,7 +491,7 @@ bool VideoTestPublisher::test( // Send STOP command to subscriber command.m_command = STOP; - mp_commandpub->write(&command); + mp_command_dw->write(&command); // Wait until all subscribers unmatch disc_cond_.wait(lock, [&]() @@ -456,7 +509,7 @@ bool VideoTestPublisher::test( // Clean up size_t removed = 0; - mp_datapub->removeAllChange(&removed); + mp_data_dw->clear_history(&removed); delete(mp_video_out); return true; @@ -496,7 +549,6 @@ void VideoTestPublisher::InitGStreamer() // Link the camera source and colorspace filter using capabilities specified gst_bin_add_many(GST_BIN(pipeline), filesrc, videorate, sink, NULL); - //ok = gst_element_link_filtered(filesrc, sink, caps) == TRUE; ok = gst_element_link_filtered(filesrc, videorate, caps) == TRUE; ok = gst_element_link_filtered(videorate, sink, caps) == TRUE; gst_caps_unref(caps); @@ -541,7 +593,7 @@ GstFlowReturn VideoTestPublisher::new_sample( if (rand() % 100 > sub->m_dropRate) { - if (!sub->mp_datapub->write((void*)sub->mp_video_out)) + if (!sub->mp_data_dw->write((void*)sub->mp_video_out)) { std::cout << "VideoPublication::run -> Cannot write video" << std::endl; } diff --git a/test/performance/video/VideoTestPublisher.hpp b/test/performance/video/VideoTestPublisher.hpp index 8f0934cce2d..659d608e428 100644 --- a/test/performance/video/VideoTestPublisher.hpp +++ b/test/performance/video/VideoTestPublisher.hpp @@ -21,102 +21,162 @@ #define VIDEOPUBLISHER_H_ #include +#include +#include #include "VideoTestTypes.hpp" #include #include #include -#include -#include class VideoTestPublisher { +public: + + VideoTestPublisher(); + virtual ~VideoTestPublisher(); + + eprosima::fastdds::dds::DomainParticipant* mp_participant; + eprosima::fastdds::dds::Publisher* mp_datapub; + eprosima::fastdds::dds::Publisher* mp_commandpub; + eprosima::fastdds::dds::Subscriber* mp_commandsub; + eprosima::fastdds::dds::Topic* mp_video_topic; + eprosima::fastdds::dds::Topic* mp_command_pub_topic; + eprosima::fastdds::dds::Topic* mp_command_sub_topic; + VideoType* mp_video_out; + std::chrono::steady_clock::time_point t_start_; + int n_subscribers; + unsigned int n_samples; + std::mutex mutex_; + int disc_count_; + std::condition_variable disc_cond_; + int comm_count_; + std::condition_variable comm_cond_; + bool timer_on_; + std::chrono::steady_clock::time_point send_start_; + std::condition_variable timer_cond_; + int m_status; + unsigned int n_received; + void init( + int n_sub, + int n_sam, + bool reliable, + uint32_t pid, + bool hostname, + const eprosima::fastrtps::rtps::PropertyPolicy& part_property_policy, + const eprosima::fastrtps::rtps::PropertyPolicy& property_policy, + bool large_data, + const std::string& sXMLConfigFile, + int test_time, + int drop_rate, + int max_sleep_time, + int forced_domain, + int video_width, + int video_height, + int frame_rate); + void run(); + bool test( + uint32_t datasize); + + class DataPubListener : public eprosima::fastdds::dds::DataWriterListener + { + public: + + DataPubListener( + VideoTestPublisher* up) + : mp_up(up) + , n_matched(0) + { + } + + ~DataPubListener() + { + } + + void on_publication_matched( + eprosima::fastdds::dds::DataWriter* /*datawriter*/, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) override; + VideoTestPublisher* mp_up; + int n_matched; + } + m_datapublistener; + + class CommandPubListener : public eprosima::fastdds::dds::DataWriterListener + { public: - VideoTestPublisher(); - virtual ~VideoTestPublisher(); - - eprosima::fastrtps::Participant* mp_participant; - eprosima::fastrtps::Publisher* mp_datapub; - eprosima::fastrtps::Publisher* mp_commandpub; - eprosima::fastrtps::Subscriber* mp_commandsub; - VideoType* mp_video_out; - std::chrono::steady_clock::time_point t_start_; - int n_subscribers; - unsigned int n_samples; - eprosima::fastrtps::SampleInfo_t m_sampleinfo; - std::mutex mutex_; - int disc_count_; - std::condition_variable disc_cond_; - int comm_count_; - std::condition_variable comm_cond_; - bool timer_on_; - std::chrono::steady_clock::time_point send_start_; - std::condition_variable timer_cond_; - int m_status; - unsigned int n_received; - bool init(int n_sub, int n_sam, bool reliable, uint32_t pid, bool hostname, - const eprosima::fastrtps::rtps::PropertyPolicy& part_property_policy, - const eprosima::fastrtps::rtps::PropertyPolicy& property_policy, bool large_data, - const std::string& sXMLConfigFile, int test_time, int drop_rate, int max_sleep_time, - int forced_domain, int video_width, int video_height, int frame_rate); - void run(); - bool test(uint32_t datasize); - - class DataPubListener : public eprosima::fastrtps::PublisherListener + + CommandPubListener( + VideoTestPublisher* up) + : mp_up(up) + , n_matched(0) + { + } + + ~CommandPubListener() { - public: - DataPubListener(VideoTestPublisher* up):mp_up(up),n_matched(0){} - ~DataPubListener(){} - void onPublicationMatched(eprosima::fastrtps::Publisher* pub, - eprosima::fastrtps::rtps::MatchingInfo& info); - VideoTestPublisher* mp_up; - int n_matched; - } m_datapublistener; - - class CommandPubListener : public eprosima::fastrtps::PublisherListener + } + + void on_publication_matched( + eprosima::fastdds::dds::DataWriter* /*datawriter*/, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) override; + VideoTestPublisher* mp_up; + int n_matched; + } + m_commandpublistener; + + class CommandSubListener : public eprosima::fastdds::dds::DataReaderListener + { + public: + + CommandSubListener( + VideoTestPublisher* up) + : mp_up(up) + , n_matched(0) { - public: - CommandPubListener(VideoTestPublisher* up):mp_up(up),n_matched(0){} - ~CommandPubListener(){} - void onPublicationMatched(eprosima::fastrtps::Publisher* pub, - eprosima::fastrtps::rtps::MatchingInfo& info); - VideoTestPublisher* mp_up; - int n_matched; - } m_commandpublistener; - - class CommandSubListener : public eprosima::fastrtps::SubscriberListener + } + + ~CommandSubListener() { - public: - CommandSubListener(VideoTestPublisher* up):mp_up(up),n_matched(0){} - ~CommandSubListener(){} - void onSubscriptionMatched(eprosima::fastrtps::Subscriber* sub, - eprosima::fastrtps::rtps::MatchingInfo& into); - void onNewDataMessage(eprosima::fastrtps::Subscriber* sub); - VideoTestPublisher* mp_up; - int n_matched; - } m_commandsublistener; - - VideoDataType video_t; - TestCommandDataType command_t; - std::string m_sXMLConfigFile; - bool reliable_; - - GstElement* pipeline; - GstElement* filesrc; - GstElement* videorate; - GstElement* sink; - int m_testTime; - int m_dropRate; - int m_sendSleepTime; - int m_forcedDomain; - int m_videoWidth; - int m_videoHeight; - int m_videoFrameRate; + } + + void on_subscription_matched( + eprosima::fastdds::dds::DataReader* /*datareader*/, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override; + void on_data_available( + eprosima::fastdds::dds::DataReader* datareader) override; + VideoTestPublisher* mp_up; + int n_matched; + } + m_commandsublistener; + + std::string m_sXMLConfigFile; + bool reliable_; + + GstElement* pipeline; + GstElement* filesrc; + GstElement* videorate; + GstElement* sink; + int m_testTime; + int m_dropRate; + int m_sendSleepTime; + int m_forcedDomain; + int m_videoWidth; + int m_videoHeight; + int m_videoFrameRate; + protected: - void InitGStreamer(); - static GstFlowReturn new_sample(GstElement *sink, VideoTestPublisher *sub); + void InitGStreamer(); + static GstFlowReturn new_sample( + GstElement* sink, + VideoTestPublisher* sub); + eprosima::fastdds::dds::DataReaderQos datareader_qos; + eprosima::fastdds::dds::DataWriterQos datawriter_qos_cmd; + eprosima::fastdds::dds::DataWriterQos datawriter_qos_data; + eprosima::fastdds::dds::DataWriter* mp_data_dw; + eprosima::fastdds::dds::DataReader* mp_dr; + eprosima::fastdds::dds::DataWriter* mp_command_dw; }; diff --git a/test/performance/video/VideoTestSubscriber.cpp b/test/performance/video/VideoTestSubscriber.cpp index c2ef229abec..59762423270 100644 --- a/test/performance/video/VideoTestSubscriber.cpp +++ b/test/performance/video/VideoTestSubscriber.cpp @@ -16,6 +16,7 @@ * @file VideoTestSubscriber.cpp * */ +#include "VideoTestSubscriber.hpp" #include #include @@ -25,12 +26,11 @@ #include #include #include - -#include "VideoTestSubscriber.hpp" +#include using namespace eprosima; +using namespace eprosima::fastdds::dds; using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; using std::cout; using std::endl; @@ -71,7 +71,38 @@ VideoTestSubscriber::~VideoTestSubscriber() { stop(); - Domain::removeParticipant(mp_participant); + if (mp_participant != nullptr) + { + if (mp_commandsub) + { + if (mp_commanhd_dr) + { + mp_commandsub->delete_datareader(mp_commanhd_dr); + } + mp_participant->delete_subscriber(mp_commandsub); + } + if (mp_datasub) + { + if (mp_data_dr) + { + mp_datasub->delete_datareader(mp_data_dr); + } + mp_participant->delete_subscriber(mp_datasub); + } + if (mp_commandpub) + { + if (mp_dw) + { + mp_commandpub->delete_datawriter(mp_dw); + } + mp_participant->delete_publisher(mp_commandpub); + } + if (mp_command_sub_topic) + { + mp_participant->delete_topic(mp_command_sub_topic); + } + DomainParticipantFactory::get_instance()->delete_participant(mp_participant); + } if (gmain_loop_ != nullptr) { @@ -81,20 +112,17 @@ VideoTestSubscriber::~VideoTestSubscriber() } gst_bin_remove_many(GST_BIN(pipeline), appsrc, videoconvert, sink, NULL); - //if (pipeline) gst_object_unref(GST_OBJECT(pipeline)), pipeline = nullptr; - //if (sink) gst_object_unref(GST_OBJECT(sink)), sink = nullptr; - //if (videoconvert) gst_object_unref(GST_OBJECT(videoconvert)), videoconvert = nullptr; - //if (appsrc) gst_object_unref(GST_OBJECT(appsrc)), appsrc = nullptr; + thread_.join(); } -bool VideoTestSubscriber::init( +void VideoTestSubscriber::init( int nsam, bool reliable, uint32_t pid, bool hostname, - const PropertyPolicy& part_property_policy, - const PropertyPolicy& property_policy, + const eprosima::fastrtps::rtps::PropertyPolicy& part_property_policy, + const eprosima::fastrtps::rtps::PropertyPolicy& property_policy, bool large_data, const std::string& sXMLConfigFile, bool export_csv, @@ -117,140 +145,161 @@ bool VideoTestSubscriber::init( InitGStreamer(); - // Create RTPSParticipant + // Create Participant std::string participant_profile_name = "sub_participant_profile"; - ParticipantAttributes PParam; + DomainParticipantQos participant_qos; - if (m_forcedDomain >= 0) - { - PParam.domainId = m_forcedDomain; - } - else - { - PParam.domainId = pid % 230; - } - PParam.rtps.setName("video_test_subscriber"); - PParam.rtps.properties = part_property_policy; + participant_qos.name("video_test_subscriber"); + + participant_qos.properties(part_property_policy); if (m_sXMLConfigFile.length() > 0) { if (m_forcedDomain >= 0) { - ParticipantAttributes participant_att; - if (eprosima::fastrtps::xmlparser::XMLP_ret::XML_OK == - eprosima::fastrtps::xmlparser::XMLProfileManager::fillParticipantAttributes(participant_profile_name, - participant_att)) - { - participant_att.domainId = m_forcedDomain; - mp_participant = Domain::createParticipant(participant_att); - } + mp_participant = DomainParticipantFactory::get_instance()->create_participant_with_profile(m_forcedDomain, + participant_profile_name); } else { - mp_participant = Domain::createParticipant(participant_profile_name); + mp_participant = DomainParticipantFactory::get_instance()->create_participant_with_profile( + participant_profile_name); } } else { - mp_participant = Domain::createParticipant(PParam); + if (m_forcedDomain >= 0) + { + mp_participant = DomainParticipantFactory::get_instance()->create_participant( + m_forcedDomain, participant_qos); + } + else + { + mp_participant = DomainParticipantFactory::get_instance()->create_participant( + pid % 230, participant_qos); + } } - if (mp_participant == nullptr) - { - return false; - } + ASSERT_NE(mp_participant, nullptr); - Domain::registerType(mp_participant, (TopicDataType*)&video_t); - Domain::registerType(mp_participant, (TopicDataType*)&command_t); + // Register the type + TypeSupport video_type; + TypeSupport command_type; + video_type.reset(new VideoDataType()); + command_type.reset(new TestCommandDataType()); + ASSERT_EQ(mp_participant->register_type(video_type), ReturnCode_t::RETCODE_OK); + ASSERT_EQ(mp_participant->register_type(command_type), ReturnCode_t::RETCODE_OK); - // Create Data subscriber + // Create Data Subscriber std::string profile_name = "subscriber_profile"; - SubscriberAttributes SubDataparam; - if (reliable) + if (m_sXMLConfigFile.length() > 0) { - SubDataparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + mp_datasub = mp_participant->create_subscriber_with_profile(profile_name); } - SubDataparam.properties = property_policy; - if (large_data) + else { - SubDataparam.historyMemoryPolicy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + mp_datasub = mp_participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); } + ASSERT_NE(mp_datasub, nullptr); + ASSERT_TRUE(mp_datasub->is_enabled()); - if (m_sXMLConfigFile.length() > 0) + // Create topic + std::ostringstream video_topic_name; + video_topic_name << "VideoTest_"; + if (hostname) { - eprosima::fastrtps::xmlparser::XMLProfileManager::fillSubscriberAttributes(profile_name, SubDataparam); + video_topic_name << asio::ip::host_name() << "_"; } + video_topic_name << pid << "_PUB2SUB"; + mp_video_topic = mp_participant->create_topic(video_topic_name.str(), + "VideoType", TOPIC_QOS_DEFAULT); + ASSERT_NE(mp_video_topic, nullptr); + ASSERT_TRUE(mp_video_topic->is_enabled()); - SubDataparam.topic.topicDataType = "VideoType"; - SubDataparam.topic.topicKind = NO_KEY; - std::ostringstream st; - st << "VideoTest_"; - if (hostname) + // Create data DataReader + if (m_bReliable) { - st << asio::ip::host_name() << "_"; + datareader_qos_data.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; } - st << pid << "_PUB2SUB"; - SubDataparam.topic.topicName = st.str(); + else + { + datareader_qos_data.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + } + datareader_qos_data.properties(property_policy); - mp_datasub = Domain::createSubscriber(mp_participant, SubDataparam, &this->m_datasublistener); - if (mp_datasub == nullptr) + if (large_data) { - std::cout << "Cannot create data subscriber" << std::endl; - return false; + datareader_qos_data.endpoint().history_memory_policy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; } + mp_data_dr = mp_datasub->create_datareader(mp_video_topic, datareader_qos_data, &this->m_datasublistener); + ASSERT_NE(mp_data_dr, nullptr); + ASSERT_TRUE(mp_data_dr->is_enabled()); + + // Create Command Publisher - PublisherAttributes PubCommandParam; - PubCommandParam.topic.topicDataType = "TestCommandType"; - PubCommandParam.topic.topicKind = NO_KEY; - std::ostringstream pct; - pct << "VideoTest_Command_"; + mp_commandpub = mp_participant->create_publisher(PUBLISHER_QOS_DEFAULT); + + // Create topic + std::ostringstream pub_cmd_topic_name; + pub_cmd_topic_name << "VideoTest_Command_"; if (hostname) { - pct << asio::ip::host_name() << "_"; + pub_cmd_topic_name << asio::ip::host_name() << "_"; } - pct << pid << "_SUB2PUB"; - PubCommandParam.topic.topicName = pct.str(); - PubCommandParam.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS; - PubCommandParam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - PubCommandParam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; + pub_cmd_topic_name << pid << "_SUB2PUB"; - mp_commandpub = Domain::createPublisher(mp_participant, PubCommandParam, &this->m_commandpublistener); - if (mp_commandpub == nullptr) - { - return false; - } + mp_command_pub_topic = mp_participant->create_topic(pub_cmd_topic_name.str(), + "TestCommandType", TOPIC_QOS_DEFAULT); + ASSERT_NE(mp_command_pub_topic, nullptr); + ASSERT_TRUE(mp_command_pub_topic->is_enabled()); + + // Create DataWriter + datawriter_qos.history().kind = eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS; + datawriter_qos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datawriter_qos.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS; + mp_dw = mp_commandpub->create_datawriter(mp_command_pub_topic, datawriter_qos, + &this->m_commandpublistener); + ASSERT_NE(mp_dw, nullptr); + ASSERT_TRUE(mp_dw->is_enabled()); + + + // Create Command Subscriber + mp_commandsub = mp_participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(mp_commandsub, nullptr); + ASSERT_TRUE(mp_commandsub->is_enabled()); - SubscriberAttributes SubCommandParam; - SubCommandParam.topic.topicDataType = "TestCommandType"; - SubCommandParam.topic.topicKind = NO_KEY; - std::ostringstream sct; - sct << "VideoTest_Command_"; + // Create topic + std::ostringstream sub_cmd_topic_name; + sub_cmd_topic_name << "VideoTest_Command_"; if (hostname) { - sct << asio::ip::host_name() << "_"; + sub_cmd_topic_name << asio::ip::host_name() << "_"; } - sct << pid << "_PUB2SUB"; - SubCommandParam.topic.topicName = sct.str(); - SubCommandParam.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS; - SubCommandParam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - SubCommandParam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; + sub_cmd_topic_name << pid << "_PUB2SUB"; + mp_command_sub_topic = mp_participant->create_topic(sub_cmd_topic_name.str(), + "TestCommandType", TOPIC_QOS_DEFAULT); + ASSERT_NE(mp_command_sub_topic, nullptr); + ASSERT_TRUE(mp_command_sub_topic->is_enabled()); - mp_commandsub = Domain::createSubscriber(mp_participant, SubCommandParam, &this->m_commandsublistener); - if (mp_commandsub == nullptr) - { - return false; - } - return true; + datareader_qos_cmd.history().kind = eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS; + datareader_qos_cmd.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datareader_qos_cmd.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS; + + mp_commanhd_dr = mp_commandsub->create_datareader(mp_command_sub_topic, datareader_qos_cmd, + &this->m_commandsublistener); + ASSERT_NE(mp_commanhd_dr, nullptr); + ASSERT_TRUE(mp_commanhd_dr->is_enabled()); } -void VideoTestSubscriber::DataSubListener::onSubscriptionMatched( - Subscriber* /*sub*/, - MatchingInfo& info) +void VideoTestSubscriber::DataSubListener::on_subscription_matched( + DataReader* /*datareader*/, + const SubscriptionMatchedStatus& info) { std::unique_lock lock(mp_up->mutex_); - if (info.status == MATCHED_MATCHING) + if (info.current_count_change > 0) { EPROSIMA_LOG_INFO(VideoTest, "Data Sub Matched "); std::cout << "Data Sub Matched " << std::endl; @@ -267,13 +316,13 @@ void VideoTestSubscriber::DataSubListener::onSubscriptionMatched( mp_up->disc_cond_.notify_one(); } -void VideoTestSubscriber::CommandPubListener::onPublicationMatched( - Publisher* /*pub*/, - MatchingInfo& info) +void VideoTestSubscriber::CommandPubListener::on_publication_matched( + DataWriter* /*datawriter*/, + const PublicationMatchedStatus& info) { std::unique_lock lock(mp_up->mutex_); - if (info.status == MATCHED_MATCHING) + if (info.current_count_change > 0) { EPROSIMA_LOG_INFO(VideoTest, "Command Pub Matched "); std::cout << "Command Pub Matched " << std::endl; @@ -290,12 +339,12 @@ void VideoTestSubscriber::CommandPubListener::onPublicationMatched( mp_up->disc_cond_.notify_one(); } -void VideoTestSubscriber::CommandSubListener::onSubscriptionMatched( - Subscriber* /*sub*/, - MatchingInfo& info) +void VideoTestSubscriber::CommandSubListener::on_subscription_matched( + DataReader* /*datareader*/, + const SubscriptionMatchedStatus& info) { std::unique_lock lock(mp_up->mutex_); - if (info.status == MATCHED_MATCHING) + if (info.current_count_change > 0) { EPROSIMA_LOG_INFO(VideoTest, "Command Sub Matched "); std::cout << "Command Sub Matched " << std::endl; @@ -312,53 +361,56 @@ void VideoTestSubscriber::CommandSubListener::onSubscriptionMatched( mp_up->disc_cond_.notify_one(); } -void VideoTestSubscriber::CommandSubListener::onNewDataMessage( - Subscriber* subscriber) +void VideoTestSubscriber::CommandSubListener::on_data_available( + DataReader* datareader) { + SampleInfo info; TestCommandType command; - if (subscriber->takeNextData(&command, &mp_up->m_sampleinfo)) + if (ReturnCode_t::RETCODE_OK == datareader->take_next_sample((void*)&command, &info)) { - //cout << "RCOMMAND: "<< command.m_command << endl; - if (command.m_command == READY) + if (info.valid_data) { - cout << "Publisher has new test ready..." << endl; - mp_up->mutex_.lock(); - ++mp_up->comm_count_; - mp_up->mutex_.unlock(); - mp_up->comm_cond_.notify_one(); - } - else if (command.m_command == STOP) - { - cout << "Publisher has stopped the test" << endl; - mp_up->mutex_.lock(); - ++mp_up->data_count_; - mp_up->mutex_.unlock(); - mp_up->comm_cond_.notify_one(); - mp_up->data_cond_.notify_one(); - } - else if (command.m_command == STOP_ERROR) - { - cout << "Publisher has canceled the test" << endl; - mp_up->m_status = -1; - mp_up->mutex_.lock(); - ++mp_up->data_count_; - mp_up->mutex_.unlock(); - mp_up->comm_cond_.notify_one(); - mp_up->data_cond_.notify_one(); - } - else if (command.m_command == DEFAULT) - { - std::cout << "Something is wrong" << std::endl; + if (command.m_command == READY) + { + cout << "Publisher has new test ready..." << endl; + mp_up->mutex_.lock(); + ++mp_up->comm_count_; + mp_up->mutex_.unlock(); + mp_up->comm_cond_.notify_one(); + } + else if (command.m_command == STOP) + { + cout << "Publisher has stopped the test" << endl; + mp_up->mutex_.lock(); + ++mp_up->data_count_; + mp_up->mutex_.unlock(); + mp_up->comm_cond_.notify_one(); + mp_up->data_cond_.notify_one(); + } + else if (command.m_command == STOP_ERROR) + { + cout << "Publisher has canceled the test" << endl; + mp_up->m_status = -1; + mp_up->mutex_.lock(); + ++mp_up->data_count_; + mp_up->mutex_.unlock(); + mp_up->comm_cond_.notify_one(); + mp_up->data_cond_.notify_one(); + } + else if (command.m_command == DEFAULT) + { + std::cout << "Something is wrong" << std::endl; + } } } } -void VideoTestSubscriber::DataSubListener::onNewDataMessage( - Subscriber* subscriber) +void VideoTestSubscriber::DataSubListener::on_data_available( + DataReader* datareader) { VideoType videoData; - eprosima::fastrtps::SampleInfo_t info; - subscriber->takeNextData((void*)&videoData, &info); + SampleInfo info; + datareader->take_next_sample((void*)&videoData, &info); { mp_up->push_video_packet(videoData); } @@ -415,7 +467,7 @@ bool VideoTestSubscriber::test() avgs_.clear(); TestCommandType command; command.m_command = BEGIN; - mp_commandpub->write(&command); + mp_dw->write(&command); lock.lock(); data_cond_.wait(lock, [&]() @@ -553,9 +605,7 @@ gboolean VideoTestSubscriber::push_data_cb( GstBuffer* buffer; GstFlowReturn ret; GstMapInfo map; - //gint16 *raw; gint num_samples = 0; - //gint size = 0; const VideoType& vpacket = sub->pop_video_packet(); diff --git a/test/performance/video/VideoTestSubscriber.hpp b/test/performance/video/VideoTestSubscriber.hpp index 135cceb5cbf..1705253c00e 100644 --- a/test/performance/video/VideoTestSubscriber.hpp +++ b/test/performance/video/VideoTestSubscriber.hpp @@ -22,6 +22,7 @@ #include #include + #include "VideoTestTypes.hpp" #include #include @@ -30,16 +31,32 @@ class TimeStats { public: - TimeStats() : received(0), m_minAvg(0), m_maxAvg(0), pDrop50(0), pDrop90(0), pDrop99(0), pDrop9999(0), pDropMean(0), - pDropStdev(0), pAvg50(0), pAvg90(0), pAvg99(0), pAvg9999(0), pAvgMean(0), pAvgStdev(0) + + TimeStats() + : received(0) + , m_minAvg(0) + , m_maxAvg(0) + , pDrop50(0) + , pDrop90(0) + , pDrop99(0) + , pDrop9999(0) + , pDropMean(0) + , pDropStdev(0) + , pAvg50(0) + , pAvg90(0) + , pAvg99(0) + , pAvg9999(0) + , pAvgMean(0) + , pAvgStdev(0) { } + ~TimeStats() { } unsigned int received; - double m_minDrop, m_maxDrop, m_minAvg, m_maxAvg; + double m_minDrop, m_maxDrop, m_minAvg, m_maxAvg; double pDrop50, pDrop90, pDrop99, pDrop9999, pDropMean, pDropStdev; double pAvg50, pAvg90, pAvg99, pAvg9999, pAvgMean, pAvgStdev; @@ -47,113 +64,178 @@ class TimeStats class VideoTestSubscriber { +public: + + VideoTestSubscriber(); + virtual ~VideoTestSubscriber(); + + eprosima::fastdds::dds::DomainParticipant* mp_participant; + eprosima::fastdds::dds::Publisher* mp_commandpub; + eprosima::fastdds::dds::Subscriber* mp_datasub; + eprosima::fastdds::dds::Subscriber* mp_commandsub; + eprosima::fastdds::dds::Topic* mp_video_topic; + eprosima::fastdds::dds::Topic* mp_command_pub_topic; + eprosima::fastdds::dds::Topic* mp_command_sub_topic; + std::mutex mutex_; + int disc_count_; + std::condition_variable disc_cond_; + int comm_count_; + std::condition_variable comm_cond_; + int data_count_; + std::condition_variable data_cond_; + int m_status; + int n_received; + int n_samples; + void init( + int nsam, + bool reliable, + uint32_t pid, + bool hostname, + const eprosima::fastrtps::rtps::PropertyPolicy& part_property_policy, + const eprosima::fastrtps::rtps::PropertyPolicy& property_policy, + bool large_data, + const std::string& sXMLConfigFile, + bool export_csv, + const std::string& export_file, + int forced_domain, + int video_width, + int video_height, + int frame_rate); + + void run(); + bool test(); + + class DataSubListener : public eprosima::fastdds::dds::DataReaderListener + { + public: + + DataSubListener( + VideoTestSubscriber* up) + : mp_up(up) + { + } + + ~DataSubListener() + { + } + + void on_subscription_matched( + eprosima::fastdds::dds::DataReader* /*datareader*/, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override; + void on_data_available( + eprosima::fastdds::dds::DataReader* datareader) override; + VideoTestSubscriber* mp_up; + } + m_datasublistener; + + class CommandPubListener : public eprosima::fastdds::dds::DataWriterListener + { public: - VideoTestSubscriber(); - virtual ~VideoTestSubscriber(); - - eprosima::fastrtps::Participant* mp_participant; - eprosima::fastrtps::Publisher* mp_commandpub; - eprosima::fastrtps::Subscriber* mp_datasub; - eprosima::fastrtps::Subscriber* mp_commandsub; - eprosima::fastrtps::SampleInfo_t m_sampleinfo; - std::mutex mutex_; - int disc_count_; - std::condition_variable disc_cond_; - int comm_count_; - std::condition_variable comm_cond_; - int data_count_; - std::condition_variable data_cond_; - int m_status; - int n_received; - int n_samples; - bool init(int nsam, bool reliable, uint32_t pid, bool hostname, - const eprosima::fastrtps::rtps::PropertyPolicy& part_property_policy, - const eprosima::fastrtps::rtps::PropertyPolicy& property_policy, bool large_data, - const std::string& sXMLConfigFile, bool export_csv, const std::string& export_file, - int forced_domain, int video_width, int video_height, int frame_rate); - - void run(); - bool test(); - - class DataSubListener : public eprosima::fastrtps::SubscriberListener + + CommandPubListener( + VideoTestSubscriber* up) + : mp_up(up) { - public: - DataSubListener(VideoTestSubscriber* up):mp_up(up){} - ~DataSubListener(){} - void onSubscriptionMatched(eprosima::fastrtps::Subscriber* sub, - eprosima::fastrtps::rtps::MatchingInfo& into); - void onNewDataMessage(eprosima::fastrtps::Subscriber* sub); - VideoTestSubscriber* mp_up; - } m_datasublistener; - - class CommandPubListener : public eprosima::fastrtps::PublisherListener + } + + ~CommandPubListener() + { + } + + void on_publication_matched( + eprosima::fastdds::dds::DataWriter* /*datawriter*/, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) override; + VideoTestSubscriber* mp_up; + } + m_commandpublistener; + + class CommandSubListener : public eprosima::fastdds::dds::DataReaderListener + { + public: + + CommandSubListener( + VideoTestSubscriber* up) + : mp_up(up) { - public: - CommandPubListener(VideoTestSubscriber* up):mp_up(up){} - ~CommandPubListener(){} - void onPublicationMatched(eprosima::fastrtps::Publisher* pub, - eprosima::fastrtps::rtps::MatchingInfo& info); - VideoTestSubscriber* mp_up; - } m_commandpublistener; - - class CommandSubListener : public eprosima::fastrtps::SubscriberListener + } + + ~CommandSubListener() { - public: - CommandSubListener(VideoTestSubscriber* up):mp_up(up){} - ~CommandSubListener(){} - void onSubscriptionMatched(eprosima::fastrtps::Subscriber* sub, - eprosima::fastrtps::rtps::MatchingInfo& into); - void onNewDataMessage(eprosima::fastrtps::Subscriber* sub); - VideoTestSubscriber* mp_up; - } m_commandsublistener; - - VideoDataType video_t; - TestCommandDataType command_t; - std::string m_sXMLConfigFile; - bool m_bRunning; - - GstElement* pipeline; - GstElement* appsrc; - GstElement* sink; - GstElement* videoconvert; - guint source_id_; // To control the GSource - GMainLoop* gmain_loop_; // GLib's Main Loop - guint64 g_servertimestamp, g_clienttimestamp; - gint64 g_framesDropped; - bool m_bReliable; - bool m_bExportCsv; - std::string m_sExportPrefix; - int m_forcedDomain; - int m_videoWidth; - int m_videoHeight; - int m_videoFrameRate; - - std::thread thread_; - std::deque packet_deque_; - std::mutex stats_mutex_; - std::mutex deque_mutex_; - std::condition_variable deque_cond_; - std::mutex gst_mutex_; - - std::chrono::steady_clock::time_point t_start_, t_end_; - std::chrono::steady_clock::time_point t_drop_start_, t_drop_end_; - std::vector> samples_; - std::vector drops_; - std::vector avgs_; - std::vector m_stats; + } + + void on_subscription_matched( + eprosima::fastdds::dds::DataReader* /*datareader*/, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override; + void on_data_available( + eprosima::fastdds::dds::DataReader* datareader) override; + VideoTestSubscriber* mp_up; + } + m_commandsublistener; + + std::string m_sXMLConfigFile; + bool m_bRunning; + + GstElement* pipeline; + GstElement* appsrc; + GstElement* sink; + GstElement* videoconvert; + guint source_id_; // To control the GSource + GMainLoop* gmain_loop_; // GLib's Main Loop + guint64 g_servertimestamp, g_clienttimestamp; + gint64 g_framesDropped; + bool m_bReliable; + bool m_bExportCsv; + std::string m_sExportPrefix; + int m_forcedDomain; + int m_videoWidth; + int m_videoHeight; + int m_videoFrameRate; + + std::thread thread_; + std::deque packet_deque_; + std::mutex stats_mutex_; + std::mutex deque_mutex_; + std::condition_variable deque_cond_; + std::mutex gst_mutex_; + + std::chrono::steady_clock::time_point t_start_, t_end_; + std::chrono::steady_clock::time_point t_drop_start_, t_drop_end_; + std::vector> samples_; + std::vector drops_; + std::vector avgs_; + std::vector m_stats; protected: - static void start_feed_cb(GstElement* source, guint size, VideoTestSubscriber* sub); - static void stop_feed_cb(GstElement* source, VideoTestSubscriber* sub); - static gboolean push_data_cb(VideoTestSubscriber* sub); - static void fps_stats_cb(GstElement* source, gdouble fps, gdouble droprate, gdouble avgfps, VideoTestSubscriber* sub); + static void start_feed_cb( + GstElement* source, + guint size, + VideoTestSubscriber* sub); + static void stop_feed_cb( + GstElement* source, + VideoTestSubscriber* sub); + static gboolean push_data_cb( + VideoTestSubscriber* sub); + static void fps_stats_cb( + GstElement* source, + gdouble fps, + gdouble droprate, + gdouble avgfps, + VideoTestSubscriber* sub); void InitGStreamer(); void stop(); void analyzeTimes(); - void printStat(TimeStats& TS); + void printStat( + TimeStats& TS); + eprosima::fastdds::dds::DataReaderQos datareader_qos_data; + eprosima::fastdds::dds::DataReaderQos datareader_qos_cmd; + eprosima::fastdds::dds::DataWriterQos datawriter_qos; + eprosima::fastdds::dds::DataWriter* mp_dw; + eprosima::fastdds::dds::DataReader* mp_data_dr; + eprosima::fastdds::dds::DataReader* mp_commanhd_dr; - void push_video_packet(VideoType& packet) + void push_video_packet( + VideoType& packet) { std::unique_lock lock(deque_mutex_); packet_deque_.push_back(std::move(packet)); @@ -163,9 +245,12 @@ class VideoTestSubscriber VideoType pop_video_packet() { std::unique_lock lock(deque_mutex_); - deque_cond_.wait(lock, [&]() { return !m_bRunning || !packet_deque_.empty(); }); + deque_cond_.wait(lock, [&]() + { + return !m_bRunning || !packet_deque_.empty(); + }); VideoType vpacket; - if(!packet_deque_.empty()) + if (!packet_deque_.empty()) { vpacket = std::move(packet_deque_[0]); packet_deque_.pop_front(); @@ -173,8 +258,12 @@ class VideoTestSubscriber return vpacket; } - static void gst_run(VideoTestSubscriber* sub); - static void message_cb(GstBus* bus, GstMessage* message, gpointer user_data); + static void gst_run( + VideoTestSubscriber* sub); + static void message_cb( + GstBus* bus, + GstMessage* message, + gpointer user_data); }; #endif /* VIDEOTESTSUBSCRIBER_H_ */ diff --git a/test/performance/video/VideoTestTypes.cpp b/test/performance/video/VideoTestTypes.cpp index 67dbdd441da..7c82450174e 100644 --- a/test/performance/video/VideoTestTypes.cpp +++ b/test/performance/video/VideoTestTypes.cpp @@ -22,7 +22,9 @@ using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; -bool VideoDataType::serialize(void*data, SerializedPayload_t* payload) +bool VideoDataType::serialize( + void* data, + eprosima::fastrtps::rtps::SerializedPayload_t* payload) { VideoType* lt = (VideoType*)data; @@ -37,7 +39,9 @@ bool VideoDataType::serialize(void*data, SerializedPayload_t* payload) return true; } -bool VideoDataType::deserialize(SerializedPayload_t* payload,void * data) +bool VideoDataType::deserialize( + eprosima::fastrtps::rtps::SerializedPayload_t* payload, + void* data) { VideoType* lt = (VideoType*)data; lt->seqnum = *(uint32_t*)payload->data; @@ -45,21 +49,24 @@ bool VideoDataType::deserialize(SerializedPayload_t* payload,void * data) lt->duration = *(uint64_t*)(payload->data + 12); uint32_t siz = *(uint32_t*)(payload->data + 20); lt->data.resize(siz + 1); - std::copy(payload->data+24,payload->data+24+siz,lt->data.begin()); + std::copy(payload->data + 24, payload->data + 24 + siz, lt->data.begin()); return true; } -std::function VideoDataType::getSerializedSizeProvider(void* data) +std::function VideoDataType::getSerializedSizeProvider( + void* data) { return [data]() -> uint32_t - { - VideoType *tdata = static_cast(data); - uint32_t size = 0; + { + VideoType* tdata = static_cast(data); + uint32_t size = 0; - size = (uint32_t)(sizeof(uint32_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint32_t) + tdata->data.size()); + size = + (uint32_t)(sizeof(uint32_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint32_t) + + tdata->data.size()); - return size; - }; + return size; + }; } void* VideoDataType::createData() @@ -67,21 +74,27 @@ void* VideoDataType::createData() return (void*)new VideoType(); } -void VideoDataType::deleteData(void* data) + +void VideoDataType::deleteData( + void* data) { delete((VideoType*)data); } - -bool TestCommandDataType::serialize(void*data,SerializedPayload_t* payload) +bool TestCommandDataType::serialize( + void* data, + SerializedPayload_t* payload) { TestCommandType* t = (TestCommandType*)data; *(TESTCOMMAND*)payload->data = t->m_command; payload->length = 4; return true; } -bool TestCommandDataType::deserialize(SerializedPayload_t* payload,void * data) + +bool TestCommandDataType::deserialize( + SerializedPayload_t* payload, + void* data) { TestCommandType* t = (TestCommandType*)data; // cout << "PAYLOAD LENGTH: "<length << endl; @@ -91,23 +104,26 @@ bool TestCommandDataType::deserialize(SerializedPayload_t* payload,void * data) return true; } -std::function TestCommandDataType::getSerializedSizeProvider(void*) +std::function TestCommandDataType::getSerializedSizeProvider( + void*) { return []() -> uint32_t - { - uint32_t size = 0; + { + uint32_t size = 0; - size = (uint32_t)sizeof(uint32_t); + size = (uint32_t)sizeof(uint32_t); - return size; - }; + return size; + }; } void* TestCommandDataType::createData() { return (void*)new TestCommandType(); } -void TestCommandDataType::deleteData(void* data) + +void TestCommandDataType::deleteData( + void* data) { delete((TestCommandType*)data); } diff --git a/test/performance/video/VideoTestTypes.hpp b/test/performance/video/VideoTestTypes.hpp index 082ece1ccd6..77e6f4f0fbe 100644 --- a/test/performance/video/VideoTestTypes.hpp +++ b/test/performance/video/VideoTestTypes.hpp @@ -20,70 +20,134 @@ #ifndef VIDEOTESTTYPES_H_ #define VIDEOTESTTYPES_H_ -#include "fastrtps/fastrtps_all.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + + class VideoType { - public: +public: - uint32_t seqnum; - uint64_t timestamp; - uint64_t duration; - std::vector data; + uint32_t seqnum; + uint64_t timestamp; + uint64_t duration; + std::vector data; - VideoType(): seqnum(0), timestamp(0), duration(0) - {} + VideoType() + : seqnum(0) + , timestamp(0) + , duration(0) + { + } - VideoType(uint32_t number) : - seqnum(0), timestamp(0), duration(0), data(number, 0) - { - } + VideoType( + uint32_t number) + : seqnum(0) + , timestamp(0) + , duration(0) + , data(number, 0) + { + } + + ~VideoType() + { + } - ~VideoType() {} }; -inline bool operator==(const VideoType& lt1, const VideoType& lt2) +inline bool operator ==( + const VideoType& lt1, + const VideoType& lt2) { if (lt1.seqnum != lt2.seqnum) + { return false; + } if (lt1.timestamp != lt2.timestamp) + { return false; + } if (lt1.duration != lt2.duration) + { return false; + } - if(lt1.data.size()!=lt2.data.size()) + if (lt1.data.size() != lt2.data.size()) + { return false; - for(size_t i = 0;i getSerializedSizeProvider(void* data) override; - void* createData() override; - void deleteData(void* data) override; - bool getKey(void* /*data*/, eprosima::fastrtps::rtps::InstanceHandle_t* /*ihandle*/, bool force_md5 = false) override { - (void)force_md5; - return false; - } +public: + + VideoDataType() + { + setName("VideoType"); + m_typeSize = 17000; + m_isGetKeyDefined = false; + } + + ~VideoDataType() + { + } + + bool serialize( + void* data, + eprosima::fastrtps::rtps::SerializedPayload_t* payload) override; + bool deserialize( + eprosima::fastrtps::rtps::SerializedPayload_t* payload, + void* data) override; + std::function getSerializedSizeProvider( + void* data) override; + void* createData() override; + void deleteData( + void* data) override; + bool getKey( + void* /*data*/, + eprosima::fastrtps::rtps::InstanceHandle_t* /*ihandle*/, + bool force_md5 = false) override + { + (void)force_md5; + return false; + } + }; -enum TESTCOMMAND:uint32_t{ +enum TESTCOMMAND : uint32_t +{ DEFAULT, READY, BEGIN, @@ -94,31 +158,54 @@ enum TESTCOMMAND:uint32_t{ typedef struct TestCommandType { TESTCOMMAND m_command; - TestCommandType(){ + TestCommandType() + { m_command = DEFAULT; } - TestCommandType(TESTCOMMAND com):m_command(com){} + + TestCommandType( + TESTCOMMAND com) + : m_command(com) + { + } + }TestCommandType; -class TestCommandDataType : public eprosima::fastrtps::TopicDataType +class TestCommandDataType : public eprosima::fastdds::dds::TopicDataType { - public: - TestCommandDataType() - { - setName("TestCommandType"); - m_typeSize = 4; - m_isGetKeyDefined = false; - }; - ~TestCommandDataType(){}; - bool serialize(void*data,eprosima::fastrtps::rtps::SerializedPayload_t* payload) override; - bool deserialize(eprosima::fastrtps::rtps::SerializedPayload_t* payload,void * data) override; - std::function getSerializedSizeProvider(void* data) override; - void* createData() override; - void deleteData(void* data) override; - bool getKey(void* /*data*/, eprosima::fastrtps::rtps::InstanceHandle_t* /*ihandle*/, bool force_md5 = false) override { - (void)force_md5; - return false; - } +public: + + TestCommandDataType() + { + setName("TestCommandType"); + m_typeSize = 4; + m_isGetKeyDefined = false; + } + + ~TestCommandDataType() + { + } + + bool serialize( + void* data, + eprosima::fastrtps::rtps::SerializedPayload_t* payload) override; + bool deserialize( + eprosima::fastrtps::rtps::SerializedPayload_t* payload, + void* data) override; + std::function getSerializedSizeProvider( + void* data) override; + void* createData() override; + void deleteData( + void* data) override; + bool getKey( + void* /*data*/, + eprosima::fastrtps::rtps::InstanceHandle_t* /*ihandle*/, + bool force_md5 = false) override + { + (void)force_md5; + return false; + } + }; diff --git a/test/performance/video/main_VideoTest.cpp b/test/performance/video/main_VideoTest.cpp index 45726a6c023..b09069dee7b 100644 --- a/test/performance/video/main_VideoTest.cpp +++ b/test/performance/video/main_VideoTest.cpp @@ -38,7 +38,7 @@ #endif // if defined(_MSC_VER) using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; +using namespace eprosima::fastdds::dds; using std::cout; using std::endl; @@ -403,7 +403,7 @@ int main( } } - PropertyPolicy pub_part_property_policy, sub_part_property_policy, + eprosima::fastrtps::rtps::PropertyPolicy pub_part_property_policy, sub_part_property_policy, pub_property_policy, sub_property_policy; #if HAVE_SECURITY @@ -415,29 +415,35 @@ int main( return -1; } - sub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.plugin", + sub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property("dds.sec.auth.plugin", "builtin.PKI-DH")); - sub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_ca", - "file://" + certs_path + "/maincacert.pem")); - sub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_certificate", - "file://" + certs_path + "/mainsubcert.pem")); - sub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.private_key", - "file://" + certs_path + "/mainsubkey.pem")); - sub_part_property_policy.properties().emplace_back(Property("dds.sec.crypto.plugin", + sub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property( + "dds.sec.auth.builtin.PKI-DH.identity_ca", + "file://" + certs_path + "/maincacert.pem")); + sub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property( + "dds.sec.auth.builtin.PKI-DH.identity_certificate", + "file://" + certs_path + "/mainsubcert.pem")); + sub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property( + "dds.sec.auth.builtin.PKI-DH.private_key", + "file://" + certs_path + "/mainsubkey.pem")); + sub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property("dds.sec.crypto.plugin", "builtin.AES-GCM-GMAC")); sub_part_property_policy.properties().emplace_back("rtps.participant.rtps_protection_kind", "ENCRYPT"); sub_property_policy.properties().emplace_back("rtps.endpoint.submessage_protection_kind", "ENCRYPT"); sub_property_policy.properties().emplace_back("rtps.endpoint.payload_protection_kind", "ENCRYPT"); - pub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.plugin", + pub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property("dds.sec.auth.plugin", "builtin.PKI-DH")); - pub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_ca", - "file://" + certs_path + "/maincacert.pem")); - pub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_certificate", - "file://" + certs_path + "/mainpubcert.pem")); - pub_part_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.private_key", - "file://" + certs_path + "/mainpubkey.pem")); - pub_part_property_policy.properties().emplace_back(Property("dds.sec.crypto.plugin", + pub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property( + "dds.sec.auth.builtin.PKI-DH.identity_ca", + "file://" + certs_path + "/maincacert.pem")); + pub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property( + "dds.sec.auth.builtin.PKI-DH.identity_certificate", + "file://" + certs_path + "/mainpubcert.pem")); + pub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property( + "dds.sec.auth.builtin.PKI-DH.private_key", + "file://" + certs_path + "/mainpubkey.pem")); + pub_part_property_policy.properties().emplace_back(eprosima::fastrtps::rtps::Property("dds.sec.crypto.plugin", "builtin.AES-GCM-GMAC")); pub_part_property_policy.properties().emplace_back("rtps.participant.rtps_protection_kind", "ENCRYPT"); pub_property_policy.properties().emplace_back("rtps.endpoint.submessage_protection_kind", "ENCRYPT"); @@ -448,7 +454,8 @@ int main( // Load an XML file with predefined profiles for publisher and subscriber if (sXMLConfigFile.length() > 0) { - xmlparser::XMLProfileManager::loadXMLFile(sXMLConfigFile); + // xmlparser::XMLProfileManager::loadXMLFile(sXMLConfigFile); + DomainParticipantFactory::get_instance()->load_XML_profiles_file(sXMLConfigFile); } int num_args = 0;