Skip to content

Commit

Permalink
Collapse multiple outgoing TMLedgerData messages with cookies into one
Browse files Browse the repository at this point in the history
* When work is done for a given TMGetLedger request, send the
  response to all the peers that are waiting on the request,
  sending one message per peer, including all the cookies and
  a "directResponse" flag indicating the data is intended for the
  sender, too.
  • Loading branch information
ximinez committed Sep 11, 2024
1 parent 8848447 commit d4adf08
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 30 deletions.
10 changes: 10 additions & 0 deletions include/xrpl/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}

message TMPing
Expand Down
4 changes: 2 additions & 2 deletions src/test/overlay/ProtocolVersion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);
Expand Down
1 change: 1 addition & 0 deletions src/xrpld/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
LedgerDataCookies
};

/** Represents a peer connection in the overlay. */
Expand Down
177 changes: 150 additions & 27 deletions src/xrpld/overlay/detail/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return ledgerReplayEnabled_;
case ProtocolFeature::LedgerDataCookies:
return protocol_ >= make_protocol(2, 3);
}
return false;
}
Expand Down Expand Up @@ -1322,8 +1324,9 @@ PeerImp::handleTransaction(
void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
charge(Resource::feeBadData);
auto badData = [&](std::string const& msg, bool chargefee = true) {
if (chargefee)
charge(Resource::feeBadData);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
Expand Down Expand Up @@ -1452,7 +1455,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
else
{
return badData("duplicate request: " + to_string(messageHash));
// Don't punish nodes that don't know any better
return badData(
"duplicate request: " + to_string(messageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
}

Expand Down Expand Up @@ -1568,8 +1574,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
auto badData = [&](std::string const& msg) {
fee_ = Resource::feeBadData;
auto badData = [&](std::string const& msg, bool charge = true) {
if (charge)
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
};

Expand Down Expand Up @@ -1623,22 +1630,83 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
auto const messageHash = sha512Half(*m);
if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_))
{
return badData("Duplicate message: " + to_string(messageHash));
// Don't punish nodes that don't know any better
return badData(
"Duplicate message: " + to_string(messageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}

// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie())
bool const routed = m->has_directresponse() || m->responsecookies_size() ||
m->has_requestcookie();

{
if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
// Check if this message needs to be forwarded to one or more peers.
// Maximum of one of the relevant fields should be populated.
assert(!m->has_requestcookie() || !m->responsecookies_size());

// Make a copy of the response cookies, then wipe the list so it can be
// forwarded cleanly
auto const responseCookies = m->responsecookies();
m->clear_responsecookies();
// Flag indicating if this response should be processed locally,
// possibly in addition to being forwarded.
bool const directResponse =
m->has_directresponse() && m->directresponse();
m->clear_directresponse();

auto const relay = [this, m, &messageHash](auto const cookie) {
if (auto peer = overlay_.findPeerByShortID(cookie))
{
assert(!m->has_requestcookie() && !m->responsecookies_size());
if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies))
// Setting this flag is not _strictly_ necessary for peers
// that support it if there are no cookies included in the
// message, but it is more accurate.
m->set_directresponse(true);
else
m->clear_directresponse();
peer->send(
std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
}
else
JLOG(p_journal_.info())
<< "Unable to route TX/ledger data reply to peer ["
<< cookie << "]: " << messageHash;
};
// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie())
{
assert(responseCookies.empty());
m->clear_requestcookie();
peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
relay(m->requestcookie());
if (!directResponse && responseCookies.empty())
return;
}
else
// If there's a list of request cookies, attempt to relay the message to
// all of them.
if (responseCookies.size())
{
JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
for (auto const cookie : responseCookies)
relay(cookie);
if (!directResponse)
return;
}
}

// Now that any forwarding is done check the base message (data only, no
// routing info for duplicates)
if (routed)
{
m->clear_directresponse();
assert(!m->has_requestcookie() && !m->responsecookies_size());
auto const baseMessageHash = sha512Half(*m);
if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_))
{
// Don't punish nodes that don't know any better
return badData(
"Duplicate message: " + to_string(baseMessageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
return;
}

uint256 const ledgerHash{m->ledgerhash()};
Expand Down Expand Up @@ -3060,21 +3128,72 @@ PeerImp::sendToMultiple(
{
if (peer.get() == this)
foundSelf = true;
JLOG(p_journal_.debug())
<< "sendToMultiple: Sending " << cookies.size()
<< " TMLedgerData messages to peer[" << peer->id()
<< "]: " << sha512Half(ledgerData);
bool const multipleCookies =
peer->supportsFeature(ProtocolFeature::LedgerDataCookies);
std::vector<std::uint64_t> sendCookies;

bool directResponse = false;
if (!multipleCookies)
{
JLOG(p_journal_.debug())
<< "sendToMultiple: Sending " << cookies.size()
<< " TMLedgerData messages to peer [" << peer->id()
<< "]: " << sha512Half(ledgerData);
}
for (auto const& cookie : cookies)
{
// Unfortunately, need a separate Message object for every
// combination
if (cookie)
{
if (multipleCookies)
{
// Save this one for later to send a single message
sendCookies.emplace_back(*cookie);
continue;
}

// Feature not supported, so send a single message with a
// single cookie
ledgerData.set_requestcookie(*cookie);
}
else
{
if (multipleCookies)
{
// Set this flag later on the single message
directResponse = true;
continue;
}

ledgerData.clear_requestcookie();
}
assert(!multipleCookies);
auto message{
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
peer->send(message);
}
if (multipleCookies)
{
// Send a single message with all the cookies and/or the direct
// response flag, so the receiver can farm out the single message to
// multiple peers and/or itself
assert(sendCookies.size() || directResponse);
ledgerData.clear_requestcookie();
ledgerData.clear_responsecookies();
ledgerData.set_directresponse(directResponse);
for (auto const& cookie : sendCookies)
ledgerData.add_responsecookies(cookie);
auto message{
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
peer->send(message);

JLOG(p_journal_.debug())
<< "sendToMultiple: Sent 1 TMLedgerData message to peer ["
<< peer->id() << "]: including "
<< (directResponse ? "the direct response flag and " : "")
<< sendCookies.size() << " response cookies. "
<< ": " << sha512Half(ledgerData);
}
}
assert(foundSelf);
Expand All @@ -3085,7 +3204,7 @@ PeerImp::getLedger(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash)
{
JLOG(p_journal_.trace()) << "getLedger: Ledger";
JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash;

std::shared_ptr<Ledger const> ledger;

Expand Down Expand Up @@ -3126,7 +3245,8 @@ PeerImp::getLedger(
}

JLOG(p_journal_.trace())
<< "getLedger: Don't have ledger with hash " << ledgerHash;
<< "getLedger: Don't have ledger with hash " << ledgerHash
<< ": " << mHash;

if (m->has_querytype() && !m->has_requestcookie())
{
Expand Down Expand Up @@ -3169,7 +3289,7 @@ PeerImp::getLedger(
if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
{
JLOG(p_journal_.debug())
<< "getLedger: Early ledger sequence request";
<< "getLedger: Early ledger sequence request " << mHash;
}
else
{
Expand All @@ -3178,7 +3298,7 @@ PeerImp::getLedger(
{
JLOG(p_journal_.debug())
<< "getLedger: Don't have ledger with sequence "
<< m->ledgerseq();
<< m->ledgerseq() << ": " << mHash;
}
}
}
Expand All @@ -3200,20 +3320,22 @@ PeerImp::getLedger(
charge(Resource::feeInvalidRequest);

ledger.reset();
JLOG(p_journal_.warn())
<< "getLedger: Invalid ledger sequence " << ledgerSeq;
JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence "
<< ledgerSeq << ": " << mHash;
}
}
else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
{
ledger.reset();
JLOG(p_journal_.debug())
<< "getLedger: Early ledger sequence request " << ledgerSeq;
<< "getLedger: Early ledger sequence request " << ledgerSeq
<< ": " << mHash;
}
}
else
{
JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
JLOG(p_journal_.debug())
<< "getLedger: Unable to find ledger " << mHash;
}

return ledger;
Expand All @@ -3224,7 +3346,7 @@ PeerImp::getTxSet(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash) const
{
JLOG(p_journal_.trace()) << "getTxSet: TX set";
JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash;

uint256 const txSetHash{m->ledgerhash()};
std::shared_ptr<SHAMap> shaMap{
Expand Down Expand Up @@ -3260,7 +3382,8 @@ PeerImp::getTxSet(
}
else
{
JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
JLOG(p_journal_.debug())
<< "getTxSet: Failed to find TX set " << mHash;
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/xrpld/overlay/detail/ProtocolVersion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ namespace ripple {
constexpr ProtocolVersion const supportedProtocolList[]
{
{2, 1},
{2, 2}
{2, 2},
// Adds TMLedgerData::responseCookies and directResponse
{2, 3}
};
// clang-format on

Expand Down

0 comments on commit d4adf08

Please sign in to comment.