diff --git a/qa/pull-tester/rpc-tests.py b/qa/pull-tester/rpc-tests.py index 835c613671..322a259050 100755 --- a/qa/pull-tester/rpc-tests.py +++ b/qa/pull-tester/rpc-tests.py @@ -151,6 +151,7 @@ 'rest.py', 'httpbasics.py', 'reindex.py', + 'p2p-addr.py', 'multi_rpc.py', 'zapwallettxes.py', 'proxy_test.py', diff --git a/qa/rpc-tests/p2p-addr.py b/qa/rpc-tests/p2p-addr.py new file mode 100755 index 0000000000..3422f231fb --- /dev/null +++ b/qa/rpc-tests/p2p-addr.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +# Copyright (c) 2022 The Dogecoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +# + +from test_framework.mininode import * #NodeConnCB, NODE_NETWORK, NetworkThread, NodeConn, wait_until, CAddress, msg_addr, msg_ping, msg_pong +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import * +import time + +''' +AddrTest -- test processing of addr messages +''' + +class AddrTestNode(SingleNodeConnCB): + def __init__(self, send_getaddr): + SingleNodeConnCB.__init__(self) + self.ports_received = [] + self.getaddr_msg_received = 0 + + def add_connection(self, conn): + self.connection = conn + + def getaddr(self): + self.connection.send_message(msg_getaddr()) + + def on_addr(self, conn, message): + for addr in message.addrs: + self.ports_received.append(addr.port) + + def on_getaddr(self, conn, message): + self.getaddr_msg_received += 1 + + def wait_for_disconnect(self): + if self.connection == None: + return True + def is_closed(): + return self.connection.state == "closed" + return wait_until(is_closed, timeout=30) + + def disconnect(self): + self.connection.disconnect_node() + return self.wait_for_disconnect() + +class AddrTest(BitcoinTestFramework): + def __init__(self): + super().__init__() + self.setup_clean_chain = True + self.num_nodes = 1 + self.counter = 0 + self.mocktime = int(time.time()) + self.start_port = 10000 + + def run_test(self): + 
self.nodes[0].generate(1) + + self.rate_limiting_test() # run this first so that we can test the + # initial state of 1 token + self.simple_relay_test() + self.oversized_addr_test() + + self.send_node.disconnect() + for recv_node in self.recv_nodes: + recv_node.disconnect() + + def setup_network(self): + self.nodes = [] + self.nodes.append(start_node(0, self.options.tmpdir, ["-debug=net", "-peertimeout=999999999"])) + + self.send_node = self.create_testnode() + self.recv_nodes = [] + for i in range(4): + self.recv_nodes.append(self.create_testnode()) + + NetworkThread().start() + self.send_node.wait_for_verack() + for recv_node in self.recv_nodes: + recv_node.wait_for_verack() + + def create_testnode(self, send_getaddr=False, node_idx=0): + node = AddrTestNode(send_getaddr) + conn = NodeConn('127.0.0.1', p2p_port(node_idx), self.nodes[node_idx], node) + node.add_connection(conn) + return node + + def index_to_port(self, idx): + return self.start_port + idx + + def last_port_sent(self): + assert self.counter > 0 + return self.index_to_port(self.counter - 1) + + def have_received_port(self, port): + for peer in self.recv_nodes: + if port in peer.ports_received: + return True + return False + + def wait_for_specific_port(self, port): + def port_received(): + return self.have_received_port(port) + return wait_until(port_received, timeout=600) + + def create_addr_msg(self, num, services): + addrs = [] + for i in range(num): + addr = CAddress() + addr.time = self.mocktime + random.randrange(-100, 100) + addr.nServices = services + assert self.counter < 256 ** 2 # Don't allow the returned ip addresses to wrap. 
+ addr.ip = f"123.123.{self.counter // 256}.{self.counter % 256}" + addr.port = self.index_to_port(self.counter) + self.counter += 1 + addrs.append(addr) + + msg = msg_addr() + msg.addrs = addrs + return msg + + def send_addr_msg(self, msg): + self.send_node.connection.send_message(msg) + time.sleep(0.5) # sleep half a second to prevent mocktime racing the msg + + # invoke m_next_addr_send timer: + # `addr` messages are sent on an exponential distribution with mean interval of 30s. + # Setting the mocktime 600s forward gives a probability of (1 - e^-(600/30)) that + # the event will occur (i.e. this fails once in ~500 million repeats). + self.mocktime += 60 * 10 + self.nodes[0].setmocktime(self.mocktime) + + time.sleep(0.5) # sleep half a second to prevent pings racing mocktime + for peer in self.recv_nodes: + peer.sync_with_ping() + + def create_and_send_addr_msg(self, num, services=NODE_NETWORK): + self.send_addr_msg(self.create_addr_msg(num, services)) + + def simple_relay_test(self): + # send a message with 2 addresses + self.create_and_send_addr_msg(2) + + # make sure we received the last addr record + assert self.wait_for_specific_port(self.last_port_sent()) + + def oversized_addr_test(self): + # create message with 1010 entries and + # confirm that the node discarded the entries + + # to make sure we are not rate-limited, add 1001 / 0.1 seconds + # to mocktime to allocate the maximum non-burst amount of tokens + self.mocktime += 10010 + self.nodes[0].setmocktime(self.mocktime) + + # send one valid message, keep track of the port it contains + valid_port_before = self.index_to_port(self.counter) + self.create_and_send_addr_msg(1) + + # send a too large message that will be ignored + self.create_and_send_addr_msg(1010) + + # finish with a valid message, keep track of the port it contains + valid_port_after = self.index_to_port(self.counter) + self.create_and_send_addr_msg(1) + + # wait until both valid addresses were propagated + assert 
self.wait_for_specific_port(valid_port_before) + assert self.wait_for_specific_port(valid_port_after) + + # make sure that all addresses from the invalid message were discarded + # by making sure that none of them were propagated + for port in range(valid_port_before+1, valid_port_after): + assert not self.have_received_port(port) + + def rate_limiting_test(self): + # send 1 addr on connect + self.create_and_send_addr_msg(1) + + # because we set mocktime after sending the message, we now have + # 600 * 0.1 = 60 tokens, minus the one we just sent. + + # send 69 addrs + first_port = self.index_to_port(self.counter) + self.create_and_send_addr_msg(69) + + # check that we have a peer with 60 processed addrs + # and 10 rate limited addrs + peerinfo = self.nodes[0].getpeerinfo() + sendingPeer = None + for info in peerinfo: + if info["addr_processed"] == 60: + sendingPeer = info + assert not sendingPeer is None + assert sendingPeer["addr_rate_limited"] == 10 + +if __name__ == '__main__': + AddrTest().main() diff --git a/qa/rpc-tests/test_framework/mininode.py b/qa/rpc-tests/test_framework/mininode.py index 9c51a6f038..efc334f736 100644 --- a/qa/rpc-tests/test_framework/mininode.py +++ b/qa/rpc-tests/test_framework/mininode.py @@ -280,9 +280,10 @@ def deserialize(self, f, *, with_time = True): self.ip = socket.inet_ntoa(f.read(4)) self.port = struct.unpack(">H", f.read(2))[0] - def serialize(self): + def serialize(self, with_time=True): r = b"" - r += struct.pack(" nProcessedAddrs; + std::atomic<uint64_t> nRatelimitedAddrs; + // inventory based relay CRollingBloomFilter filterInventoryKnown; // Set of Dandelion transactions that should be known to this peer diff --git a/src/net_processing.cpp b/src/net_processing.cpp index b0d29aef8f..ed1f37e7f8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1537,6 +1537,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr { connman.PushMessage(pfrom, 
CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR)); pfrom->fGetAddr = true; + + // When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response + // (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit). + pfrom->nAddrTokenBucket += MAX_ADDR_TO_SEND; } connman.MarkAddressGood(pfrom->addr); } @@ -1657,11 +1661,40 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr std::vector vAddrOk; int64_t nNow = GetAdjustedTime(); int64_t nSince = nNow - 10 * 60; + + // track rate limiting within this message + uint64_t nProcessedAddrs = 0; + uint64_t nRatelimitedAddrs = 0; + + // Update/increment addr rate limiting bucket. + const uint64_t nCurrentTime = GetMockableTimeMicros(); + if (pfrom->nAddrTokenBucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) { + const uint64_t nTimeElapsed = std::max(nCurrentTime - pfrom->nAddrTokenTimestamp, uint64_t(0)); + const double nIncrement = nTimeElapsed * MAX_ADDR_RATE_PER_SECOND / 1e6; + pfrom->nAddrTokenBucket = std::min(pfrom->nAddrTokenBucket + nIncrement, MAX_ADDR_PROCESSING_TOKEN_BUCKET); + } + pfrom->nAddrTokenTimestamp = nCurrentTime; + + // Randomize entries before processing, to prevent an attacker to + // determine which entries will make it through the rate limit + std::shuffle(vAddr.begin(), vAddr.end(), FastRandomContext()); + BOOST_FOREACH(CAddress& addr, vAddr) { if (interruptMsgProc) return true; + // apply rate limiting + if (!pfrom->fWhitelisted) { + if (pfrom->nAddrTokenBucket < 1.0) { + nRatelimitedAddrs++; + continue; + } + pfrom->nAddrTokenBucket -= 1.0; + } + + nProcessedAddrs++; + if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) continue; @@ -1678,6 +1711,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (fReachable) vAddrOk.push_back(addr); } + + pfrom->nProcessedAddrs += nProcessedAddrs; + pfrom->nRatelimitedAddrs += nRatelimitedAddrs; + + LogPrint("net", "Received addr: %u addresses (%u processed, %u 
rate-limited) peer=%d\n", + vAddr.size(), nProcessedAddrs, nRatelimitedAddrs, pfrom->GetId()); + connman.AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60); if (vAddr.size() < 1000) pfrom->fGetAddr = false; @@ -1809,9 +1849,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->AskFor(inv, doubleRequestDelay); } } - - // Track requests for our stuff - GetMainSignals().Inventory(inv.hash); } if (!vToFetch.empty()) diff --git a/src/net_processing.h b/src/net_processing.h index 7d8df75fe7..4a6ceb76da 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -18,6 +18,15 @@ static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60; /** Default number of orphan+recently-replaced txn to keep around for block reconstruction */ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100; +/** The maximum rate of address records we're willing to process on average. + * Is bypassed for whitelisted connections. */ +static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1}; + +/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND + * based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR + * is exempt from this limit. 
*/ +static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; + /** Register with a network node to receive its signals */ void RegisterNodeSignals(CNodeSignals& nodeSignals); /** Unregister a network node */ diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index cfbe14addb..d9505c8724 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -103,7 +103,9 @@ UniValue getpeerinfo(const JSONRPCRequest& request) " n, (numeric) The heights of blocks we're currently asking from this peer\n" " ...\n" " ],\n" - " \"whitelisted\": true|false, (boolean) Whether the peer is whitelisted\n" + " \"addr_processed\": n, (numeric) The total number of addresses processed, excluding those dropped due to rate limiting\n" + " \"addr_rate_limited\": n, (numeric) The total number of addresses dropped due to rate limiting\n" + " \"whitelisted\": true|false, (boolean) Whether the peer is whitelisted\n" " \"bytessent_per_msg\": {\n" " \"addr\": n, (numeric) The total bytes sent aggregated by message type\n" " ...\n" @@ -171,7 +173,9 @@ UniValue getpeerinfo(const JSONRPCRequest& request) } obj.push_back(Pair("inflight", heights)); } - obj.push_back(Pair("whitelisted", stats.fWhitelisted)); + obj.pushKV("addr_processed", stats.nProcessedAddrs); + obj.pushKV("addr_rate_limited", stats.nRatelimitedAddrs); + obj.pushKV("whitelisted", stats.fWhitelisted); UniValue sendPerMsgCmd(UniValue::VOBJ); BOOST_FOREACH(const mapMsgCmdSize::value_type &i, stats.mapSendBytesPerMsgCmd) { diff --git a/src/utiltime.cpp b/src/utiltime.cpp index e425e63f59..f88c42f079 100644 --- a/src/utiltime.cpp +++ b/src/utiltime.cpp @@ -24,6 +24,12 @@ int64_t GetTime() return now; } +int64_t GetMockableTimeMicros() +{ + if (nMockTime) return nMockTime * 1000000; + return GetTimeMicros(); +} + void SetMockTime(int64_t nMockTimeIn) { nMockTime = nMockTimeIn; diff --git a/src/utiltime.h b/src/utiltime.h index e37ee32813..3004a13715 100644 --- a/src/utiltime.h +++ b/src/utiltime.h @@ -24,6 +24,7 @@ int64_t 
GetTimeMillis(); int64_t GetTimeMicros(); int64_t GetSystemTimeInSeconds(); // Like GetTime(), but not mockable int64_t GetLogTimeMicros(); +int64_t GetMockableTimeMicros(); void SetMockTime(int64_t nMockTimeIn); void MilliSleep(int64_t n);