From d4adf08d71126b5cc420caf8c1c776c2a918d01f Mon Sep 17 00:00:00 2001 From: Ed Hennis Date: Tue, 20 Aug 2024 20:13:31 -0400 Subject: [PATCH] Collapse multiple outgoing TMLedgerData messages with cookies into one * When work is done for a given TMGetLedger request, send the response to all the peers that are waiting on the request, sending one message per peer, including all the cookies and a "directResponse" flag indicating the data is intended for the sender, too. --- include/xrpl/proto/ripple.proto | 10 ++ src/test/overlay/ProtocolVersion_test.cpp | 4 +- src/xrpld/overlay/Peer.h | 1 + src/xrpld/overlay/detail/PeerImp.cpp | 177 ++++++++++++++++--- src/xrpld/overlay/detail/ProtocolVersion.cpp | 4 +- 5 files changed, 166 insertions(+), 30 deletions(-) diff --git a/include/xrpl/proto/ripple.proto b/include/xrpl/proto/ripple.proto index a06bbd9a311..e121a39706c 100644 --- a/include/xrpl/proto/ripple.proto +++ b/include/xrpl/proto/ripple.proto @@ -321,8 +321,18 @@ message TMLedgerData required uint32 ledgerSeq = 2; required TMLedgerInfoType type = 3; repeated TMLedgerNode nodes = 4; + // If the peer supports "responseCookies", this field will + // never be populated. optional uint32 requestCookie = 5; optional TMReplyError error = 6; + // The old field is called "requestCookie", but this is + // a response, so this name makes more sense + repeated uint32 responseCookies = 7; + // If a TMGetLedger request was received without a "requestCookie", + // and the peer supports it, this flag will be set to true to + // indicate that the receiver should process the result in addition + // to forwarding it to its "responseCookies" peers. + optional bool directResponse = 8; } message TMPing diff --git a/src/test/overlay/ProtocolVersion_test.cpp b/src/test/overlay/ProtocolVersion_test.cpp index dfc0ee70b8e..97469c59805 100644 --- a/src/test/overlay/ProtocolVersion_test.cpp +++ b/src/test/overlay/ProtocolVersion_test.cpp @@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2)); BEAST_EXPECT( negotiateProtocolVersion( - "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") == - make_protocol(2, 2)); + "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") == + make_protocol(2, 3)); BEAST_EXPECT( negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt); diff --git a/src/xrpld/overlay/Peer.h b/src/xrpld/overlay/Peer.h index c4ad617f51f..efa5c63ebb9 100644 --- a/src/xrpld/overlay/Peer.h +++ b/src/xrpld/overlay/Peer.h @@ -36,6 +36,7 @@ enum class ProtocolFeature { ValidatorListPropagation, ValidatorList2Propagation, LedgerReplay, + LedgerDataCookies }; /** Represents a peer connection in the overlay. */ diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index d771c77e702..83f10a6146d 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -509,6 +509,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const return protocol_ >= make_protocol(2, 2); case ProtocolFeature::LedgerReplay: return ledgerReplayEnabled_; + case ProtocolFeature::LedgerDataCookies: + return protocol_ >= make_protocol(2, 3); } return false; } @@ -1322,8 +1324,9 @@ PeerImp::handleTransaction( void PeerImp::onMessage(std::shared_ptr const& m) { - auto badData = [&](std::string const& msg) { - charge(Resource::feeBadData); + auto badData = [&](std::string const& msg, bool chargefee = true) { + if (chargefee) + charge(Resource::feeBadData); JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; }; auto const itype{m->itype()}; @@ -1452,7 +1455,10 @@ PeerImp::onMessage(std::shared_ptr const& m) } else { - return badData("duplicate request: " + to_string(messageHash)); + // Don't punish nodes that don't know any better + return badData( + "duplicate request: " + to_string(messageHash), + supportsFeature(ProtocolFeature::LedgerDataCookies)); } } @@ -1568,8 +1574,9 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { - auto badData = [&](std::string const& msg) { - fee_ = Resource::feeBadData; + auto badData = [&](std::string const& msg, bool charge = true) { + if (charge) + fee_ = Resource::feeBadData; JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; }; @@ -1623,22 +1630,83 @@ PeerImp::onMessage(std::shared_ptr const& m) auto const messageHash = sha512Half(*m); if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_)) { - return badData("Duplicate message: " + to_string(messageHash)); + // Don't punish nodes that don't know any better + return badData( + "Duplicate message: " + to_string(messageHash), + supportsFeature(ProtocolFeature::LedgerDataCookies)); } - // If there is a request cookie, attempt to relay the message - if (m->has_requestcookie()) + bool const routed = m->has_directresponse() || m->responsecookies_size() || + m->has_requestcookie(); + { - if (auto peer = overlay_.findPeerByShortID(m->requestcookie())) + // Check if this message needs to be forwarded to one or more peers. + // Maximum of one of the relevant fields should be populated. + assert(!m->has_requestcookie() || !m->responsecookies_size()); + + // Make a copy of the response cookies, then wipe the list so it can be + // forwarded cleanly + auto const responseCookies = m->responsecookies(); + m->clear_responsecookies(); + // Flag indicating if this response should be processed locally, + // possibly in addition to being forwarded. + bool const directResponse = + m->has_directresponse() && m->directresponse(); + m->clear_directresponse(); + + auto const relay = [this, m, &messageHash](auto const cookie) { + if (auto peer = overlay_.findPeerByShortID(cookie)) + { + assert(!m->has_requestcookie() && !m->responsecookies_size()); + if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies)) + // Setting this flag is not _strictly_ necessary for peers + // that support it if there are no cookies included in the + // message, but it is more accurate. + m->set_directresponse(true); + else + m->clear_directresponse(); + peer->send( + std::make_shared(*m, protocol::mtLEDGER_DATA)); + } + else + JLOG(p_journal_.info()) + << "Unable to route TX/ledger data reply to peer [" + << cookie << "]: " << messageHash; + }; + // If there is a request cookie, attempt to relay the message + if (m->has_requestcookie()) { + assert(responseCookies.empty()); m->clear_requestcookie(); - peer->send(std::make_shared(*m, protocol::mtLEDGER_DATA)); + relay(m->requestcookie()); + if (!directResponse && responseCookies.empty()) + return; } - else + // If there's a list of request cookies, attempt to relay the message to + // all of them. + if (responseCookies.size()) { - JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply"; + for (auto const cookie : responseCookies) + relay(cookie); + if (!directResponse) + return; + } + } + + // Now that any forwarding is done check the base message (data only, no + // routing info for duplicates) + if (routed) + { + m->clear_directresponse(); + assert(!m->has_requestcookie() && !m->responsecookies_size()); + auto const baseMessageHash = sha512Half(*m); + if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_)) + { + // Don't punish nodes that don't know any better + return badData( + "Duplicate message: " + to_string(baseMessageHash), + supportsFeature(ProtocolFeature::LedgerDataCookies)); } - return; } uint256 const ledgerHash{m->ledgerhash()}; @@ -3060,21 +3128,72 @@ PeerImp::sendToMultiple( { if (peer.get() == this) foundSelf = true; - JLOG(p_journal_.debug()) - << "sendToMultiple: Sending " << cookies.size() - << " TMLedgerData messages to peer[" << peer->id() - << "]: " << sha512Half(ledgerData); + bool const multipleCookies = + peer->supportsFeature(ProtocolFeature::LedgerDataCookies); + std::vector sendCookies; + + bool directResponse = false; + if (!multipleCookies) + { + JLOG(p_journal_.debug()) + << "sendToMultiple: Sending " << cookies.size() + << " TMLedgerData messages to peer [" << peer->id() + << "]: " << sha512Half(ledgerData); + } for (auto const& cookie : cookies) { // Unfortunately, need a separate Message object for every // combination if (cookie) + { + if (multipleCookies) + { + // Save this one for later to send a single message + sendCookies.emplace_back(*cookie); + continue; + } + + // Feature not supported, so send a single message with a + // single cookie ledgerData.set_requestcookie(*cookie); + } else + { + if (multipleCookies) + { + // Set this flag later on the single message + directResponse = true; + continue; + } + ledgerData.clear_requestcookie(); + } + assert(!multipleCookies); + auto message{ + std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + peer->send(message); + } + if (multipleCookies) + { + // Send a single message with all the cookies and/or the direct + // response flag, so the receiver can farm out the single message to + // multiple peers and/or itself + assert(sendCookies.size() || directResponse); + ledgerData.clear_requestcookie(); + ledgerData.clear_responsecookies(); + ledgerData.set_directresponse(directResponse); + for (auto const& cookie : sendCookies) + ledgerData.add_responsecookies(cookie); auto message{ std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; peer->send(message); + + JLOG(p_journal_.debug()) + << "sendToMultiple: Sent 1 TMLedgerData message to peer [" + << peer->id() << "]: including " + << (directResponse ? "the direct response flag and " : "") + << sendCookies.size() << " response cookies. " + << ": " << sha512Half(ledgerData); } } assert(foundSelf); @@ -3085,7 +3204,7 @@ PeerImp::getLedger( std::shared_ptr const& m, uint256 const& mHash) { - JLOG(p_journal_.trace()) << "getLedger: Ledger"; + JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash; std::shared_ptr ledger; @@ -3126,7 +3245,8 @@ PeerImp::getLedger( } JLOG(p_journal_.trace()) - << "getLedger: Don't have ledger with hash " << ledgerHash; + << "getLedger: Don't have ledger with hash " << ledgerHash + << ": " << mHash; if (m->has_querytype() && !m->has_requestcookie()) { @@ -3169,7 +3289,7 @@ PeerImp::getLedger( if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch()) { JLOG(p_journal_.debug()) - << "getLedger: Early ledger sequence request"; + << "getLedger: Early ledger sequence request " << mHash; } else { @@ -3178,7 +3298,7 @@ PeerImp::getLedger( { JLOG(p_journal_.debug()) << "getLedger: Don't have ledger with sequence " - << m->ledgerseq(); + << m->ledgerseq() << ": " << mHash; } } } @@ -3200,20 +3320,22 @@ PeerImp::getLedger( charge(Resource::feeInvalidRequest); ledger.reset(); - JLOG(p_journal_.warn()) - << "getLedger: Invalid ledger sequence " << ledgerSeq; + JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence " + << ledgerSeq << ": " << mHash; } } else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch()) { ledger.reset(); JLOG(p_journal_.debug()) - << "getLedger: Early ledger sequence request " << ledgerSeq; + << "getLedger: Early ledger sequence request " << ledgerSeq + << ": " << mHash; } } else { - JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger"; + JLOG(p_journal_.debug()) + << "getLedger: Unable to find ledger " << mHash; } return ledger; @@ -3224,7 +3346,7 @@ PeerImp::getTxSet( std::shared_ptr const& m, uint256 const& mHash) const { - JLOG(p_journal_.trace()) << "getTxSet: TX set"; + JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash; uint256 const txSetHash{m->ledgerhash()}; std::shared_ptr shaMap{ @@ -3260,7 +3382,8 @@ PeerImp::getTxSet( } else { - JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set"; + JLOG(p_journal_.debug()) + << "getTxSet: Failed to find TX set " << mHash; } } diff --git a/src/xrpld/overlay/detail/ProtocolVersion.cpp b/src/xrpld/overlay/detail/ProtocolVersion.cpp index bd2effa6341..cb9a4db64ea 100644 --- a/src/xrpld/overlay/detail/ProtocolVersion.cpp +++ b/src/xrpld/overlay/detail/ProtocolVersion.cpp @@ -37,7 +37,9 @@ namespace ripple { constexpr ProtocolVersion const supportedProtocolList[] { {2, 1}, - {2, 2} + {2, 2}, + // Adds TMLedgerData::responseCookies and directResponse + {2, 3} }; // clang-format on