Skip to content

Commit

Permalink
Migrate TCPReqRepHelloWorldReplier/Requester (#4296)
Browse files Browse the repository at this point in the history
* Refs #20339: Migrate TCPRequester and TCPReplier test APIs from common/ to api/dds/

Signed-off-by: elianalf <[email protected]>

* Refs #20339: Update TransportDescriptor headers

Signed-off-by: elianalf <[email protected]>

* Refs #20339: Update fastdds APIs of TCPReqRepHelloWorldReplier and TCPReqRepHelloWorldRequester

Signed-off-by: elianalf <[email protected]>

* Refs #20339: Remove comments

Signed-off-by: elianalf <[email protected]>

* Refs #20339: Apply suggestions

Signed-off-by: elianalf <[email protected]>

---------

Signed-off-by: elianalf <[email protected]>
  • Loading branch information
elianalf authored Feb 28, 2024
1 parent 1c82f83 commit e136aab
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 169 deletions.
4 changes: 2 additions & 2 deletions test/blackbox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ set(BLACKBOXTESTS_SOURCE ${BLACKBOXTESTS_TEST_SOURCE}
utils/print_functions.cpp

common/DatagramInjectionTransport.cpp
common/TCPReqRepHelloWorldRequester.cpp
common/TCPReqRepHelloWorldReplier.cpp
api/dds-pim/TCPReqRepHelloWorldRequester.cpp
api/dds-pim/TCPReqRepHelloWorldReplier.cpp
)

file(GLOB DDS_BLACKBOXTESTS_TEST_SOURCE "common/DDSBlackboxTests*.cpp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,31 @@
*
*/

#include "BlackboxTests.hpp"
#include "../../common/BlackboxTests.hpp"
#include "TCPReqRepHelloWorldReplier.hpp"

#include <fastrtps/Domain.h>
#include <fastrtps/participant/Participant.h>
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/topic/Topic.hpp>

#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/subscriber/SampleInfo.h>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>

#include <fastrtps/publisher/Publisher.h>
#include <fastdds/dds/subscriber/SampleInfo.hpp>

#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/transport/TCPv6TransportDescriptor.h>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>

#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/TCPv6TransportDescriptor.h>
#include <fastrtps/utils/IPLocator.h>

#include <gtest/gtest.h>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastdds::rtps;
using namespace eprosima::fastdds::dds;

TCPReqRepHelloWorldReplier::TCPReqRepHelloWorldReplier()
: request_listener_(*this)
Expand All @@ -48,15 +53,39 @@ TCPReqRepHelloWorldReplier::TCPReqRepHelloWorldReplier()
, matched_(0)
{
// By default, memory mode is PREALLOCATED_WITH_REALLOC_MEMORY_MODE
sattr.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
puattr.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
datareader_qos_.endpoint().history_memory_policy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
datawriter_qos_.endpoint().history_memory_policy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
}

TCPReqRepHelloWorldReplier::~TCPReqRepHelloWorldReplier()
{
if (participant_ != nullptr)
{
Domain::removeParticipant(participant_);
if (request_subscriber_)
{
if (request_datareader_)
{
request_subscriber_->delete_datareader(request_datareader_);
}
participant_->delete_subscriber(request_subscriber_);
}
if (reply_publisher_)
{
if (reply_datawriter_)
{
reply_publisher_->delete_datawriter(reply_datawriter_);
}
participant_->delete_publisher(reply_publisher_);
}
if (request_topic_)
{
participant_->delete_topic(request_topic_);
}
if (reply_topic_)
{
participant_->delete_topic(reply_topic_);
}
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_);
}
}

Expand All @@ -67,18 +96,13 @@ void TCPReqRepHelloWorldReplier::init(
uint32_t maxInitialPeer,
const char* certs_folder)
{
ParticipantAttributes pattr;
pattr.domainId = domainId;
pattr.rtps.participantID = participantId;
pattr.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
//pattr.rtps.builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(1, 0);
//pattr.rtps.builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(0, 2147483648);
pattr.rtps.builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(1, 0);

// TCP CONNECTION PEER.
//uint32_t kind = LOCATOR_KIND_TCPv4;

pattr.rtps.useBuiltinTransports = false;
DomainParticipantQos participant_qos;
participant_qos.wire_protocol().participant_id = participantId;
participant_qos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(1, 0);
participant_qos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;

participant_qos.transport().use_builtin_transports = false;

std::shared_ptr<TCPTransportDescriptor> descriptor;
if (use_ipv6)
{
Expand Down Expand Up @@ -112,28 +136,54 @@ void TCPReqRepHelloWorldReplier::init(
descriptor->tls_config.add_option(TLSOptions::NO_SSLV2);
}

pattr.rtps.userTransports.push_back(descriptor);
participant_qos.transport().user_transports.push_back(descriptor);

participant_ = Domain::createParticipant(pattr);
participant_ = DomainParticipantFactory::get_instance()->create_participant(
domainId, participant_qos);
ASSERT_NE(participant_, nullptr);
ASSERT_TRUE(participant_->is_enabled());

// Register type
ASSERT_EQ(Domain::registerType(participant_, &type_), true);
type_.reset(new HelloWorldPubSubType());
ASSERT_EQ(participant_->register_type(type_), ReturnCode_t::RETCODE_OK);

configDatareader("Request");
request_topic_ = participant_->create_topic(datareader_topicname_,
type_->getName(), TOPIC_QOS_DEFAULT);
ASSERT_NE(request_topic_, nullptr);
ASSERT_TRUE(request_topic_->is_enabled());

configDatawriter("Reply");
reply_topic_ = participant_->create_topic(datawriter_topicname_,
type_->getName(), TOPIC_QOS_DEFAULT);
ASSERT_NE(reply_topic_, nullptr);
ASSERT_TRUE(reply_topic_->is_enabled());

//Create subscriber
sattr.topic.topicKind = NO_KEY;
sattr.topic.topicDataType = type_.getName();
configSubscriber("Request");
request_subscriber_ = Domain::createSubscriber(participant_, sattr, &request_listener_);
request_subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
ASSERT_NE(request_subscriber_, nullptr);
ASSERT_TRUE(request_subscriber_->is_enabled());

//Create publisher
puattr.topic.topicKind = NO_KEY;
puattr.topic.topicDataType = type_.getName();
puattr.topic.topicName = "HelloWorldTopicReply";
configPublisher("Reply");
reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_);
reply_publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(reply_publisher_, nullptr);
ASSERT_TRUE(reply_publisher_->is_enabled());

//Create datareader
datareader_qos_.reliability().kind = RELIABLE_RELIABILITY_QOS;
//Increase default max_blocking_time to 1s in case the CPU is overhead
datareader_qos_.reliability().max_blocking_time = Duration_t(1, 0);
request_datareader_ = request_subscriber_->create_datareader(request_topic_, datareader_qos_,
&request_listener_);
ASSERT_NE(request_datareader_, nullptr);
ASSERT_TRUE(request_datareader_->is_enabled());

//Create datawriter
datawriter_qos_.reliability().kind = RELIABLE_RELIABILITY_QOS;
datawriter_qos_.reliability().max_blocking_time = Duration_t(1, 0);
reply_datawriter_ = reply_publisher_->create_datawriter(reply_topic_, datawriter_qos_, &reply_listener_);
ASSERT_NE(reply_datawriter_, nullptr);
ASSERT_TRUE(reply_datawriter_->is_enabled());

initialized_ = true;
}
Expand All @@ -147,7 +197,7 @@ void TCPReqRepHelloWorldReplier::newNumber(
hello.index(number);
hello.message("GoodBye");
wparams.related_sample_identity(sample_identity);
ASSERT_EQ(reply_publisher_->write((void*)&hello, wparams), true);
ASSERT_EQ(reply_datawriter_->write((void*)&hello, wparams), true);
}

void TCPReqRepHelloWorldReplier::wait_discovery(
Expand Down Expand Up @@ -225,17 +275,17 @@ bool TCPReqRepHelloWorldReplier::is_matched()
return matched_ > 1;
}

void TCPReqRepHelloWorldReplier::ReplyListener::onNewDataMessage(
Subscriber* sub)
void TCPReqRepHelloWorldReplier::ReplyListener::on_data_available(
DataReader* datareader)
{
ASSERT_NE(sub, nullptr);
ASSERT_NE(datareader, nullptr);

HelloWorld hello;
SampleInfo_t info;
SampleInfo info;

if (sub->takeNextData((void*)&hello, &info))
if (ReturnCode_t::RETCODE_OK == datareader->take_next_sample((void*)&hello, &info))
{
if (info.sampleKind == ALIVE)
if (info.valid_data)
{
ASSERT_EQ(hello.message().compare("HelloWorld"), 0);
replier_.newNumber(info.sample_identity, hello.index());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
#ifndef _TEST_BLACKBOX_TCPReqRepHelloWorldReplier_HPP_
#define _TEST_BLACKBOX_TCPReqRepHelloWorldReplier_HPP_

#include "../types/HelloWorldPubSubTypes.h"
#include "../../types/HelloWorldPubSubTypes.h"

#include <fastrtps/fastrtps_fwd.h>
#include <fastrtps/subscriber/SubscriberListener.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/publisher/PublisherListener.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/DataWriterListener.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>

#include <list>
#include <condition_variable>
Expand All @@ -39,13 +41,11 @@
#define GET_PID getpid
#endif // if defined(_WIN32)



class TCPReqRepHelloWorldReplier
{
public:

class ReplyListener : public eprosima::fastrtps::SubscriberListener
class ReplyListener : public eprosima::fastdds::dds::DataReaderListener
{
public:

Expand All @@ -59,17 +59,18 @@ class TCPReqRepHelloWorldReplier
{
}

void onNewDataMessage(
eprosima::fastrtps::Subscriber* sub);
void onSubscriptionMatched(
eprosima::fastrtps::Subscriber* /*sub*/,
eprosima::fastrtps::rtps::MatchingInfo& info)
void on_data_available(
eprosima::fastdds::dds::DataReader* datareader) override;

void on_subscription_matched(
eprosima::fastdds::dds::DataReader* /*datareader*/,
const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override
{
if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING)
if (0 < info.current_count_change)
{
replier_.matched();
}
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
else
{
replier_.unmatched();
}
Expand All @@ -84,7 +85,7 @@ class TCPReqRepHelloWorldReplier
}
request_listener_;

class RequestListener : public eprosima::fastrtps::PublisherListener
class RequestListener : public eprosima::fastdds::dds::DataWriterListener
{
public:

Expand All @@ -98,14 +99,18 @@ class TCPReqRepHelloWorldReplier
{
}

void onPublicationMatched(
eprosima::fastrtps::Publisher* /*pub*/,
eprosima::fastrtps::rtps::MatchingInfo& info)
void on_publication_matched(
eprosima::fastdds::dds::DataWriter* /*datawriter*/,
const eprosima::fastdds::dds::PublicationMatchedStatus& info) override
{
if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING)
if (0 < info.current_count_change)
{
replier_.matched();
}
else
{
replier_.unmatched();
}
}

private:
Expand Down Expand Up @@ -142,53 +147,50 @@ class TCPReqRepHelloWorldReplier
void unmatched();
bool is_matched();

virtual void configSubscriber(
virtual void configDatareader(
const std::string& suffix)
{
sattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS;

std::ostringstream t;

t << "TCPReqRepHelloworld_" << asio::ip::host_name() << "_" << GET_PID() << "_" << suffix;

sattr.topic.topicName = t.str();
datareader_topicname_ = t.str();
}

virtual void configPublisher(
virtual void configDatawriter(
const std::string& suffix)
{
puattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS;

// Increase default max_blocking_time to 1 second, as our CI infrastructure shows some
// big CPU overhead sometimes
puattr.qos.m_reliability.max_blocking_time.seconds = 1;
puattr.qos.m_reliability.max_blocking_time.nanosec = 0;

std::ostringstream t;

t << "TCPReqRepHelloworld_" << asio::ip::host_name() << "_" << GET_PID() << "_" << suffix;

puattr.topic.topicName = t.str();
datawriter_topicname_ = t.str();
}

protected:

eprosima::fastrtps::SubscriberAttributes sattr;
eprosima::fastrtps::PublisherAttributes puattr;
eprosima::fastdds::dds::DataReaderQos datareader_qos_;
eprosima::fastdds::dds::DataWriterQos datawriter_qos_;
std::string datareader_topicname_;
std::string datawriter_topicname_;

private:

TCPReqRepHelloWorldReplier& operator =(
const TCPReqRepHelloWorldReplier&) = delete;

eprosima::fastrtps::Participant* participant_;
eprosima::fastrtps::Subscriber* request_subscriber_;
eprosima::fastrtps::Publisher* reply_publisher_;
eprosima::fastdds::dds::DomainParticipant* participant_;
eprosima::fastdds::dds::Topic* request_topic_;
eprosima::fastdds::dds::Subscriber* request_subscriber_;
eprosima::fastdds::dds::DataReader* request_datareader_;
eprosima::fastdds::dds::Topic* reply_topic_;
eprosima::fastdds::dds::Publisher* reply_publisher_;
eprosima::fastdds::dds::DataWriter* reply_datawriter_;
bool initialized_;
std::mutex mutexDiscovery_;
std::condition_variable cvDiscovery_;
std::atomic<unsigned int> matched_;
HelloWorldPubSubType type_;
eprosima::fastdds::dds::TypeSupport type_;
};

#endif // _TEST_BLACKBOX_TCPReqRepHelloWorldReplier_HPP_
Loading

0 comments on commit e136aab

Please sign in to comment.