Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce peer message traffic for ledger data #5126

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
2 changes: 1 addition & 1 deletion Builds/levelization/results/loops.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Loop: xrpld.app xrpld.net
xrpld.app > xrpld.net

Loop: xrpld.app xrpld.overlay
xrpld.overlay == xrpld.app
xrpld.overlay ~= xrpld.app

Loop: xrpld.app xrpld.peerfinder
xrpld.app > xrpld.peerfinder
Expand Down
106 changes: 106 additions & 0 deletions include/xrpl/basics/CanProcess.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED

/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check.canProcess() ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
template <class Mutex, class Collection, class Item>
class CanProcess
{
public:
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: mtx_(mtx), collection_(collection), item_(item), canProcess_(insert())
{
}

~CanProcess()
{
if (canProcess_)
{
std::unique_lock<Mutex> lock_(mtx_);
collection_.erase(item_);
}
}

bool
canProcess() const

Check warning on line 81 in include/xrpl/basics/CanProcess.h

View check run for this annotation

Codecov / codecov/patch

include/xrpl/basics/CanProcess.h#L81

Added line #L81 was not covered by tests
{
return canProcess_;

Check warning on line 83 in include/xrpl/basics/CanProcess.h

View check run for this annotation

Codecov / codecov/patch

include/xrpl/basics/CanProcess.h#L83

Added line #L83 was not covered by tests
}

operator bool() const
{
return canProcess_;
}

private:
bool
insert()
{
std::unique_lock<Mutex> lock_(mtx_);
auto const [_, inserted] = collection_.insert(item_);
return inserted;
}

Mutex& mtx_;
Collection& collection_;
Item const item_;
bool const canProcess_;
};

#endif
7 changes: 7 additions & 0 deletions include/xrpl/basics/base_uint.h
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,13 @@ to_string(base_uint<Bits, Tag> const& a)
return strHex(a.cbegin(), a.cend());
}

template <std::size_t Bits, class Tag>
inline std::string
to_short_string(base_uint<Bits, Tag> const& a)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit]: Adding checks for the to_short_string in the base_unit_test next to existing to_string cases would be good.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't call that a nit. Missing test coverage is pretty significant. Thanks for catching it. Fixed.

{
return strHex(a.cbegin(), a.cend()).substr(0, 8) + "...";
}

template <std::size_t Bits, class Tag>
inline std::ostream&
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)
Expand Down
10 changes: 10 additions & 0 deletions include/xrpl/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}

message TMPing
Expand Down
2 changes: 2 additions & 0 deletions include/xrpl/protocol/LedgerHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct LedgerHeader

// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;
Expand Down
28 changes: 28 additions & 0 deletions src/test/app/HashRouter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}

void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;

BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}

public:
void
run() override
Expand All @@ -252,6 +279,7 @@ class HashRouter_test : public beast::unit_test::suite
testSetFlags();
testRelay();
testProcess();
testProcessPeer();
}
};

Expand Down
5 changes: 5 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ class TestPeer : public Peer
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}

bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;
Expand Down
4 changes: 2 additions & 2 deletions src/test/overlay/ProtocolVersion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);
Expand Down
5 changes: 5 additions & 0 deletions src/test/overlay/reduce_relay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ class PeerPartial : public Peer
removeTxQueue(const uint256&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};

/** Manually advanced clock. */
Expand Down
3 changes: 2 additions & 1 deletion src/xrpld/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,8 @@
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(

Check warning on line 1064 in src/xrpld/app/consensus/RCLConsensus.cpp

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/consensus/RCLConsensus.cpp#L1064

Added line #L1064 was not covered by tests
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}

void
Expand Down
18 changes: 18 additions & 0 deletions src/xrpld/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,24 @@
std::unique_ptr<PeerSet> mPeerSet;
};

inline std::string
to_string(InboundLedger::Reason reason)
{
using enum InboundLedger::Reason;
switch (reason)
{
case HISTORY:

Check warning on line 205 in src/xrpld/app/ledger/InboundLedger.h

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/ledger/InboundLedger.h#L205

Added line #L205 was not covered by tests
return "HISTORY";
case GENERIC:
return "GENERIC";
case CONSENSUS:
return "CONSENSUS";
default:
assert(false);

Check warning on line 212 in src/xrpld/app/ledger/InboundLedger.h

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/ledger/InboundLedger.h#L211-L212

Added lines #L211 - L212 were not covered by tests
return "unknown";
}
}

} // namespace ripple

#endif
21 changes: 15 additions & 6 deletions src/xrpld/app/ledger/detail/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,14 @@

if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
assert(isDone());
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;

Check warning on line 399 in src/xrpld/app/ledger/detail/InboundLedger.cpp

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/ledger/detail/InboundLedger.cpp#L399

Added line #L399 was not covered by tests
}

mByHash = true;

Expand Down Expand Up @@ -497,15 +504,17 @@

if (auto stream = journal_.debug())
{
stream << "Trigger acquiring ledger " << hash_;
std::stringstream ss;
ss << "Trigger acquiring ledger " << hash_;
if (peer)
stream << " from " << peer;
ss << " from " << peer;

if (complete_ || failed_)
stream << "complete=" << complete_ << " failed=" << failed_;
ss << " complete=" << complete_ << " failed=" << failed_;
else
stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
stream << ss.str();
}

if (!mHaveHeader)
Expand Down
Loading
Loading