diff --git a/include/fastdds/dds/core/policy/QosPolicies.hpp b/include/fastdds/dds/core/policy/QosPolicies.hpp index 1c8de695b48..3c4341fd5b5 100644 --- a/include/fastdds/dds/core/policy/QosPolicies.hpp +++ b/include/fastdds/dds/core/policy/QosPolicies.hpp @@ -2264,6 +2264,7 @@ class DisablePositiveACKsQosPolicy : public Parameter_t, public QosPolicy const DisablePositiveACKsQosPolicy& b) const { return enabled == b.enabled && + duration == b.duration && Parameter_t::operator ==(b) && QosPolicy::operator ==(b); } diff --git a/include/fastdds/dds/publisher/qos/DataWriterQos.hpp b/include/fastdds/dds/publisher/qos/DataWriterQos.hpp index d8bb92d5738..02e682575cb 100644 --- a/include/fastdds/dds/publisher/qos/DataWriterQos.hpp +++ b/include/fastdds/dds/publisher/qos/DataWriterQos.hpp @@ -54,7 +54,8 @@ class RTPSReliableWriterQos const RTPSReliableWriterQos& b) const { return (this->times == b.times) && - (this->disable_positive_acks == b.disable_positive_acks); + (this->disable_positive_acks == b.disable_positive_acks) && + (this->disable_heartbeat_piggyback == b.disable_heartbeat_piggyback); } //!Writer Timing Attributes diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index 58c00f0fecc..56587aa411c 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -321,6 +321,13 @@ class StatefulWriter : public RTPSWriter void updateTimes( const WriterTimes& times); + /** + * Update the period of the disable positive ACKs policy. + * @param att WriterAttributes parameter. + */ + void updatePositiveAcks( + const WriterAttributes& att); + SequenceNumber_t next_sequence_number() const; /** diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 7b1cafa53ce..6909459bcfc 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -1143,10 +1143,22 @@ ReturnCode_t DataWriterImpl::set_qos( { return ReturnCode_t::RETCODE_IMMUTABLE_POLICY; } - set_qos(qos_, qos_to_set, !enabled); + + set_qos(qos_, qos_to_set, enabled); if (enabled) { + if (qos_.reliability().kind == eprosima::fastrtps::RELIABLE_RELIABILITY_QOS && + qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos()) + { + // Update times and positive_acks attributes on RTPS Layer + WriterAttributes w_att; + w_att.times = qos_.reliable_writer_qos().times; + w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled; + w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration; + writer_->updateAttributes(w_att); + } + //Notify the participant that a Writer has changed its QOS fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_); WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); @@ -1884,6 +1896,13 @@ bool DataWriterImpl::can_qos_be_updated( EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Data sharing configuration cannot be changed after the creation of a DataWriter."); } + if (to.reliable_writer_qos().disable_positive_acks.enabled != + from.reliable_writer_qos().disable_positive_acks.enabled) + { + updatable = false; + EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, + "Only the period of Positive ACKs can be changed after the creation of a DataWriter."); + } return updatable; } diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 5346fcbd0cc..700de22f92b 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -1556,6 +1556,13 @@ bool DataReaderImpl::can_qos_be_updated( EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Unique network flows request cannot be changed after the creation of a DataReader."); } + if (to.reliable_reader_qos().disable_positive_ACKs.enabled != + from.reliable_reader_qos().disable_positive_ACKs.enabled) + { + updatable = false; + EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, + "Positive ACKs QoS cannot be changed after the creation of a DataReader."); + } return updatable; } diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index cd265475844..7eb71b08b3e 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -66,7 +66,7 @@ namespace rtps { /** * Loops over all the readers in the vector, applying the given routine. * The loop continues until the result of the routine is true for any reader - * or all readers have been processes. + * or all readers have been processed. * The returned value is true if the routine returned true at any point, * or false otherwise. */ @@ -953,6 +953,17 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network( if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t()) { last_sequence_number_ = change->sequenceNumber; + if ( !(ack_event_->getRemainingTimeMilliSec() > 0)) + { + // Restart ack_timer + auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); + auto now = system_clock::now(); + auto interval = source_timestamp - now + keep_duration_us_; + assert(interval.count() >= 0); + + ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->restart_timer(max_blocking_time); + } } // Restore in case a exception was launched by RTPSMessageGroup. @@ -1612,6 +1623,24 @@ void StatefulWriter::updateAttributes( const WriterAttributes& att) { this->updateTimes(att.times); + if (this->get_disable_positive_acks()) + { + this->updatePositiveAcks(att); + } +} + +void StatefulWriter::updatePositiveAcks( + const WriterAttributes& att) +{ + std::lock_guard guard(mp_mutex); + if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3)) + { + // Implicit conversion to microseconds + keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()}; + } + // Restart ack timer with new duration + ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3); + ack_event_->restart_timer(); } void StatefulWriter::updateTimes( @@ -2030,26 +2059,40 @@ bool StatefulWriter::ack_timer_expired() while (interval.count() < 0) { + bool acks_flag = false; for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, - [this](ReaderProxy* reader) + [this, &acks_flag](ReaderProxy* reader) { if (reader->disable_positive_acks()) { reader->acked_changes_set(last_sequence_number_ + 1); + acks_flag = true; } return false; } ); - last_sequence_number_++; + if (acks_flag) + { + check_acked_status(); + } - // Get the next cache change from the history CacheChange_t* change; + // Skip removed changes until reaching the last change + do + { + last_sequence_number_++; + } while (!mp_history->get_change( + last_sequence_number_, + getGuid(), + &change) && last_sequence_number_ < next_sequence_number()); + if (!mp_history->get_change( last_sequence_number_, getGuid(), &change)) { + // Stop ack_timer return false; } diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 65f72a52d8b..27f411cd6ed 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1470,6 +1470,17 @@ class PubSubWriter return (ReturnCode_t::RETCODE_OK == datawriter_->set_qos(datawriter_qos_)); } + bool set_qos( + const eprosima::fastdds::dds::DataWriterQos& att) + { + return (ReturnCode_t::RETCODE_OK == datawriter_->set_qos(att)); + } + + eprosima::fastdds::dds::DataWriterQos get_qos() + { + return (datawriter_->get_qos()); + } + bool remove_all_changes( size_t* number_of_changes_removed) { diff --git a/test/blackbox/common/BlackboxTestsAcknackQos.cpp b/test/blackbox/common/DDSBlackboxTestsAckPositive.cpp similarity index 58% rename from test/blackbox/common/BlackboxTestsAcknackQos.cpp rename to test/blackbox/common/DDSBlackboxTestsAckPositive.cpp index a977a53f152..72b6473a7cb 100644 --- a/test/blackbox/common/BlackboxTestsAcknackQos.cpp +++ b/test/blackbox/common/DDSBlackboxTestsAckPositive.cpp @@ -1,4 +1,4 @@ -// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,23 +12,138 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "BlackboxTests.hpp" - -#include "PubSubReader.hpp" -#include "PubSubWriter.hpp" -#include "ReqRepAsReliableHelloWorldRequester.hpp" -#include "ReqRepAsReliableHelloWorldReplier.hpp" +#include +#include +#include +#include +#include #include - +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include #include +#include + +#include "BlackboxTests.hpp" +#include "../api/dds-pim/CustomPayloadPool.hpp" +#include "../api/dds-pim/PubSubReader.hpp" +#include "../api/dds-pim/PubSubWriter.hpp" +#include "../api/dds-pim/ReqRepAsReliableHelloWorldRequester.hpp" +#include "../api/dds-pim/ReqRepAsReliableHelloWorldReplier.hpp" +#include "../types/FixedSized.h" +#include "../types/FixedSizedPubSubTypes.h" +#include "../types/HelloWorldPubSubTypes.h" using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport; using test_UDPv4TransportDescriptor = eprosima::fastdds::rtps::test_UDPv4TransportDescriptor; + +TEST(AcknackQos, DDSEnableUpdatabilityOfPositiveAcksPeriodDDSLayer) +{ + // This test checks the behaviour of disabling positive ACKs. + // It also checks that only the positive ACKs + // period is updatable at runtime through set_qos. + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + // Configure datapublisher_qos + writer.keep_duration({1, 0}); + writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); + writer.durability_kind(eprosima::fastrtps::VOLATILE_DURABILITY_QOS); + writer.init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Configure datasubscriber_qos + reader.keep_duration({1, 0}); + reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); + reader.init(); + + ASSERT_TRUE(reader.isInitialized()); + + // Check correct initialitation + eprosima::fastdds::dds::DataWriterQos get_att = writer.get_qos(); + EXPECT_TRUE(get_att.reliable_writer_qos().disable_positive_acks.enabled); + EXPECT_EQ(get_att.reliable_writer_qos().disable_positive_acks.duration, eprosima::fastrtps::Duration_t({1, 0})); + + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + reader.startReception(data); + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); + // Wait for all acked msgs + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200))); + + // Wait to disable timer because no new messages are sent + std::this_thread::sleep_for(std::chrono::milliseconds(1200)); + // Send a new message to check that timer is restarted correctly + data = default_helloworld_data_generator(1); + reader.startReception(data); + writer.send(data); + ASSERT_TRUE(data.empty()); + reader.block_for_all(); + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200))); + + // Update attributes on DDS layer + eprosima::fastdds::dds::DataWriterQos w_att = writer.get_qos(); + w_att.reliable_writer_qos().disable_positive_acks.enabled = true; + w_att.reliable_writer_qos().disable_positive_acks.duration = eprosima::fastrtps::Duration_t({2, 0}); + + EXPECT_TRUE(writer.set_qos(w_att)); + + // Check that period has been changed in DataWriterQos + get_att = writer.get_qos(); + EXPECT_TRUE(get_att.reliable_writer_qos().disable_positive_acks.enabled); + EXPECT_EQ(get_att.reliable_writer_qos().disable_positive_acks.duration, eprosima::fastrtps::Duration_t({2, 0})); + + data = default_helloworld_data_generator(); + + reader.startReception(data); + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); + // Check that period has been correctly updated + EXPECT_FALSE(writer.waitForAllAcked(std::chrono::milliseconds(1200))); + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::milliseconds(1200))); + + // Try to disable positive_acks + w_att.reliable_writer_qos().disable_positive_acks.enabled = false; + + // Check that is not possible to change disable_positive_acks at runtime + EXPECT_FALSE(writer.set_qos(w_att)); +} + TEST(AcknackQos, RecoverAfterLosingCommunicationWithDisablePositiveAck) { // This test makes the writer send a few samples diff --git a/test/blackbox/common/RTPSAsSocketReader.hpp b/test/blackbox/common/RTPSAsSocketReader.hpp index 1cf97b553f0..dd79222108c 100644 --- a/test/blackbox/common/RTPSAsSocketReader.hpp +++ b/test/blackbox/common/RTPSAsSocketReader.hpp @@ -324,6 +324,13 @@ class RTPSAsSocketReader } } + RTPSAsSocketReader& disable_positive_acks( + bool disable) + { + reader_attr_.disable_positive_acks = disable; + return *this; + } + private: void receive_one( diff --git a/test/blackbox/common/RTPSAsSocketWriter.hpp b/test/blackbox/common/RTPSAsSocketWriter.hpp index 39aa927f15f..010cce25843 100644 --- a/test/blackbox/common/RTPSAsSocketWriter.hpp +++ b/test/blackbox/common/RTPSAsSocketWriter.hpp @@ -92,6 +92,7 @@ class RTPSAsSocketWriter : public eprosima::fastrtps::rtps::WriterListener if (writer_attr_.endpoint.durabilityKind == eprosima::fastrtps::rtps::VOLATILE) { history_->remove_change_g(change); + std::cout << "Change removed" << std::endl; } } @@ -230,6 +231,13 @@ class RTPSAsSocketWriter : public eprosima::fastrtps::rtps::WriterListener rattr.m_qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; } + // Check disable_positive_acks_ attribute + if (writer_attr_.disable_positive_acks) + { + rattr.m_qos.m_disablePositiveACKs.enabled = writer_attr_.disable_positive_acks; + rattr.m_qos.m_disablePositiveACKs.duration = writer_attr_.keep_duration; + } + rattr.guid().guidPrefix.value[0] = guid.guidPrefix.value[0]; rattr.guid().guidPrefix.value[1] = guid.guidPrefix.value[1]; rattr.guid().guidPrefix.value[2] = guid.guidPrefix.value[2]; @@ -282,6 +290,28 @@ class RTPSAsSocketWriter : public eprosima::fastrtps::rtps::WriterListener return *this; } + RTPSAsSocketWriter& disable_positive_acks_seconds( + bool disable, + int32_t sec) + { + writer_attr_.disable_positive_acks = disable; + writer_attr_.keep_duration = eprosima::fastrtps::Duration_t(sec, 0); + return *this; + } + + /*** Access RTPSWriter functions ***/ + void updateAttributes( + const eprosima::fastrtps::rtps::WriterAttributes& att) + { + writer_->updateAttributes(att); + return; + } + + bool get_disable_positive_acks() + { + return writer_->get_disable_positive_acks(); + } + private: eprosima::fastrtps::rtps::RTPSParticipant* participant_; diff --git a/test/blackbox/common/RTPSBlackboxTestsPositiveAck.cpp b/test/blackbox/common/RTPSBlackboxTestsPositiveAck.cpp new file mode 100644 index 00000000000..8d2ab09bd60 --- /dev/null +++ b/test/blackbox/common/RTPSBlackboxTestsPositiveAck.cpp @@ -0,0 +1,101 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "BlackboxTests.hpp" + +#include + +#include "RTPSAsSocketWriter.hpp" +#include "RTPSAsSocketReader.hpp" +#include "RTPSWithRegistrationReader.hpp" +#include "RTPSWithRegistrationWriter.hpp" + +using namespace eprosima::fastrtps; +using namespace eprosima::fastrtps::rtps; + +TEST(RTPSAck, EnableUpdatabilityOfPositiveAcksPeriodRTPSLayer) +{ + // This test checks that only the positive ACKs + // period is updatable at runtime on the RTPS Layer. + + RTPSAsSocketReader reader(TEST_TOPIC_NAME); + RTPSAsSocketWriter writer(TEST_TOPIC_NAME); + std::string ip("239.255.1.4"); + + reader.reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE). + add_to_multicast_locator_list(ip, global_port). + disable_positive_acks(true).init(); + + ASSERT_TRUE(reader.isInitialized()); + + writer.reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE). + durability(eprosima::fastrtps::rtps::DurabilityKind_t::VOLATILE). + add_to_multicast_locator_list(ip, global_port). + auto_remove_on_volatile(). + disable_positive_acks_seconds(true, 1).init(); + + ASSERT_TRUE(writer.isInitialized()); + + auto data = default_helloworld_data_generator(); + + reader.expected_data(data); + reader.startReception(); + + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); + // Check history is not empty + EXPECT_FALSE(writer.is_history_empty()); + // Check history after keep_duration period + std::this_thread::sleep_for(std::chrono::milliseconds(1200)); + EXPECT_TRUE(writer.is_history_empty()); + + // Update attributes at RTPS layer + WriterAttributes w_att; + w_att.disable_positive_acks = true; + w_att.keep_duration = eprosima::fastrtps::Duration_t(2, 0); + + writer.updateAttributes(w_att); + + data = default_helloworld_data_generator(); + + reader.expected_data(data); + reader.startReception(); + + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); + // Check history is not empty + EXPECT_FALSE(writer.is_history_empty()); + // Check history before keep_duration period + std::this_thread::sleep_for(std::chrono::milliseconds(1200)); + EXPECT_FALSE(writer.is_history_empty()); + // Check history after keep_duration period + std::this_thread::sleep_for(std::chrono::milliseconds(1200)); + EXPECT_TRUE(writer.is_history_empty()); + + // Update attributes at RTPS layer + w_att.disable_positive_acks = false; + + writer.updateAttributes(w_att); + + // Check that positive_acks feature is not changed at runtime + EXPECT_TRUE(writer.get_disable_positive_acks()); +} \ No newline at end of file