Skip to content

Commit

Permalink
Introduction of address ratelimit (#1230)
Browse files Browse the repository at this point in the history
* Get rid of redundand code

* net: rate limit the processing of incoming addr messages

* Fix for macOS build

* Fixed permission for p2p-addr.py

* Fixed RPC test framework

---------

Co-authored-by: chromatic <[email protected]>
  • Loading branch information
psolstice and chromatic committed Mar 24, 2023
1 parent 9ecd943 commit 4892e42
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 7 deletions.
1 change: 1 addition & 0 deletions qa/pull-tester/rpc-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
'rest.py',
'httpbasics.py',
'reindex.py',
'p2p-addr.py',
'multi_rpc.py',
'zapwallettxes.py',
'proxy_test.py',
Expand Down
196 changes: 196 additions & 0 deletions qa/rpc-tests/p2p-addr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#!/usr/bin/env python3
# Copyright (c) 2022 The Dogecoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#

from test_framework.mininode import * #NodeConnCB, NODE_NETWORK, NetworkThread, NodeConn, wait_until, CAddress, msg_addr, msg_ping, msg_pong
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
import time

'''
AddrTest -- test processing of addr messages
'''

class AddrTestNode(SingleNodeConnCB):
def __init__(self, name):
SingleNodeConnCB.__init__(self)
self.ports_received = []

def add_connection(self, conn):
self.connection = conn

def getaddr(self):
self.connection.send_message(msg_getaddr())

def on_addr(self, conn, message):
for addr in message.addrs:
self.ports_received.append(addr.port)

def on_getaddr(self, conn, message):
self.getaddr_msg_received += 1

def wait_for_disconnect(self):
if self.connection == None:
return True
def is_closed():
return self.connection.state == "closed"
return wait_until(is_closed, timeout=30)

def disconnect(self):
self.connection.disconnect_node()
return self.wait_for_disconnect()

class AddrTest(BitcoinTestFramework):
def __init__(self):
super().__init__()
self.setup_clean_chain = True
self.num_nodes = 1
self.counter = 0
self.mocktime = int(time.time())
self.start_port = 10000

def run_test(self):
self.nodes[0].generate(1)

self.rate_limiting_test() # run this first so that we can test the
# initial state of 1 token
self.simple_relay_test()
self.oversized_addr_test()

self.send_node.disconnect()
for recv_node in self.recv_nodes:
recv_node.disconnect()

def setup_network(self):
self.nodes = []
self.nodes.append(start_node(0, self.options.tmpdir, ["-debug=net", "-peertimeout=999999999"]))

self.send_node = self.create_testnode()
self.recv_nodes = []
for i in range(4):
self.recv_nodes.append(self.create_testnode())

NetworkThread().start()
self.send_node.wait_for_verack()
for recv_node in self.recv_nodes:
recv_node.wait_for_verack()

def create_testnode(self, send_getaddr=False, node_idx=0):
node = AddrTestNode(send_getaddr)
conn = NodeConn('127.0.0.1', p2p_port(node_idx), self.nodes[node_idx], node)
node.add_connection(conn)
return node

def index_to_port(self, idx):
return self.start_port + idx

def last_port_sent(self):
assert self.counter > 0
return self.index_to_port(self.counter - 1)

def have_received_port(self, port):
for peer in self.recv_nodes:
if port in peer.ports_received:
return True
return False

def wait_for_specific_port(self, port):
def port_received():
return self.have_received_port(port)
return wait_until(port_received, timeout=600)

def create_addr_msg(self, num, services):
addrs = []
for i in range(num):
addr = CAddress()
addr.time = self.mocktime + random.randrange(-100, 100)
addr.nServices = services
assert self.counter < 256 ** 2 # Don't allow the returned ip addresses to wrap.
addr.ip = f"123.123.{self.counter // 256}.{self.counter % 256}"
addr.port = self.index_to_port(self.counter)
self.counter += 1
addrs.append(addr)

msg = msg_addr()
msg.addrs = addrs
return msg

def send_addr_msg(self, msg):
self.send_node.connection.send_message(msg)
time.sleep(0.5) # sleep half a second to prevent mocktime racing the msg

# invoke m_next_addr_send timer:
# `addr` messages are sent on an exponential distribution with mean interval of 30s.
# Setting the mocktime 600s forward gives a probability of (1 - e^-(600/30)) that
# the event will occur (i.e. this fails once in ~500 million repeats).
self.mocktime += 60 * 10
self.nodes[0].setmocktime(self.mocktime)

time.sleep(0.5) # sleep half a second to prevent pings racing mocktime
for peer in self.recv_nodes:
peer.sync_with_ping()

def create_and_send_addr_msg(self, num, services=NODE_NETWORK):
self.send_addr_msg(self.create_addr_msg(num, services))

def simple_relay_test(self):
# send a message with 2 addresses
self.create_and_send_addr_msg(2)

# make sure we received the last addr record
assert self.wait_for_specific_port(self.last_port_sent())

def oversized_addr_test(self):
# create message with 1010 entries and
# confirm that the node discarded the entries

# to make sure we are not rate-limited, add 1001 / 0.1 seconds
# to mocktime to allocate the maximum non-burst amount of tokens
self.mocktime += 10010
self.nodes[0].setmocktime(self.mocktime)

# send one valid message, keep track of the port it contains
valid_port_before = self.index_to_port(self.counter)
self.create_and_send_addr_msg(1)

# send a too large message that will be ignored
self.create_and_send_addr_msg(1010)

# finish with a valid message, keep track of the port it contains
valid_port_after = self.index_to_port(self.counter)
self.create_and_send_addr_msg(1)

# wait until both valid addresses were propagated
assert self.wait_for_specific_port(valid_port_before)
assert self.wait_for_specific_port(valid_port_after)

# make sure that all addresses from the invalid message were discarded
# by making sure that none of them were propagated
for port in range(valid_port_before+1, valid_port_after):
assert not self.have_received_port(port)

def rate_limiting_test(self):
# send 1 addr on connect
self.create_and_send_addr_msg(1)

# because we set mocktime after sending the message now have
# 600 * 0.1 = 60 tokens, minus the one we just sent.

# send 69 tokens
first_port = self.index_to_port(self.counter)
self.create_and_send_addr_msg(69)

# check that we have a peer with 60 processed addrs
# and 10 rate limited addrs
peerinfo = self.nodes[0].getpeerinfo()
sendingPeer = None
for info in peerinfo:
if info["addr_processed"] == 60:
sendingPeer = info
assert not sendingPeer is None
assert sendingPeer["addr_rate_limited"] == 10

if __name__ == '__main__':
AddrTest().main()
5 changes: 3 additions & 2 deletions qa/rpc-tests/test_framework/mininode.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,10 @@ def deserialize(self, f, *, with_time = True):
self.ip = socket.inet_ntoa(f.read(4))
self.port = struct.unpack(">H", f.read(2))[0]

def serialize(self):
def serialize(self, with_time=True):
r = b""
r += struct.pack("<I", 0)
if with_time:
r += struct.pack("<I", self.time)
r += struct.pack("<Q", self.nServices)
r += self.pchReserved
r += socket.inet_aton(self.ip)
Expand Down
7 changes: 7 additions & 0 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,9 @@ void CNode::copyStats(CNodeStats &stats)
X(nRecvBytes);
}
X(fWhitelisted);
X(minFeeFilter);
X(nProcessedAddrs);
X(nRatelimitedAddrs);

// It is common for nodes with good ping times to suddenly become lagged,
// due to a new block arriving or other large transfer.
Expand Down Expand Up @@ -3409,6 +3412,10 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
fGetAddr = false;
nNextLocalAddrSend = 0;
nNextAddrSend = 0;
nAddrTokenBucket = 1; // initialize to 1 to allow self-announcement
nAddrTokenTimestamp = GetTimeMicros();
nProcessedAddrs = 0;
nRatelimitedAddrs = 0;
nNextInvSend = 0;
fRelayTxes = false;
fSentAddr = false;
Expand Down
11 changes: 11 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ class CNodeStats
CAddress addr;
// In case this is a verified MN, this value is the proTx of the MN
uint256 verifiedProRegTxHash;
CAmount minFeeFilter;
uint64_t nProcessedAddrs;
uint64_t nRatelimitedAddrs;
};


Expand Down Expand Up @@ -792,6 +795,14 @@ class CNode
int64_t nNextAddrSend;
int64_t nNextLocalAddrSend;

/** Number of addresses that can be processed from this peer. */
double nAddrTokenBucket;
/** When nAddrTokenBucket was last updated, in microseconds */
int64_t nAddrTokenTimestamp;

std::atomic<uint64_t> nProcessedAddrs;
std::atomic<uint64_t> nRatelimitedAddrs;

// inventory based relay
CRollingBloomFilter filterInventoryKnown;
// Set of Dandelion transactions that should be known to this peer
Expand Down
43 changes: 40 additions & 3 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
{
connman.PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR));
pfrom->fGetAddr = true;

// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response
// (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
pfrom->nAddrTokenBucket += MAX_ADDR_TO_SEND;
}
connman.MarkAddressGood(pfrom->addr);
}
Expand Down Expand Up @@ -1657,11 +1661,40 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
std::vector<CAddress> vAddrOk;
int64_t nNow = GetAdjustedTime();
int64_t nSince = nNow - 10 * 60;

// track rate limiting within this message
uint64_t nProcessedAddrs = 0;
uint64_t nRatelimitedAddrs = 0;

// Update/increment addr rate limiting bucket.
const uint64_t nCurrentTime = GetMockableTimeMicros();
if (pfrom->nAddrTokenBucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
const uint64_t nTimeElapsed = std::max(nCurrentTime - pfrom->nAddrTokenTimestamp, uint64_t(0));
const double nIncrement = nTimeElapsed * MAX_ADDR_RATE_PER_SECOND / 1e6;
pfrom->nAddrTokenBucket = std::min<double>(pfrom->nAddrTokenBucket + nIncrement, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
pfrom->nAddrTokenTimestamp = nCurrentTime;

// Randomize entries before processing, to prevent an attacker to
// determine which entries will make it through the rate limit
std::shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());

BOOST_FOREACH(CAddress& addr, vAddr)
{
if (interruptMsgProc)
return true;

// apply rate limiting
if (!pfrom->fWhitelisted) {
if (pfrom->nAddrTokenBucket < 1.0) {
nRatelimitedAddrs++;
continue;
}
pfrom->nAddrTokenBucket -= 1.0;
}

nProcessedAddrs++;

if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
continue;

Expand All @@ -1678,6 +1711,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (fReachable)
vAddrOk.push_back(addr);
}

pfrom->nProcessedAddrs += nProcessedAddrs;
pfrom->nRatelimitedAddrs += nRatelimitedAddrs;

LogPrint("net", "Received addr: %u addresses (%u processed, %u rate-limited) peer=%d\n",
vAddr.size(), nProcessedAddrs, nRatelimitedAddrs, pfrom->GetId());

connman.AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60);
if (vAddr.size() < 1000)
pfrom->fGetAddr = false;
Expand Down Expand Up @@ -1809,9 +1849,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
pfrom->AskFor(inv, doubleRequestDelay);
}
}

// Track requests for our stuff
GetMainSignals().Inventory(inv.hash);
}

if (!vToFetch.empty())
Expand Down
9 changes: 9 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60;
/** Default number of orphan+recently-replaced txn to keep around for block reconstruction */
static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100;

/** The maximum rate of address records we're willing to process on average.
* Is bypassed for whitelisted connections. */
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};

/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND
* based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR
* is exempt from this limit. */
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};

/** Register with a network node to receive its signals */
void RegisterNodeSignals(CNodeSignals& nodeSignals);
/** Unregister a network node */
Expand Down
8 changes: 6 additions & 2 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
" n, (numeric) The heights of blocks we're currently asking from this peer\n"
" ...\n"
" ],\n"
" \"whitelisted\": true|false, (boolean) Whether the peer is whitelisted\n"
" \"addr_processed\": n, (numeric) The total number of addresses processed, excluding those dropped due to rate limiting\n"
" \"addr_rate_limited\": n, (numeric) The total number of addresses dropped due to rate limiting\n"
" \"whitelisted\": true|false, (boolean) Whether the peer is whitelisted\n"
" \"bytessent_per_msg\": {\n"
" \"addr\": n, (numeric) The total bytes sent aggregated by message type\n"
" ...\n"
Expand Down Expand Up @@ -171,7 +173,9 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
}
obj.push_back(Pair("inflight", heights));
}
obj.push_back(Pair("whitelisted", stats.fWhitelisted));
obj.pushKV("addr_processed", stats.nProcessedAddrs);
obj.pushKV("addr_rate_limited", stats.nRatelimitedAddrs);
obj.pushKV("whitelisted", stats.fWhitelisted);

UniValue sendPerMsgCmd(UniValue::VOBJ);
BOOST_FOREACH(const mapMsgCmdSize::value_type &i, stats.mapSendBytesPerMsgCmd) {
Expand Down
6 changes: 6 additions & 0 deletions src/utiltime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ int64_t GetTime()
return now;
}

int64_t GetMockableTimeMicros()
{
if (nMockTime) return nMockTime * 1000000;
return GetTimeMicros();
}

void SetMockTime(int64_t nMockTimeIn)
{
nMockTime = nMockTimeIn;
Expand Down
1 change: 1 addition & 0 deletions src/utiltime.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ int64_t GetTimeMillis();
int64_t GetTimeMicros();
int64_t GetSystemTimeInSeconds(); // Like GetTime(), but not mockable
int64_t GetLogTimeMicros();
int64_t GetMockableTimeMicros();
void SetMockTime(int64_t nMockTimeIn);
void MilliSleep(int64_t n);

Expand Down

0 comments on commit 4892e42

Please sign in to comment.