Skip to content

Commit

Permalink
Support DataRepresentationQos (#3972)
Browse files Browse the repository at this point in the history
* Refs #19832. Support DataRepresentationQos

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Fix tests

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Apply suggestions

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Fix compilation against fastcdr v1

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Fix

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19832. Add line to versions.md

Signed-off-by: Ricardo González Moreno <[email protected]>

---------

Signed-off-by: Ricardo González Moreno <[email protected]>
  • Loading branch information
richiware authored Dec 11, 2023
1 parent f40d54c commit 35b63cb
Show file tree
Hide file tree
Showing 26 changed files with 597 additions and 246 deletions.
2 changes: 1 addition & 1 deletion include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2099,7 +2099,7 @@ class DataRepresentationQosPolicy : public Parameter_t, public QosPolicy
*/
RTPS_DllAPI DataRepresentationQosPolicy()
: Parameter_t(PID_DATA_REPRESENTATION, 0)
, QosPolicy(true)
, QosPolicy(false)
{
}

Expand Down
7 changes: 6 additions & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ ReturnCode_t DataWriterImpl::enable()
reader_filters_.reset(new ReaderFilterCollection(qos_.writer_resource_limits().reader_filters_allocation));
}

// Set Datawriter's DataRepresentationId taking into account the QoS.
data_representation_ = qos_.representation().m_value.empty()
|| XCDR_DATA_REPRESENTATION == qos_.representation().m_value.at(0)
? XCDR_DATA_REPRESENTATION : XCDR2_DATA_REPRESENTATION;

auto change_pool = get_change_pool();
if (!change_pool)
{
Expand Down Expand Up @@ -958,7 +963,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
return ReturnCode_t::RETCODE_OUT_OF_RESOURCES;
}

if ((ALIVE == change_kind) && !type_->serialize(data, &payload.payload))
if ((ALIVE == change_kind) && !type_->serialize(data, &payload.payload, data_representation_))
{
EPROSIMA_LOG_WARNING(DATA_WRITER, "Data serialization returned false");
return_payload_to_pool(payload);
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter

std::unique_ptr<ReaderFilterCollection> reader_filters_;

DataRepresentationId_t data_representation_ {DEFAULT_DATA_REPRESENTATION};

ReturnCode_t check_write_preconditions(
void* data,
const InstanceHandle_t& handle,
Expand Down
14 changes: 8 additions & 6 deletions src/cpp/rtps/builtin/data/ReaderProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,14 @@ bool ReaderProxyData::writeToCDRMessage(
}
#endif // if HAVE_SECURITY

/* TODO - Enable when implement XCDR, XCDR2 and/or XML
if (m_qos.representation.send_always() || m_qos.representation.hasChanged)
{
if (!m_qos.representation.addToCDRMessage(msg)) return false;
}
*/
if (m_qos.representation.send_always() || m_qos.representation.hasChanged)
{
if (!fastdds::dds::QosPoliciesSerializer<DataRepresentationQosPolicy>::add_to_cdr_message(m_qos.representation,
msg))
{
return false;
}
}

if (m_qos.type_consistency.send_always() || m_qos.type_consistency.hasChanged)
{
Expand Down
14 changes: 8 additions & 6 deletions src/cpp/rtps/builtin/data/WriterProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,14 @@ bool WriterProxyData::writeToCDRMessage(
}
#endif // if HAVE_SECURITY

/* TODO - Enable when implement XCDR, XCDR2 and/or XML
if (m_qos.representation.send_always() || m_qos.representation.hasChanged)
{
if (!m_qos.representation.addToCDRMessage(msg)) return false;
}
*/
if (m_qos.representation.send_always() || m_qos.representation.hasChanged)
{
if (!fastdds::dds::QosPoliciesSerializer<DataRepresentationQosPolicy>::add_to_cdr_message(m_qos.representation,
msg))
{
return false;
}
}

if (m_type_information && m_type_information->assigned())
{
Expand Down
189 changes: 24 additions & 165 deletions src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,13 @@ bool EDP::valid_matching(
incompatible_qos.set(fastdds::dds::LIVELINESS_QOS_POLICY_ID);
}

// DataRepresentationQosPolicy
if (!checkDataRepresentationQos(wdata, rdata))
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "Incompatible Data Representation QoS");
incompatible_qos.set(fastdds::dds::DATAREPRESENTATION_QOS_POLICY_ID);
}

#if HAVE_SECURITY
// TODO: Check EndpointSecurityInfo
#endif // if HAVE_SECURITY
Expand Down Expand Up @@ -827,7 +834,7 @@ bool EDP::valid_matching(
* Table 7.57 XTypes document 1.2
* Writer Reader Compatible
* XCDR XCDR true
* XCDR XCDR2 true
* XCDR XCDR2 false
* XCDR2 XCDR false
* XCDR2 XCDR2 true
* @param wdata
Expand All @@ -843,28 +850,25 @@ bool EDP::checkDataRepresentationQos(

if (wdata->m_qos.representation.m_value.empty())
{
compatible |= std::find(rr.begin(), rr.end(), fastdds::dds::XCDR2_DATA_REPRESENTATION) != rr.end();
compatible |= std::find(rr.begin(), rr.end(), fastdds::dds::XCDR_DATA_REPRESENTATION) != rr.end() || rr.empty();
compatible |= rr.empty() ||
std::find(rr.begin(), rr.end(), fastdds::dds::XCDR_DATA_REPRESENTATION) != rr.end();
}
else
{
for (DataRepresentationId writerRepresentation : wdata->m_qos.representation.m_value)
DataRepresentationId writerRepresentation {wdata->m_qos.representation.m_value.at(0)};

if (writerRepresentation == fastdds::dds::XCDR2_DATA_REPRESENTATION)
{
if (writerRepresentation == fastdds::dds::XCDR2_DATA_REPRESENTATION)
{
compatible |= std::find(rr.begin(), rr.end(), fastdds::dds::XCDR2_DATA_REPRESENTATION) != rr.end();
}
else if (writerRepresentation == fastdds::dds::XCDR_DATA_REPRESENTATION)
{
compatible |= std::find(rr.begin(), rr.end(), fastdds::dds::XCDR2_DATA_REPRESENTATION) != rr.end();
compatible |=
std::find(rr.begin(), rr.end(),
fastdds::dds::XCDR_DATA_REPRESENTATION) != rr.end() || rr.empty();
}
else // XML_DATA_REPRESENTATION
{
EPROSIMA_LOG_INFO(EDP, "DataRepresentationQosPolicy XML_DATA_REPRESENTATION isn't supported.");
}
compatible |= std::find(rr.begin(), rr.end(), fastdds::dds::XCDR2_DATA_REPRESENTATION) != rr.end();
}
else if (writerRepresentation == fastdds::dds::XCDR_DATA_REPRESENTATION)
{
compatible |= rr.empty() ||
std::find(rr.begin(), rr.end(), fastdds::dds::XCDR_DATA_REPRESENTATION) != rr.end();
}
else // XML_DATA_REPRESENTATION
{
EPROSIMA_LOG_INFO(EDP, "DataRepresentationQosPolicy XML_DATA_REPRESENTATION isn't supported.");
}
}

Expand Down Expand Up @@ -905,152 +909,7 @@ bool EDP::valid_matching(
MatchingFailureMask& reason,
fastdds::dds::PolicyMask& incompatible_qos)
{
reason.reset();
incompatible_qos.reset();

if (rdata->topicName() != wdata->topicName())
{
reason.set(MatchingFailureMask::different_topic);
return false;
}

// Type Consistency Enforcement QosPolicy
if (!checkTypeValidation(wdata, rdata))
{
// TODO Trigger INCONSISTENT_TOPIC status change
reason.set(MatchingFailureMask::inconsistent_topic);
return false;
}

if (rdata->topicKind() != wdata->topicKind())
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "INCOMPATIBLE QOS:Remote Writer " << wdata->guid() <<
" is publishing in topic " << wdata->topicName() << "(keyed:" << wdata->topicKind() <<
"), local reader subscribes as keyed: " << rdata->topicKind());
reason.set(MatchingFailureMask::inconsistent_topic);
return false;
}
if (rdata->m_qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS
&& wdata->m_qos.m_reliability.kind == BEST_EFFORT_RELIABILITY_QOS)
//Means our reader is reliable but hte writer is not
{
EPROSIMA_LOG_WARNING(RTPS_EDP,
"INCOMPATIBLE QOS (topic: " << wdata->topicName() << "): Remote Writer " << wdata->guid()
<< " is Best Effort and local reader is RELIABLE "
);
incompatible_qos.set(fastdds::dds::RELIABILITY_QOS_POLICY_ID);
}
if (rdata->m_qos.m_durability.kind > wdata->m_qos.m_durability.kind)
{
// TODO (MCC) Change log message
EPROSIMA_LOG_WARNING(RTPS_EDP,
"INCOMPATIBLE QOS (topic: " << wdata->topicName() << "):RemoteWriter " << wdata->guid()
<< " has VOLATILE DURABILITY and we want TRANSIENT_LOCAL";
);
incompatible_qos.set(fastdds::dds::DURABILITY_QOS_POLICY_ID);
}
if (rdata->m_qos.m_ownership.kind != wdata->m_qos.m_ownership.kind)
{
EPROSIMA_LOG_WARNING(RTPS_EDP,
"INCOMPATIBLE QOS (topic: " << wdata->topicName() << "):Remote Writer " << wdata->guid()
<< " has different Ownership Kind");
incompatible_qos.set(fastdds::dds::OWNERSHIP_QOS_POLICY_ID);
}
if (rdata->m_qos.m_deadline.period < wdata->m_qos.m_deadline.period)
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "INCOMPATIBLE QOS (topic: "
<< wdata->topicName() << "):RemoteWriter "
<< wdata->guid() << "has smaller DEADLINE period");
incompatible_qos.set(fastdds::dds::DEADLINE_QOS_POLICY_ID);
}
if (rdata->m_qos.m_disablePositiveACKs.enabled && !wdata->m_qos.m_disablePositiveACKs.enabled)
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "Incompatible Disable Positive Acks QoS: writer is enabled but reader is not");
incompatible_qos.set(fastdds::dds::DISABLEPOSITIVEACKS_QOS_POLICY_ID);
}
if (wdata->m_qos.m_liveliness.lease_duration > rdata->m_qos.m_liveliness.lease_duration)
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "Incompatible liveliness lease durations: offered lease duration "
<< wdata->m_qos.m_liveliness.lease_duration << " must be <= requested lease duration "
<< rdata->m_qos.m_liveliness.lease_duration);
incompatible_qos.set(fastdds::dds::LIVELINESS_QOS_POLICY_ID);
}
if (wdata->m_qos.m_liveliness.kind < rdata->m_qos.m_liveliness.kind)
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "Incompatible liveliness kinds: offered kind is < than requested kind");
incompatible_qos.set(fastdds::dds::LIVELINESS_QOS_POLICY_ID);
}

#if HAVE_SECURITY
// TODO: Check EndpointSecurityInfo
#endif // if HAVE_SECURITY

//Partition mismatch does not trigger status change
if (incompatible_qos.any())
{
reason.set(MatchingFailureMask::incompatible_qos);
return false;
}

//Partition check:
bool matched = false;
if (rdata->m_qos.m_partition.empty() && wdata->m_qos.m_partition.empty())
{
matched = true;
}
else if (rdata->m_qos.m_partition.empty() && wdata->m_qos.m_partition.size() > 0)
{
for (auto rnameit = wdata->m_qos.m_partition.begin();
rnameit != wdata->m_qos.m_partition.end(); ++rnameit)
{
if (is_partition_empty(*rnameit))
{
matched = true;
break;
}
}
}
else if (rdata->m_qos.m_partition.size() > 0 && wdata->m_qos.m_partition.empty())
{
for (auto wnameit = rdata->m_qos.m_partition.begin();
wnameit != rdata->m_qos.m_partition.end(); ++wnameit)
{
if (is_partition_empty(*wnameit))
{
matched = true;
break;
}
}
}
else
{
for (auto wnameit = rdata->m_qos.m_partition.begin();
wnameit != rdata->m_qos.m_partition.end(); ++wnameit)
{
for (auto rnameit = wdata->m_qos.m_partition.begin();
rnameit != wdata->m_qos.m_partition.end(); ++rnameit)
{
if (StringMatching::matchString(wnameit->name(), rnameit->name()))
{
matched = true;
break;
}
}
if (matched)
{
break;
}
}
}
if (!matched) //Different partitions
{
EPROSIMA_LOG_WARNING(RTPS_EDP, "INCOMPATIBLE QOS (topic: " << wdata->topicName() <<
"): Different Partitions");
reason.set(MatchingFailureMask::partitions);
}

return matched;

return valid_matching(wdata, rdata, reason, incompatible_qos);
}

ProxyPool<ReaderProxyData>& EDP::get_temporary_reader_proxies_pool()
Expand Down
12 changes: 12 additions & 0 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,13 @@ class PubSubReader
return *this;
}

PubSubReader& data_representation(
const std::vector<eprosima::fastdds::dds::DataRepresentationId_t>& values)
{
datareader_qos_.type_consistency().representation.m_value = values;
return *this;
}

bool update_partition(
const std::string& partition)
{
Expand Down Expand Up @@ -1721,6 +1728,11 @@ class PubSubReader
return status;
}

eprosima::fastdds::dds::TypeSupport get_type_support()
{
return type_;
}

private:

void receive_one(
Expand Down
12 changes: 12 additions & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,18 @@ class PubSubWriter
return *this;
}

PubSubWriter& data_representation(
const std::vector<eprosima::fastdds::dds::DataRepresentationId_t>& values)
{
datawriter_qos_.representation().m_value = values;
return *this;
}

eprosima::fastdds::dds::TypeSupport get_type_support()
{
return type_;
}

protected:

void participant_matched()
Expand Down
Loading

0 comments on commit 35b63cb

Please sign in to comment.