Skip to content

Commit

Permalink
Merge pull request #4846 from onflow/yahya/6871-resource-management-t…
Browse files Browse the repository at this point in the history
…ests

[Networking] Tuning libp2p resource manager limits; supporting by tests
  • Loading branch information
yhassanzadeh13 authored Oct 24, 2023
2 parents 9c4b85c + 4f4edbc commit 692969b
Show file tree
Hide file tree
Showing 11 changed files with 848 additions and 130 deletions.
23 changes: 18 additions & 5 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ network-config:
networking-connection-pruning: true
# Preferred unicasts protocols list of unicast protocols in preferred order
preferred-unicast-protocols: [ ]
received-message-cache-size: 10e4
received-message-cache-size: 10_000
peerupdate-interval: 10m
unicast-message-timeout: 5s
# Unicast create stream retry delay is initial delay used in the exponential backoff for create stream retries
Expand Down Expand Up @@ -47,7 +47,7 @@ network-config:
unicast-dial-in-progress-backoff-delay: 1s
# The size of the dial config cache used to keep track of the dial config for each remote peer. The dial config is used to keep track of the dial retry budget for each remote peer.
# Recommended to set it to the maximum number of remote peers in the network.
unicast-dial-config-cache-size: 10000
unicast-dial-config-cache-size: 10_000
# Resource manager config
# Maximum allowed fraction of file descriptors to be allocated by the libp2p resources in (0,1]
libp2p-memory-limit-ratio: 0.5 # flow default
Expand All @@ -58,6 +58,19 @@ network-config:
# Without this limit peers can end up in a state where there exists n number of connections per peer which
# can lead to resource exhaustion of the libp2p node.
libp2p-peer-base-limits-conns-inbound: 1
# maximum number of inbound system-wide streams, across all peers and protocols
# Note that streams are ephemeral and are created and destroyed intermittently.
libp2p-inbound-stream-limit-system: 15_000
# maximum number of inbound transient streams, across all streams that are not yet fully opened and associated with a protocol.
# Note that streams are ephemeral and are created and destroyed intermittently.
libp2p-inbound-stream-limit-transient: 15_000
# maximum number of inbound streams for each protocol across all peers; this is a per-protocol limit. We expect at least
# three protocols per node; gossipsub, unicast, and dht. Note that streams are ephemeral and are created and destroyed intermittently.
libp2p-inbound-stream-limit-protocol: 5000
# maximum number of inbound streams from each peer across all protocols.
libp2p-inbound-stream-limit-peer: 1000
# maximum number of inbound streams from each peer for each protocol.
libp2p-inbound-stream-limit-protocol-peer: 500
# Connection manager config
# HighWatermark and LowWatermark govern the number of connections are maintained by the ConnManager.
# When the peer count exceeds the HighWatermark, as many peers will be pruned (and
Expand Down Expand Up @@ -98,7 +111,7 @@ network-config:

# Gossipsub rpc inspectors configs
# The size of the queue for notifications about invalid RPC messages
gossipsub-rpc-inspector-notification-cache-size: 10000
gossipsub-rpc-inspector-notification-cache-size: 10_000
# RPC control message validation inspector configs
# Rpc validation inspector number of pool workers
gossipsub-rpc-validation-inspector-workers: 5
Expand Down Expand Up @@ -141,8 +154,8 @@ network-config:
# The size of the queue used by worker pool for the control message metrics inspector
gossipsub-rpc-metrics-inspector-cache-size: 100
# Application layer spam prevention
alsp-spam-record-cache-size: 10e3
alsp-spam-report-queue-size: 10e4
alsp-spam-record-cache-size: 1000
alsp-spam-report-queue-size: 10_000
alsp-disable-penalty: false
alsp-heart-beat-interval: 1s

Expand Down
99 changes: 74 additions & 25 deletions network/internal/p2putils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"fmt"
"net"

"github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"

Expand Down Expand Up @@ -69,46 +69,95 @@ func ConnectednessToString(connectedness network.Connectedness) (string, bool) {

}

// FindOutboundStream finds an existing outbound stream to the target id if it exists by querying libp2p
func FindOutboundStream(host host.Host, targetID peer.ID, protocol core.ProtocolID) (network.Stream, bool) {
streams := FilterStream(host, targetID, protocol, network.DirOutbound, false)
if len(streams) > 0 {
return streams[0], true
// CountStream finds total number of outbound stream to the target id
func CountStream(host host.Host, targetID peer.ID, opts ...FilterOption) int {
streams := FilterStream(host, targetID, append(opts, All())...)
return len(streams)
}

// FilterOptions holds the filtering options used in FilterStream.
type FilterOptions struct {
// dir specifies the direction of the streams to be filtered.
// The default value is network.DirBoth, which considers both inbound and outbound streams.
dir network.Direction

// protocol specifies the protocol ID of the streams to be filtered.
// The default value is an empty string, which considers streams of all protocol IDs.
protocol protocol.ID

// all specifies whether to return all matching streams or just the first matching stream.
// The default value is false, which returns just the first matching stream.
all bool
}

// FilterOption defines a function type that modifies FilterOptions.
type FilterOption func(*FilterOptions)

// Direction is a FilterOption for setting the direction of the streams to be filtered.
func Direction(dir network.Direction) FilterOption {
return func(opts *FilterOptions) {
opts.dir = dir
}
return nil, false
}

// CountStream finds total number of outbound stream to the target id
func CountStream(host host.Host, targetID peer.ID, protocol core.ProtocolID, dir network.Direction) int {
streams := FilterStream(host, targetID, protocol, dir, true)
return len(streams)
// Protocol is a FilterOption for setting the protocol ID of the streams to be filtered.
func Protocol(protocol protocol.ID) FilterOption {
return func(opts *FilterOptions) {
opts.protocol = protocol
}
}

// FilterStream finds one or all existing outbound streams to the target id if it exists.
// if parameter all is true - all streams are found else the first stream found is returned
func FilterStream(host host.Host, targetID peer.ID, protocol core.ProtocolID, dir network.Direction, all bool) []network.Stream {
// All is a FilterOption for setting whether to return all matching streams or just the first matching stream.
func All() FilterOption {
return func(opts *FilterOptions) {
opts.all = true
}
}

// FilterStream filters the streams to a target peer based on the provided options.
// The default behavior is to consider all directions and protocols, and return just the first matching stream.
// This behavior can be customized by providing FilterOption values.
//
// Usage:
//
// - To find all outbound streams to a target peer with a specific protocol ID:
// streams := FilterStream(host, targetID, Direction(network.DirOutbound), Protocol(myProtocolID), All(true))
//
// - To find the first inbound stream to a target peer, regardless of protocol ID:
// stream := FilterStream(host, targetID, Direction(network.DirInbound))
//
// host is the host from which to filter streams.
// targetID is the ID of the target peer.
// options is a variadic parameter that allows zero or more FilterOption values to be provided.
//
// It returns a slice of network.Stream values that match the filtering criteria.
func FilterStream(host host.Host, targetID peer.ID, options ...FilterOption) []network.Stream {
var filteredStreams []network.Stream
const allProtocols = "*"
// default values
opts := FilterOptions{
dir: network.DirUnknown, // by default, consider both inbound and outbound streams
protocol: allProtocols, // by default, consider streams of all protocol IDs
all: false, // by default, return just the first matching stream
}

// apply provided options
for _, option := range options {
option(&opts)
}

// choose the connection only if it is connected
if host.Network().Connectedness(targetID) != network.Connected {
return filteredStreams
}

// get all connections
conns := host.Network().ConnsToPeer(targetID)

// find a connection which is in the connected state
for _, conn := range conns {

// get all streams
streams := conn.GetStreams()
for _, stream := range streams {

// choose a stream which is marked as outbound and is for the flow protocol
if stream.Stat().Direction == dir && stream.Protocol() == protocol {
if (opts.dir == network.DirUnknown || stream.Stat().Direction == opts.dir) &&
(opts.protocol == allProtocols || stream.Protocol() == opts.protocol) {
filteredStreams = append(filteredStreams, stream)
if !all {
if !opts.all {
return filteredStreams
}
}
Expand Down Expand Up @@ -159,7 +208,7 @@ func IPPortFromMultiAddress(addrs ...multiaddr.Multiaddr) (string, string, error
return "", "", err
}

//there should only be one valid IPv4 address
// there should only be one valid IPv4 address
return ipOrHostname, port, nil
}
return "", "", fmt.Errorf("ip address or hostname not found")
Expand Down
Loading

0 comments on commit 692969b

Please sign in to comment.