Skip to content

Commit

Permalink
TCP first message loss (#4454)
Browse files Browse the repository at this point in the history
* Refs #20508: add remaining add_logical_port calls

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Add unittests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Add blackbox test + uncomment section (commented for testing purposes)

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Add blackbox tests + functional fixes

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Fix tests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Add cv + fix windows tests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508. Use lock_guard where apropiate.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20508. Proxy is_local_port_opened.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20508. Refactor wait_logical_port_under_negotiation.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20508: Apply suggestions

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Fix typo

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Add wait_for_logical_port_negotiation_ms to transport descriptor

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Apply suggestions - firs message loss related

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Apply suggestions - transport descriptor related

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Reuse unused transport descriptor tcp_negotiation_timeout

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Change timeout behavior

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Uncrustify

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Fix xml parser

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20508: Undo fastcdr commit change

Signed-off-by: Jesus Perez <[email protected]>

---------

Signed-off-by: Jesus Perez <[email protected]>
Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Miguel Company <[email protected]>
  • Loading branch information
jepemi and MiguelCompany authored Mar 6, 2024
1 parent fe11650 commit 8103cf0
Show file tree
Hide file tree
Showing 24 changed files with 599 additions and 77 deletions.
8 changes: 7 additions & 1 deletion include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace rtps {
* immediately if the buffer might get full, but no error will be returned to the upper layer. This means
* that the application will behave as if the datagram is sent and lost.
*
* - \c tcp_negotiation_timeout: time to wait for logical port negotiation (in ms).
*
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -255,7 +257,11 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Increment between logical ports to try during RTCP negotiation
uint16_t logical_port_increment;

FASTDDS_TODO_BEFORE(3, 0, "Eliminate tcp_negotiation_timeout, variable not in use.");
/**
* Time to wait for logical port negotiation (ms). If a logical port is under negotiation, it waits for the
* negotiation to finish up to this timeout before trying to send a message to that port.
* Zero value means no waiting (default).
*/
uint32_t tcp_negotiation_timeout;

//! Enables the TCP_NODELAY socket option
Expand Down
1 change: 1 addition & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ extern const char* CALCULATE_CRC;
extern const char* CHECK_CRC;
extern const char* KEEP_ALIVE_THREAD;
extern const char* ACCEPT_THREAD;
extern const char* TCP_NEGOTIATION_TIMEOUT;
extern const char* SEGMENT_SIZE;
extern const char* PORT_QUEUE_CAPACITY;
extern const char* PORT_OVERFLOW_POLICY;
Expand Down
66 changes: 34 additions & 32 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -844,38 +844,39 @@

<!--| Transport Descriptor Definition |-->
<!--Transport Descriptor:
├ transport_id [string],
├ type [string] ("UDPv4", "UDPv6", "TCPv4", "TCPv6", "SHM"),
├ sendBufferSize [uint32],
├ receiveBufferSize [uint32],
├ maxMessageSize [uint32],
├ maxInitialPeersRange [uint32],
├ interfaceWhiteList [0~*], (NOT available for SHM type)
| └ address [ipv4Address|ipv6Address]
├ TTL [uint8], (ONLY available for UDP type)
├ non_blocking_send [boolean], (NOT available for SHM type)
├ output_port [uint16], (ONLY available for UDP type)
├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type)
├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type)
├ keep_alive_timeout_ms [uint32], (ONLY available for TCP type)
├ max_logical_port [uint16], (ONLY available for TCP type)
├ logical_port_range [uint16], (ONLY available for TCP type)
├ logical_port_increment [uint16], (ONLY available for TCP type)
├ listening_ports [0~*], (ONLY available for TCP type)
| └ port [uint16] (ONLY available for TCP type)
├ tls [0~1], (ONLY available for TCP type)
├ calculate_crc [bool], (ONLY available for TCP type)
├ check_crc [bool], (ONLY available for TCP type)
├ enable_tcp_nodelay [bool], (ONLY available for TCP type)
├ keep_alive_thread [threadSettingsType], (ONLY available for TCP type)
├ accept_thread [threadSettingsType], (ONLY available for TCP type)
├ segment_size [uint32], (ONLY available for SHM type)
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
├ rtps_dump_file [string] (ONLY available for SHM type)
├ default_reception_threads [threadSettingsType]
├ reception_threads [receptionThreadsListType] (ONLY available for SHM type)
└ dump_thread [threadSettingsType] (ONLY available for SHM type) -->
├ transport_id [string],
├ type [string] ("UDPv4", "UDPv6", "TCPv4", "TCPv6", "SHM"),
├ sendBufferSize [uint32],
├ receiveBufferSize [uint32],
├ maxMessageSize [uint32],
├ maxInitialPeersRange [uint32],
├ interfaceWhiteList [0~*], (NOT available for SHM type)
| └ address [ipv4Address|ipv6Address]
├ TTL [uint8], (ONLY available for UDP type)
├ non_blocking_send [boolean], (NOT available for SHM type)
├ output_port [uint16], (ONLY available for UDP type)
├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type)
├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type)
├ keep_alive_timeout_ms [uint32], (ONLY available for TCP type)
├ max_logical_port [uint16], (ONLY available for TCP type)
├ logical_port_range [uint16], (ONLY available for TCP type)
├ logical_port_increment [uint16], (ONLY available for TCP type)
├ listening_ports [0~*], (ONLY available for TCP type)
| └ port [uint16] (ONLY available for TCP type)
├ tls [0~1], (ONLY available for TCP type)
├ calculate_crc [bool], (ONLY available for TCP type)
├ check_crc [bool], (ONLY available for TCP type)
├ enable_tcp_nodelay [bool], (ONLY available for TCP type)
├ keep_alive_thread [threadSettingsType], (ONLY available for TCP type)
├ accept_thread [threadSettingsType], (ONLY available for TCP type)
├ tcp_negotiation_timeout [uint32], (ONLY available for TCP type)
├ segment_size [uint32], (ONLY available for SHM type)
├ port_queue_capacity [uint32], (ONLY available for SHM type)
├ healthy_check_timeout_ms [uint32], (ONLY available for SHM type)
├ rtps_dump_file [string] (ONLY available for SHM type)
├ default_reception_threads [threadSettingsType]
├ reception_threads [receptionThreadsListType] (ONLY available for SHM type)
└ dump_thread [threadSettingsType] (ONLY available for SHM type) -->
<!-- TODO: How to ensure all elements are declared properly (UDP only, TCP only, etc...)? -->
<xs:complexType name="transportDescriptorType">
<xs:all minOccurs="0">
Expand Down Expand Up @@ -931,6 +932,7 @@
<xs:element name="tls" type="tlsConfigType" minOccurs="0" maxOccurs="1"/>
<xs:element name="keep_alive_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="accept_thread" type="threadSettingsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tcp_negotiation_timeout" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="segment_size" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="port_queue_capacity" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="healthy_check_timeout_ms" type="uint32" minOccurs="0" maxOccurs="1"/>
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ static std::shared_ptr<fastdds::rtps::TCPv4TransportDescriptor> create_tcpv4_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

descriptor->default_reception_threads(att.builtin_transports_reception_threads);
descriptor->accept_thread = att.builtin_transports_reception_threads;
Expand All @@ -120,6 +121,7 @@ static std::shared_ptr<fastdds::rtps::TCPv6TransportDescriptor> create_tcpv6_tra
descriptor->check_crc = false;
descriptor->apply_security = false;
descriptor->enable_tcp_nodelay = true;
descriptor->tcp_negotiation_timeout = 0;

descriptor->default_reception_threads(att.builtin_transports_reception_threads);
descriptor->accept_thread = att.builtin_transports_reception_threads;
Expand Down
70 changes: 61 additions & 9 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ResponseCode TCPChannelResource::process_bind_request(

void TCPChannelResource::set_all_ports_pending()
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
pending_logical_output_ports_.insert(pending_logical_output_ports_.end(),
logical_output_ports_.begin(),
logical_output_ports_.end());
Expand All @@ -107,24 +107,75 @@ void TCPChannelResource::set_all_ports_pending()
bool TCPChannelResource::is_logical_port_opened(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return is_logical_port_opened_nts(port);
}

bool TCPChannelResource::is_logical_port_opened_nts(
uint16_t port)
{
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end();
}

bool TCPChannelResource::is_logical_port_added(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end()
|| std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
!= pending_logical_output_ports_.end();
}

bool TCPChannelResource::wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);

// Early return if the port is already opened.
if (is_logical_port_opened_nts(port))
{
return true;
}

// Early return if the timeout is 0.
if (timeout == std::chrono::milliseconds(0))
{
return false;
}

// The port is under negotiation if it's in the pending list and in the negotiation list.
bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if(
negotiating_logical_ports_.begin(),
negotiating_logical_ports_.end(),
[port](const decltype(negotiating_logical_ports_)::value_type& item)
{
return item.second == port;
});

if (found_in_negotiating_list &&
pending_logical_output_ports_.end() != std::find(
pending_logical_output_ports_.begin(),
pending_logical_output_ports_.end(),
port))
{
// Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case,
// it should wait again with the respective remaining time.
auto wait_predicate = [this, port]() -> bool
{
return is_logical_port_opened_nts(port);
};
logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate);
}

return is_logical_port_opened_nts(port);
}

void TCPChannelResource::add_logical_port(
uint16_t port,
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
// Already opened?
if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end())
{
Expand All @@ -150,7 +201,7 @@ void TCPChannelResource::add_logical_port(
void TCPChannelResource::send_pending_open_logical_ports(
RTCPMessageManager* rtcp_manager)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!pending_logical_output_ports_.empty())
{
for (uint16_t port : pending_logical_output_ports_)
Expand Down Expand Up @@ -180,6 +231,7 @@ void TCPChannelResource::add_logical_port_response(
{
pending_logical_output_ports_.erase(portIt);
logical_output_ports_.push_back(port);
logical_output_ports_updated_cv.notify_all();
EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
}
else
Expand Down Expand Up @@ -217,7 +269,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
// Don't add ports just tested and already pendings
if (p <= max_port && p != closedPort)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p);
if (pendingIt == pending_logical_output_ports_.end())
{
Expand All @@ -233,7 +285,7 @@ void TCPChannelResource::prepare_send_check_logical_ports_req(
else
{
TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts);
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
last_checked_logical_port_[id] = candidatePorts.back();
}
}
Expand Down Expand Up @@ -268,7 +320,7 @@ void TCPChannelResource::process_check_logical_ports_response(
void TCPChannelResource::set_logical_port_pending(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port);
if (it != logical_output_ports_.end())
{
Expand All @@ -280,7 +332,7 @@ void TCPChannelResource::set_logical_port_pending(
bool TCPChannelResource::remove_logical_port(
uint16_t port)
{
std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
if (!is_logical_port_added(port))
{
return false;
Expand Down
17 changes: 17 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TCPChannelResource : public ChannelResource
std::map<TCPTransactionId, uint16_t> last_checked_logical_port_;
std::vector<uint16_t> pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_
std::vector<uint16_t> logical_output_ports_;
std::condition_variable_any logical_output_ports_updated_cv;
std::mutex read_mutex_;
std::recursive_mutex pending_logical_mutex_;
std::atomic<eConnectionStatus> connection_status_;
Expand All @@ -94,6 +95,19 @@ class TCPChannelResource : public ChannelResource
bool is_logical_port_added(
uint16_t port);

/**
* This method checks if a logical port is under negotiation. If it is, it waits for the negotiation to finish up to a timeout.
* Independently if being under negotiation or not, it returns true if the port is opened, false otherwise.
*
* @param port The logical port to check.
* @param timeout The maximum time to wait for the negotiation to finish. Zero value means no wait
*
* @return true if the port is opened, false otherwise.
*/
bool wait_logical_port_under_negotiation(
uint16_t port,
const std::chrono::milliseconds& timeout);

bool connection_established()
{
return connection_status_ == eConnectionStatus::eEstablished;
Expand Down Expand Up @@ -227,6 +241,9 @@ class TCPChannelResource : public ChannelResource

private:

bool is_logical_port_opened_nts(
uint16_t port);

void prepare_send_check_logical_ports_req(
uint16_t closedPort,
RTCPMessageManager* rtcp_manager);
Expand Down
Loading

0 comments on commit 8103cf0

Please sign in to comment.