diff --git a/config/default-config.yml b/config/default-config.yml index a215519cfc2..fea9e3fd3f8 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -6,7 +6,7 @@ network-config: networking-connection-pruning: true # Preferred unicasts protocols list of unicast protocols in preferred order preferred-unicast-protocols: [ ] - received-message-cache-size: 10e4 + received-message-cache-size: 10_000 peerupdate-interval: 10m unicast-message-timeout: 5s # Unicast create stream retry delay is initial delay used in the exponential backoff for create stream retries @@ -47,7 +47,7 @@ network-config: unicast-dial-in-progress-backoff-delay: 1s # The size of the dial config cache used to keep track of the dial config for each remote peer. The dial config is used to keep track of the dial retry budget for each remote peer. # Recommended to set it to the maximum number of remote peers in the network. - unicast-dial-config-cache-size: 10000 + unicast-dial-config-cache-size: 10_000 # Resource manager config # Maximum allowed fraction of file descriptors to be allocated by the libp2p resources in (0,1] libp2p-memory-limit-ratio: 0.5 # flow default @@ -58,6 +58,19 @@ network-config: # Without this limit peers can end up in a state where there exists n number of connections per peer which # can lead to resource exhaustion of the libp2p node. libp2p-peer-base-limits-conns-inbound: 1 + # maximum number of inbound system-wide streams, across all peers and protocols + # Note that streams are ephemeral and are created and destroyed intermittently. + libp2p-inbound-stream-limit-system: 15_000 + # maximum number of inbound transient streams, across all streams that are not yet fully opened and associated with a protocol. + # Note that streams are ephemeral and are created and destroyed intermittently. + libp2p-inbound-stream-limit-transient: 15_000 + # maximum number of inbound streams for each protocol across all peers; this is a per-protocol limit. We expect at least + # three protocols per node; gossipsub, unicast, and dht. Note that streams are ephemeral and are created and destroyed intermittently. + libp2p-inbound-stream-limit-protocol: 5000 + # maximum number of inbound streams from each peer across all protocols. + libp2p-inbound-stream-limit-peer: 1000 + # maximum number of inbound streams from each peer for each protocol. + libp2p-inbound-stream-limit-protocol-peer: 500 # Connection manager config # HighWatermark and LowWatermark govern the number of connections are maintained by the ConnManager. # When the peer count exceeds the HighWatermark, as many peers will be pruned (and @@ -98,7 +111,7 @@ network-config: # Gossipsub rpc inspectors configs # The size of the queue for notifications about invalid RPC messages - gossipsub-rpc-inspector-notification-cache-size: 10000 + gossipsub-rpc-inspector-notification-cache-size: 10_000 # RPC control message validation inspector configs # Rpc validation inspector number of pool workers gossipsub-rpc-validation-inspector-workers: 5 @@ -141,8 +154,8 @@ network-config: # The size of the queue used by worker pool for the control message metrics inspector gossipsub-rpc-metrics-inspector-cache-size: 100 # Application layer spam prevention - alsp-spam-record-cache-size: 10e3 - alsp-spam-report-queue-size: 10e4 + alsp-spam-record-cache-size: 1000 + alsp-spam-report-queue-size: 10_000 alsp-disable-penalty: false alsp-heart-beat-interval: 1s diff --git a/network/internal/p2putils/utils.go b/network/internal/p2putils/utils.go index 2325df8734a..0ec8b8aba11 100644 --- a/network/internal/p2putils/utils.go +++ b/network/internal/p2putils/utils.go @@ -4,11 +4,11 @@ import ( "fmt" "net" - "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" @@ -69,46 +69,95 @@ func ConnectednessToString(connectedness network.Connectedness) (string, bool) { } -// FindOutboundStream finds an existing outbound stream to the target id if it exists by querying libp2p -func FindOutboundStream(host host.Host, targetID peer.ID, protocol core.ProtocolID) (network.Stream, bool) { - streams := FilterStream(host, targetID, protocol, network.DirOutbound, false) - if len(streams) > 0 { - return streams[0], true +// CountStream finds total number of outbound stream to the target id +func CountStream(host host.Host, targetID peer.ID, opts ...FilterOption) int { + streams := FilterStream(host, targetID, append(opts, All())...) + return len(streams) +} + +// FilterOptions holds the filtering options used in FilterStream. +type FilterOptions struct { + // dir specifies the direction of the streams to be filtered. + // The default value is network.DirBoth, which considers both inbound and outbound streams. + dir network.Direction + + // protocol specifies the protocol ID of the streams to be filtered. + // The default value is an empty string, which considers streams of all protocol IDs. + protocol protocol.ID + + // all specifies whether to return all matching streams or just the first matching stream. + // The default value is false, which returns just the first matching stream. + all bool +} + +// FilterOption defines a function type that modifies FilterOptions. +type FilterOption func(*FilterOptions) + +// Direction is a FilterOption for setting the direction of the streams to be filtered. +func Direction(dir network.Direction) FilterOption { + return func(opts *FilterOptions) { + opts.dir = dir } - return nil, false } -// CountStream finds total number of outbound stream to the target id -func CountStream(host host.Host, targetID peer.ID, protocol core.ProtocolID, dir network.Direction) int { - streams := FilterStream(host, targetID, protocol, dir, true) - return len(streams) +// Protocol is a FilterOption for setting the protocol ID of the streams to be filtered. +func Protocol(protocol protocol.ID) FilterOption { + return func(opts *FilterOptions) { + opts.protocol = protocol + } } -// FilterStream finds one or all existing outbound streams to the target id if it exists. -// if parameter all is true - all streams are found else the first stream found is returned -func FilterStream(host host.Host, targetID peer.ID, protocol core.ProtocolID, dir network.Direction, all bool) []network.Stream { +// All is a FilterOption for setting whether to return all matching streams or just the first matching stream. +func All() FilterOption { + return func(opts *FilterOptions) { + opts.all = true + } +} +// FilterStream filters the streams to a target peer based on the provided options. +// The default behavior is to consider all directions and protocols, and return just the first matching stream. +// This behavior can be customized by providing FilterOption values. +// +// Usage: +// +// - To find all outbound streams to a target peer with a specific protocol ID: +// streams := FilterStream(host, targetID, Direction(network.DirOutbound), Protocol(myProtocolID), All(true)) +// +// - To find the first inbound stream to a target peer, regardless of protocol ID: +// stream := FilterStream(host, targetID, Direction(network.DirInbound)) +// +// host is the host from which to filter streams. +// targetID is the ID of the target peer. +// options is a variadic parameter that allows zero or more FilterOption values to be provided. +// +// It returns a slice of network.Stream values that match the filtering criteria. +func FilterStream(host host.Host, targetID peer.ID, options ...FilterOption) []network.Stream { var filteredStreams []network.Stream + const allProtocols = "*" + // default values + opts := FilterOptions{ + dir: network.DirUnknown, // by default, consider both inbound and outbound streams + protocol: allProtocols, // by default, consider streams of all protocol IDs + all: false, // by default, return just the first matching stream + } + + // apply provided options + for _, option := range options { + option(&opts) + } - // choose the connection only if it is connected if host.Network().Connectedness(targetID) != network.Connected { return filteredStreams } - // get all connections conns := host.Network().ConnsToPeer(targetID) - - // find a connection which is in the connected state for _, conn := range conns { - - // get all streams streams := conn.GetStreams() for _, stream := range streams { - - // choose a stream which is marked as outbound and is for the flow protocol - if stream.Stat().Direction == dir && stream.Protocol() == protocol { + if (opts.dir == network.DirUnknown || stream.Stat().Direction == opts.dir) && + (opts.protocol == allProtocols || stream.Protocol() == opts.protocol) { filteredStreams = append(filteredStreams, stream) - if !all { + if !opts.all { return filteredStreams } } @@ -159,7 +208,7 @@ func IPPortFromMultiAddress(addrs ...multiaddr.Multiaddr) (string, string, error return "", "", err } - //there should only be one valid IPv4 address + // there should only be one valid IPv4 address return ipOrHostname, port, nil } return "", "", fmt.Errorf("ip address or hostname not found") diff --git a/network/netconf/flags.go b/network/netconf/flags.go index 8069a5e607d..cb0722b5567 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -33,9 +33,15 @@ const ( bandwidthRateLimit = "unicast-bandwidth-rate-limit" bandwidthBurstLimit = "unicast-bandwidth-burst-limit" // resource manager config - memoryLimitRatio = "libp2p-memory-limit-ratio" - fileDescriptorsRatio = "libp2p-file-descriptors-ratio" - peerBaseLimitConnsInbound = "libp2p-peer-base-limits-conns-inbound" + memoryLimitRatio = "libp2p-memory-limit-ratio" + fileDescriptorsRatio = "libp2p-file-descriptors-ratio" + peerBaseLimitConnsInbound = "libp2p-peer-base-limits-conns-inbound" + inboundStreamLimitSystem = "libp2p-inbound-stream-limit-system" + inboundStreamLimitPeer = "libp2p-inbound-stream-limit-peer" + inboundStreamLimitProtocol = "libp2p-inbound-stream-limit-protocol" + inboundStreamLimitProtocolPeer = "libp2p-inbound-stream-limit-protocol-peer" + inboundStreamLimitTransient = "libp2p-inbound-stream-limit-transient" + // connection manager highWatermark = "libp2p-high-watermark" lowWatermark = "libp2p-low-watermark" @@ -105,6 +111,11 @@ func AllFlagNames() []string { memoryLimitRatio, fileDescriptorsRatio, peerBaseLimitConnsInbound, + inboundStreamLimitSystem, + inboundStreamLimitPeer, + inboundStreamLimitProtocol, + inboundStreamLimitProtocolPeer, + inboundStreamLimitTransient, highWatermark, lowWatermark, gracePeriod, @@ -155,28 +166,52 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Duration(peerUpdateInterval, config.PeerUpdateInterval, "how often to refresh the peer connections for the node") flags.Duration(unicastMessageTimeout, config.UnicastMessageTimeout, "how long a unicast transmission can take to complete") // unicast manager options - flags.Duration(unicastCreateStreamRetryDelay, config.UnicastConfig.CreateStreamBackoffDelay, "initial backoff delay between failing to establish a connection with another node and retrying, "+ - "this delay increases exponentially with the number of subsequent failures to establish a connection.") - flags.Duration(unicastDialBackoffDelay, config.UnicastConfig.DialInProgressBackoffDelay, "initial backoff delay between failing to establish a connection with another node and retrying, "+ - "this delay increases exponentially with the number of subsequent failures to establish a connection.") - flags.Duration(unicastDialInProgressBackoffDelay, config.UnicastConfig.DialInProgressBackoffDelay, "initial backoff delay for concurrent stream creations to a remote peer when there is no exising connection and a dial is in progress. "+ - "this delay increases exponentially with the number of subsequent failure attempts") - flags.Uint64(unicastStreamZeroRetryResetThreshold, config.UnicastConfig.StreamZeroRetryResetThreshold, "reset stream creation retry budget from zero to the maximum after consecutive successful streams reach this threshold.") - flags.Duration(unicastDialZeroRetryResetThreshold, config.UnicastConfig.DialZeroRetryResetThreshold, "reset dial retry budget if the last successful dial is longer than this threshold.") + flags.Duration(unicastCreateStreamRetryDelay, + config.UnicastConfig.CreateStreamBackoffDelay, + "initial backoff delay between failing to establish a connection with another node and retrying, "+ + "this delay increases exponentially with the number of subsequent failures to establish a connection.") + flags.Duration(unicastDialBackoffDelay, + config.UnicastConfig.DialInProgressBackoffDelay, + "initial backoff delay between failing to establish a connection with another node and retrying, "+ + "this delay increases exponentially with the number of subsequent failures to establish a connection.") + flags.Duration(unicastDialInProgressBackoffDelay, + config.UnicastConfig.DialInProgressBackoffDelay, + "initial backoff delay for concurrent stream creations to a remote peer when there is no exising connection and a dial is in progress. "+ + "this delay increases exponentially with the number of subsequent failure attempts") + flags.Uint64(unicastStreamZeroRetryResetThreshold, + config.UnicastConfig.StreamZeroRetryResetThreshold, + "reset stream creation retry budget from zero to the maximum after consecutive successful streams reach this threshold.") + flags.Duration(unicastDialZeroRetryResetThreshold, + config.UnicastConfig.DialZeroRetryResetThreshold, + "reset dial retry budget if the last successful dial is longer than this threshold.") flags.Uint64(unicastMaxDialRetryAttemptTimes, config.UnicastConfig.MaxDialRetryAttemptTimes, "maximum attempts to establish a unicast connection.") flags.Uint64(unicastMaxStreamCreationRetryAttemptTimes, config.UnicastConfig.MaxStreamCreationRetryAttemptTimes, "max attempts to create a unicast stream.") - flags.Uint32(unicastDialConfigCacheSize, config.UnicastConfig.DialConfigCacheSize, "cache size of the dial config cache, recommended to be big enough to accommodate the entire nodes in the network.") + flags.Uint32(unicastDialConfigCacheSize, + config.UnicastConfig.DialConfigCacheSize, + "cache size of the dial config cache, recommended to be big enough to accommodate the entire nodes in the network.") // unicast stream handler rate limits flags.Int(messageRateLimit, config.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit, "maximum number of unicast messages that a peer can send per second") - flags.Int(bandwidthRateLimit, config.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit, "bandwidth size in bytes a peer is allowed to send via unicast streams per second") + flags.Int(bandwidthRateLimit, + config.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit, + "bandwidth size in bytes a peer is allowed to send via unicast streams per second") flags.Int(bandwidthBurstLimit, config.UnicastConfig.UnicastRateLimitersConfig.BandwidthBurstLimit, "bandwidth size in bytes a peer is allowed to send at one time") - flags.Duration(lockoutDuration, config.UnicastConfig.UnicastRateLimitersConfig.LockoutDuration, "the number of seconds a peer will be forced to wait before being allowed to successful reconnect to the node after being rate limited") + flags.Duration(lockoutDuration, + config.UnicastConfig.UnicastRateLimitersConfig.LockoutDuration, + "the number of seconds a peer will be forced to wait before being allowed to successful reconnect to the node after being rate limited") flags.Bool(dryRun, config.UnicastConfig.UnicastRateLimitersConfig.DryRun, "disable peer disconnects and connections gating when rate limiting peers") // resource manager cli flags flags.Float64(fileDescriptorsRatio, config.ResourceManagerConfig.FileDescriptorsRatio, "ratio of available file descriptors to be used by libp2p (in (0,1])") flags.Float64(memoryLimitRatio, config.ResourceManagerConfig.MemoryLimitRatio, "ratio of available memory to be used by libp2p (in (0,1])") flags.Int(peerBaseLimitConnsInbound, config.ResourceManagerConfig.PeerBaseLimitConnsInbound, "the maximum amount of allowed inbound connections per peer") + flags.Int(inboundStreamLimitSystem, config.ResourceManagerConfig.InboundStream.System, "the system-wide limit on the number of inbound streams") + flags.Int(inboundStreamLimitPeer, config.ResourceManagerConfig.InboundStream.Peer, "the limit on the number of inbound streams per peer (over all protocols)") + flags.Int(inboundStreamLimitProtocol, config.ResourceManagerConfig.InboundStream.Protocol, "the limit on the number of inbound streams per protocol (over all peers)") + flags.Int(inboundStreamLimitProtocolPeer, config.ResourceManagerConfig.InboundStream.ProtocolPeer, "the limit on the number of inbound streams per protocol per peer") + flags.Int(inboundStreamLimitTransient, + config.ResourceManagerConfig.InboundStream.Transient, + "the transient limit on the number of inbound streams (applied to streams that are not associated with a peer or protocol yet)") + // connection manager flags.Int(lowWatermark, config.ConnectionManagerConfig.LowWatermark, "low watermarking for libp2p connection manager") flags.Int(highWatermark, config.ConnectionManagerConfig.HighWatermark, "high watermarking for libp2p connection manager") @@ -189,16 +224,32 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.") flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.") // gossipsub RPC control message validation limits used for validation configuration and rate limiting - flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers") - flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.") - flags.Uint32(validationInspectorClusterPrefixedTopicsReceivedCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.ClusterPrefixedControlMsgsReceivedCacheSize, "cache size for gossipsub RPC validation inspector cluster prefix received tracker.") - flags.Float64(validationInspectorClusterPrefixedTopicsReceivedCacheDecay, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.ClusterPrefixedControlMsgsReceivedCacheDecay, "the decay value used to decay cluster prefix received topics received cached counters.") - flags.Float64(validationInspectorClusterPrefixHardThreshold, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.ClusterPrefixHardThreshold, "the maximum number of cluster-prefixed control messages allowed to be processed when the active cluster id is unset or a mismatch is detected, exceeding this threshold will result in node penalization by gossipsub.") + flags.Int(validationInspectorNumberOfWorkers, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, + "number of gossupsub RPC control message validation inspector component workers") + flags.Uint32(validationInspectorInspectMessageQueueCacheSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, + "cache size for gossipsub RPC validation inspector events worker pool queue.") + flags.Uint32(validationInspectorClusterPrefixedTopicsReceivedCacheSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.ClusterPrefixedControlMsgsReceivedCacheSize, + "cache size for gossipsub RPC validation inspector cluster prefix received tracker.") + flags.Float64(validationInspectorClusterPrefixedTopicsReceivedCacheDecay, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.ClusterPrefixedControlMsgsReceivedCacheDecay, + "the decay value used to decay cluster prefix received topics received cached counters.") + flags.Float64(validationInspectorClusterPrefixHardThreshold, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.ClusterPrefixHardThreshold, + "the maximum number of cluster-prefixed control messages allowed to be processed when the active cluster id is unset or a mismatch is detected, exceeding this threshold will result in node penalization by gossipsub.") // gossipsub RPC control message metrics observer inspector configuration - flags.Int(metricsInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCMetricsInspectorConfigs.NumberOfWorkers, "cache size for gossipsub RPC metrics inspector events worker pool queue.") - flags.Uint32(metricsInspectorCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCMetricsInspectorConfigs.CacheSize, "cache size for gossipsub RPC metrics inspector events worker pool.") + flags.Int(metricsInspectorNumberOfWorkers, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCMetricsInspectorConfigs.NumberOfWorkers, + "cache size for gossipsub RPC metrics inspector events worker pool queue.") + flags.Uint32(metricsInspectorCacheSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCMetricsInspectorConfigs.CacheSize, + "cache size for gossipsub RPC metrics inspector events worker pool.") // networking event notifications - flags.Uint32(gossipSubRPCInspectorNotificationCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCInspectorNotificationCacheSize, "cache size for notification events from gossipsub rpc inspector") + flags.Uint32(gossipSubRPCInspectorNotificationCacheSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCInspectorNotificationCacheSize, + "cache size for notification events from gossipsub rpc inspector") // application layer spam prevention (alsp) protocol flags.Bool(alspDisabled, config.AlspConfig.DisablePenalty, "disable the penalty mechanism of the alsp protocol. default value (recommended) is false") flags.Uint32(alspSpamRecordCacheSize, config.AlspConfig.SpamRecordCacheSize, "size of spam record cache, recommended to be 10x the number of authorized nodes") @@ -208,14 +259,30 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Float32(alspSyncEngineRangeRequestBaseProb, config.AlspConfig.SyncEngine.RangeRequestBaseProb, "base probability of creating a misbehavior report for a range request message") flags.Float32(alspSyncEngineSyncRequestProb, config.AlspConfig.SyncEngine.SyncRequestProb, "probability of creating a misbehavior report for a sync request message") - flags.Int(ihaveMaxSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveRPCInspectionConfig.MaxSampleSize, "max number of ihaves to sample when performing validation") - flags.Int(ihaveMaxMessageIDSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveRPCInspectionConfig.MaxMessageIDSampleSize, "max number of message ids to sample when performing validation per ihave") - flags.Int(controlMessageMaxSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.GraftPruneMessageMaxSampleSize, "max number of control messages to sample when performing validation on GRAFT and PRUNE message types") - flags.Uint(iwantMaxSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize, "max number of iwants to sample when performing validation") - flags.Int(iwantMaxMessageIDSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxMessageIDSampleSize, "max number of message ids to sample when performing validation per iwant") - flags.Float64(iwantCacheMissThreshold, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.CacheMissThreshold, "max number of iwants to sample when performing validation") - flags.Int(iwantCacheMissCheckSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.CacheMissCheckSize, "the iWants size at which message id cache misses will be checked") - flags.Float64(iwantDuplicateMsgIDThreshold, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.DuplicateMsgIDThreshold, "max allowed duplicate message IDs in a single iWant control message") + flags.Int(ihaveMaxSampleSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveRPCInspectionConfig.MaxSampleSize, + "max number of ihaves to sample when performing validation") + flags.Int(ihaveMaxMessageIDSampleSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveRPCInspectionConfig.MaxMessageIDSampleSize, + "max number of message ids to sample when performing validation per ihave") + flags.Int(controlMessageMaxSampleSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.GraftPruneMessageMaxSampleSize, + "max number of control messages to sample when performing validation on GRAFT and PRUNE message types") + flags.Uint(iwantMaxSampleSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize, + "max number of iwants to sample when performing validation") + flags.Int(iwantMaxMessageIDSampleSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxMessageIDSampleSize, + "max number of message ids to sample when performing validation per iwant") + flags.Float64(iwantCacheMissThreshold, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.CacheMissThreshold, + "max number of iwants to sample when performing validation") + flags.Int(iwantCacheMissCheckSize, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.CacheMissCheckSize, + "the iWants size at which message id cache misses will be checked") + flags.Float64(iwantDuplicateMsgIDThreshold, + config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.DuplicateMsgIDThreshold, + "max allowed duplicate message IDs in a single iWant control message") } // SetAliases this func sets an aliases for each CLI flag defined for network config overrides to it's corresponding diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index 4b9b5ae8dad..0340069a5de 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -222,9 +222,11 @@ func (builder *LibP2PNodeBuilder) Build() (p2p.LibP2PNode, error) { if err != nil { return nil, fmt.Errorf("could not get allowed file descriptors: %w", err) } - limits.PeerBaseLimit.ConnsInbound = builder.resourceManagerCfg.PeerBaseLimitConnsInbound - l := limits.Scale(mem, fd) - mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(l), rcmgr.WithMetrics(builder.metricsConfig.Metrics)) + + // scales the default limits by the allowed memory and file descriptors and applies the inbound connection and stream limits. + appliedLimits := ApplyInboundConnectionLimits(builder.logger, limits.Scale(mem, fd), builder.resourceManagerCfg.PeerBaseLimitConnsInbound) + appliedLimits = ApplyInboundStreamLimits(builder.logger, appliedLimits, builder.resourceManagerCfg.InboundStream) + mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(appliedLimits), rcmgr.WithMetrics(builder.metricsConfig.Metrics)) if err != nil { return nil, fmt.Errorf("could not create libp2p resource manager: %w", err) } @@ -233,7 +235,7 @@ func (builder *LibP2PNodeBuilder) Build() (p2p.LibP2PNode, error) { Int64("allowed_memory", mem). Int("allowed_file_descriptors", fd). Msg("allowed memory and file descriptors are fetched from the system") - newLimitConfigLogger(builder.logger).logResourceManagerLimits(l) + newLimitConfigLogger(builder.logger).LogResourceManagerLimits(appliedLimits) opts = append(opts, libp2p.ResourceManager(mgr)) builder.logger.Info().Msg("libp2p resource manager is set to default with metrics") diff --git a/network/p2p/p2pbuilder/libp2pscaler_test.go b/network/p2p/p2pbuilder/libp2pscaler_test.go index 789554866d0..dd5cfe48088 100644 --- a/network/p2p/p2pbuilder/libp2pscaler_test.go +++ b/network/p2p/p2pbuilder/libp2pscaler_test.go @@ -3,8 +3,13 @@ package p2pbuilder import ( "testing" + "github.com/libp2p/go-libp2p" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/pbnjay/memory" "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/config" + "github.com/onflow/flow-go/utils/unittest" ) func TestAllowedMemoryScale(t *testing.T) { @@ -102,3 +107,73 @@ func TestAllowedFileDescriptorsScale(t *testing.T) { require.NoError(t, err) require.Equal(t, fd/10000, s) } + +// TestApplyInboundStreamLimits tests that the inbound stream limits are applied correctly, i.e., the limits from the config file +// are applied to the concrete limit config when the concrete limit config is greater than the limits from the config file. +func TestApplyInboundStreamAndConnectionLimits(t *testing.T) { + cfg, err := config.DefaultConfig() + require.NoError(t, err) + + mem, err := allowedMemory(cfg.NetworkConfig.ResourceManagerConfig.MemoryLimitRatio) + require.NoError(t, err) + + fd, err := allowedFileDescriptors(cfg.NetworkConfig.FileDescriptorsRatio) + require.NoError(t, err) + limits := rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&limits) + scaled := limits.Scale(mem, fd) + + concrete := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + // intentionally sets to 1 to test that it is not overridden. + StreamsInbound: 1, + }, + Transient: rcmgr.ResourceLimits{ + // sets it higher than the default to test that it is overridden. + StreamsInbound: rcmgr.LimitVal(cfg.NetworkConfig.ResourceManagerConfig.InboundStream.Transient + 1), + }, + ProtocolDefault: rcmgr.ResourceLimits{ + // sets it higher than the default to test that it is overridden. + StreamsInbound: rcmgr.LimitVal(cfg.NetworkConfig.ResourceManagerConfig.InboundStream.Protocol + 1), + // intentionally sets it lower than the default to test that it is not overridden. + ConnsInbound: rcmgr.LimitVal(cfg.NetworkConfig.ResourceManagerConfig.PeerBaseLimitConnsInbound - 1), + }, + ProtocolPeerDefault: rcmgr.ResourceLimits{ + StreamsInbound: 1, // intentionally sets to 1 to test that it is not overridden. + }, + PeerDefault: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.NetworkConfig.ResourceManagerConfig.InboundStream.Peer + 1), + }, + Conn: rcmgr.ResourceLimits{ + StreamsInbound: 1, // intentionally sets to 1 to test that it is not overridden. + }, + Stream: rcmgr.ResourceLimits{ + StreamsInbound: 1, // intentionally sets to 1 to test that it is not overridden. + }, + }.Build(scaled) + + // apply inbound stream limits from the config file. + applied := ApplyInboundStreamLimits(unittest.Logger(), concrete, cfg.NetworkConfig.ResourceManagerConfig.InboundStream) + + // then applies the peer base limit connections from the config file. + applied = ApplyInboundConnectionLimits(unittest.Logger(), applied, cfg.NetworkConfig.ResourceManagerConfig.PeerBaseLimitConnsInbound) + + // check that the applied limits are overridden. + // transient limit should be overridden. + require.Equal(t, int(cfg.NetworkConfig.ResourceManagerConfig.InboundStream.Transient), int(applied.ToPartialLimitConfig().Transient.StreamsInbound)) + // protocol default limit should be overridden. + require.Equal(t, int(cfg.NetworkConfig.ResourceManagerConfig.InboundStream.Protocol), int(applied.ToPartialLimitConfig().ProtocolDefault.StreamsInbound)) + // peer default limit should be overridden. + require.Equal(t, int(cfg.NetworkConfig.ResourceManagerConfig.InboundStream.Peer), int(applied.ToPartialLimitConfig().PeerDefault.StreamsInbound)) + // protocol peer default limit should not be overridden. + require.Equal(t, int(1), int(applied.ToPartialLimitConfig().ProtocolPeerDefault.StreamsInbound)) + // conn limit should not be overridden. + require.Equal(t, int(1), int(applied.ToPartialLimitConfig().Conn.StreamsInbound)) + // stream limit should not be overridden. + require.Equal(t, int(1), int(applied.ToPartialLimitConfig().Stream.StreamsInbound)) + // system limit should not be overridden. + require.Equal(t, int(1), int(applied.ToPartialLimitConfig().System.StreamsInbound)) + + // check that the applied peer base limit connections are overridden. + require.Equal(t, int(cfg.NetworkConfig.ResourceManagerConfig.PeerBaseLimitConnsInbound), int(applied.ToPartialLimitConfig().PeerDefault.ConnsInbound)) +} diff --git a/network/p2p/p2pbuilder/utils.go b/network/p2p/p2pbuilder/utils.go index ef2a2bc1ae9..3f54c886fd5 100644 --- a/network/p2p/p2pbuilder/utils.go +++ b/network/p2p/p2pbuilder/utils.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/network/p2p" + "github.com/onflow/flow-go/network/p2p/p2pconf" "github.com/onflow/flow-go/network/p2p/p2plogging" ) @@ -51,7 +52,7 @@ func (l *limitConfigLogger) withBaseLimit(prefix string, baseLimit rcmgr.Resourc Str(fmt.Sprintf("%s_memory", prefix), fmt.Sprintf("%v", baseLimit.Memory)).Logger() } -func (l *limitConfigLogger) logResourceManagerLimits(config rcmgr.ConcreteLimitConfig) { +func (l *limitConfigLogger) LogResourceManagerLimits(config rcmgr.ConcreteLimitConfig) { // PartialLimit config is the same as ConcreteLimit config, but with the exported fields. pCfg := config.ToPartialLimitConfig() l.logGlobalResourceLimits(pCfg) @@ -123,3 +124,91 @@ func (l *limitConfigLogger) logPeerProtocolLimits(p map[protocol.ID]rcmgr.Resour lg.Info().Msg("protocol peer limits set") } } + +// ApplyInboundStreamLimits applies the inbound stream limits to the concrete limit config. The concrete limit config is assumed coming from scaling the +// base limit config by the scaling factor. The inbound stream limits are applied to the concrete limit config if the concrete limit config is greater than +// the inbound stream limits. +// The inbound limits are assumed coming from the config file. +// Args: +// +// logger: the logger to log the applied limits. +// concrete: the concrete limit config. +// limit: the inbound stream limits. +// +// Returns: +// +// a copy of the concrete limit config with the inbound stream limits applied and overridden. +func ApplyInboundStreamLimits(logger zerolog.Logger, concrete rcmgr.ConcreteLimitConfig, limit p2pconf.InboundStreamLimit) rcmgr.ConcreteLimitConfig { + c := concrete.ToPartialLimitConfig() + + partial := rcmgr.PartialLimitConfig{} + lg := logger.With().Logger() + + if int(c.System.StreamsInbound) > limit.System { + lg = lg.With().Int("concrete_system_inbound_streams", int(c.System.StreamsInbound)).Int("partial_system_inbound_streams", limit.System).Logger() + partial.System.StreamsInbound = rcmgr.LimitVal(limit.System) + } + + if int(c.Transient.StreamsInbound) > limit.Transient { + lg = lg.With().Int("concrete_transient_inbound_streams", int(c.Transient.StreamsInbound)).Int("partial_transient_inbound_streams", limit.Transient).Logger() + partial.Transient.StreamsInbound = rcmgr.LimitVal(limit.Transient) + } + + if int(c.ProtocolDefault.StreamsInbound) > limit.Protocol { + lg = lg.With().Int("concrete_protocol_default_inbound_streams", int(c.ProtocolDefault.StreamsInbound)).Int("partial_protocol_default_inbound_streams", + limit.Protocol).Logger() + partial.ProtocolDefault.StreamsInbound = rcmgr.LimitVal(limit.Protocol) + } + + if int(c.PeerDefault.StreamsInbound) > limit.Peer { + lg = lg.With().Int("concrete_peer_default_inbound_streams", int(c.PeerDefault.StreamsInbound)).Int("partial_peer_default_inbound_streams", limit.Peer).Logger() + partial.PeerDefault.StreamsInbound = rcmgr.LimitVal(limit.Peer) + } + + if int(c.ProtocolPeerDefault.StreamsInbound) > limit.ProtocolPeer { + lg = lg.With().Int("concrete_protocol_peer_default_inbound_streams", + int(c.ProtocolPeerDefault.StreamsInbound)).Int("partial_protocol_peer_default_inbound_streams", limit.ProtocolPeer).Logger() + partial.ProtocolPeerDefault.StreamsInbound = rcmgr.LimitVal(limit.ProtocolPeer) + } + + if int(c.Stream.StreamsInbound) > limit.Peer { + lg = lg.With().Int("concrete_stream_inbound_streams", int(c.Stream.StreamsInbound)).Int("partial_stream_inbound_streams", limit.Peer).Logger() + partial.ServiceDefault.StreamsInbound = rcmgr.LimitVal(limit.Peer) + } + + if int(c.Conn.StreamsInbound) > limit.Peer { + lg = lg.With().Int("concrete_conn_inbound_streams", int(c.Conn.StreamsInbound)).Int("partial_conn_inbound_streams", limit.Peer).Logger() + partial.ServicePeerDefault.StreamsInbound = rcmgr.LimitVal(limit.Peer) + } + + lg.Info().Msg("inbound stream limits applied") + return partial.Build(concrete) +} + +// ApplyInboundConnectionLimits applies the inbound connection limits to the concrete limit config. The concrete limit config is assumed coming from scaling the +// base limit config by the scaling factor. The inbound connection limits are applied to the concrete limit config if the concrete limit config is greater than +// the inbound connection limits. +// The inbound limits are assumed coming from the config file. +// Args: +// +// logger: the logger to log the applied limits. +// concrete: the concrete limit config. +// peerLimit: the inbound connection limit from each remote peer. +// +// Returns: +// +// a copy of the concrete limit config with the inbound connection limits applied and overridden. +func ApplyInboundConnectionLimits(logger zerolog.Logger, concrete rcmgr.ConcreteLimitConfig, peerLimit int) rcmgr.ConcreteLimitConfig { + c := concrete.ToPartialLimitConfig() + + partial := rcmgr.PartialLimitConfig{} + lg := logger.With().Logger() + + if int(c.PeerDefault.ConnsInbound) > peerLimit { + lg = lg.With().Int("concrete_peer_inbound_conns", int(c.PeerDefault.ConnsInbound)).Int("partial_peer_inbound_conns", peerLimit).Logger() + partial.PeerDefault.ConnsInbound = rcmgr.LimitVal(peerLimit) + } + + lg.Info().Msg("inbound connection limits applied") + return partial.Build(concrete) +} diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index 683dff67fdc..62cde38ff73 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -8,9 +8,29 @@ import ( // The resource manager is used to limit the number of open connections and streams (as well as any other resources // used by libp2p) for each peer. type ResourceManagerConfig struct { - MemoryLimitRatio float64 `mapstructure:"libp2p-memory-limit-ratio"` // maximum allowed fraction of memory to be allocated by the libp2p resources in (0,1] - FileDescriptorsRatio float64 `mapstructure:"libp2p-file-descriptors-ratio"` // maximum allowed fraction of file descriptors to be allocated by the libp2p resources in (0,1] - PeerBaseLimitConnsInbound int `mapstructure:"libp2p-peer-base-limits-conns-inbound"` // the maximum amount of allowed inbound connections per peer + InboundStream InboundStreamLimit `mapstructure:",squash"` + MemoryLimitRatio float64 `mapstructure:"libp2p-memory-limit-ratio"` // maximum allowed fraction of memory to be allocated by the libp2p resources in (0,1] + FileDescriptorsRatio float64 `mapstructure:"libp2p-file-descriptors-ratio"` // maximum allowed fraction of file descriptors to be allocated by the libp2p resources in (0,1] + PeerBaseLimitConnsInbound int `mapstructure:"libp2p-peer-base-limits-conns-inbound"` // the maximum amount of allowed inbound connections per peer +} + +// InboundStreamLimit is the configuration for the inbound stream limit. The inbound stream limit is used to limit the +// number of inbound streams that can be opened by the node. +type InboundStreamLimit struct { + // the system-wide limit on the number of inbound streams + System int `validate:"gt=0" mapstructure:"libp2p-inbound-stream-limit-system"` + + // Transient is the transient limit on the number of inbound streams (applied to streams that are not associated with a peer or protocol yet) + Transient int `validate:"gt=0" mapstructure:"libp2p-inbound-stream-limit-transient"` + + // Protocol is the limit on the number of inbound streams per protocol (over all peers). + Protocol int `validate:"gt=0" mapstructure:"libp2p-inbound-stream-limit-protocol"` + + // Peer is the limit on the number of inbound streams per peer (over all protocols). + Peer int `validate:"gt=0" mapstructure:"libp2p-inbound-stream-limit-peer"` + + // ProtocolPeer is the limit on the number of inbound streams per protocol per peer. + ProtocolPeer int `validate:"gt=0" mapstructure:"libp2p-inbound-stream-limit-protocol-peer"` } // GossipSubConfig is the configuration for the GossipSub pubsub implementation. diff --git a/network/p2p/p2pnode/libp2pNode_test.go b/network/p2p/p2pnode/libp2pNode_test.go index 3dd10b34eac..9ec66082a0e 100644 --- a/network/p2p/p2pnode/libp2pNode_test.go +++ b/network/p2p/p2pnode/libp2pNode_test.go @@ -22,7 +22,6 @@ import ( "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/p2plogging" p2ptest "github.com/onflow/flow-go/network/p2p/test" - "github.com/onflow/flow-go/network/p2p/unicast/protocols" "github.com/onflow/flow-go/network/p2p/utils" validator "github.com/onflow/flow-go/network/validator/pubsub" "github.com/onflow/flow-go/utils/unittest" @@ -276,60 +275,16 @@ func TestCreateStream_SinglePairwiseConnection(t *testing.T) { go createConcurrentStreams(t, ctxWithTimeout, nodes, ids, numOfStreamsPerNode, streamChan, done) unittest.RequireCloseBefore(t, done, 5*time.Second, "could not create streamChan on time") - require.Len(t, streamChan, expectedTotalNumOfStreams, fmt.Sprintf("expected %d total number of streamChan created got %d", expectedTotalNumOfStreams, len(streamChan))) + require.Len(t, + streamChan, + expectedTotalNumOfStreams, + fmt.Sprintf("expected %d total number of streamChan created got %d", expectedTotalNumOfStreams, len(streamChan))) // ensure only a single connection exists between all nodes ensureSinglePairwiseConnection(t, nodes) close(streamChan) } -// TestCreateStream_InboundConnResourceLimit ensures that the setting the resource limit config for -// PeerDefaultLimits.ConnsInbound restricts the number of inbound connections created from a peer to the configured value. -// NOTE: If this test becomes flaky, it indicates a violation of the single inbound connection guarantee. -// In such cases the test should not be quarantined but requires immediate resolution. -func TestCreateStream_InboundConnResourceLimit(t *testing.T) { - idProvider := mockmodule.NewIdentityProvider(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - - sporkID := unittest.IdentifierFixture() - - sender, id1 := p2ptest.NodeFixture(t, sporkID, t.Name(), idProvider, p2ptest.WithDefaultResourceManager(), p2ptest.WithCreateStreamRetryDelay(10*time.Millisecond)) - - receiver, id2 := p2ptest.NodeFixture(t, sporkID, t.Name(), idProvider, p2ptest.WithDefaultResourceManager(), p2ptest.WithCreateStreamRetryDelay(10*time.Millisecond)) - - idProvider.On("ByPeerID", sender.ID()).Return(&id1, true).Maybe() - idProvider.On("ByPeerID", receiver.ID()).Return(&id2, true).Maybe() - - p2ptest.StartNodes(t, signalerCtx, []p2p.LibP2PNode{sender, receiver}) - defer p2ptest.StopNodes(t, []p2p.LibP2PNode{sender, receiver}, cancel) - - p2ptest.LetNodesDiscoverEachOther(t, signalerCtx, []p2p.LibP2PNode{sender, receiver}, flow.IdentityList{&id1, &id2}) - - var allStreamsCreated sync.WaitGroup - // at this point both nodes have discovered each other and we can now create an - // arbitrary number of streams from sender -> receiver. This will force libp2p - // to create multiple streams concurrently and attempt to reuse the single pairwise - // connection. If more than one connection is established while creating the conccurent - // streams this indicates a bug in the libp2p PeerBaseLimitConnsInbound limit. - defaultProtocolID := protocols.FlowProtocolID(sporkID) - expectedNumOfStreams := int64(50) - for i := int64(0); i < expectedNumOfStreams; i++ { - allStreamsCreated.Add(1) - go func() { - defer allStreamsCreated.Done() - _, err := sender.Host().NewStream(ctx, receiver.ID(), defaultProtocolID) - require.NoError(t, err) - }() - } - - unittest.RequireReturnsBefore(t, allStreamsCreated.Wait, 2*time.Second, "could not create streams on time") - require.Len(t, receiver.Host().Network().ConnsToPeer(sender.ID()), 1) - actualNumOfStreams := p2putils.CountStream(sender.Host(), receiver.ID(), defaultProtocolID, network.DirOutbound) - require.Equal(t, expectedNumOfStreams, int64(actualNumOfStreams), fmt.Sprintf("expected to create %d number of streams got %d", expectedNumOfStreams, actualNumOfStreams)) -} - // createStreams will attempt to create n number of streams concurrently between each combination of node pairs. func createConcurrentStreams(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode, ids flow.IdentityList, n int, streams chan network.Stream, done chan struct{}) { defer close(done) diff --git a/network/p2p/p2pnode/libp2pStream_test.go b/network/p2p/p2pnode/libp2pStream_test.go index d4f43ed6d90..8131864b41d 100644 --- a/network/p2p/p2pnode/libp2pStream_test.go +++ b/network/p2p/p2pnode/libp2pStream_test.go @@ -153,7 +153,7 @@ func testCreateStream(t *testing.T, sporkId flow.Identifier, unicasts []protocol id2 := identities[1] // Assert that there is no outbound stream to the target yet - require.Equal(t, 0, p2putils.CountStream(nodes[0].Host(), nodes[1].ID(), protocolID, network.DirOutbound)) + require.Equal(t, 0, p2putils.CountStream(nodes[0].Host(), nodes[1].ID(), p2putils.Protocol(protocolID), p2putils.Direction(network.DirOutbound))) // Now attempt to create another 100 outbound stream to the same destination by calling CreateStream streamCount := 100 @@ -182,7 +182,7 @@ func testCreateStream(t *testing.T, sporkId flow.Identifier, unicasts []protocol } require.Eventually(t, func() bool { - return streamCount == p2putils.CountStream(nodes[0].Host(), nodes[1].ID(), protocolID, network.DirOutbound) + return streamCount == p2putils.CountStream(nodes[0].Host(), nodes[1].ID(), p2putils.Protocol(protocolID), p2putils.Direction(network.DirOutbound)) }, 5*time.Second, 100*time.Millisecond, "could not create streams on time") // checks that the number of connections is 1 despite the number of streams; i.e., all streams are created on the same connection @@ -227,8 +227,8 @@ func TestCreateStream_FallBack(t *testing.T) { // Assert that there is no outbound stream to the target yet (neither default nor preferred) defaultProtocolId := protocols.FlowProtocolID(sporkId) preferredProtocolId := protocols.FlowGzipProtocolId(sporkId) - require.Equal(t, 0, p2putils.CountStream(thisNode.Host(), otherNode.ID(), defaultProtocolId, network.DirOutbound)) - require.Equal(t, 0, p2putils.CountStream(thisNode.Host(), otherNode.ID(), preferredProtocolId, network.DirOutbound)) + require.Equal(t, 0, p2putils.CountStream(thisNode.Host(), otherNode.ID(), p2putils.Protocol(defaultProtocolId), p2putils.Direction(network.DirOutbound))) + require.Equal(t, 0, p2putils.CountStream(thisNode.Host(), otherNode.ID(), p2putils.Protocol(preferredProtocolId), p2putils.Direction(network.DirOutbound))) // Now attempt to create another 100 outbound stream to the same destination by calling CreateStream streamCount := 10 @@ -257,11 +257,11 @@ func TestCreateStream_FallBack(t *testing.T) { // wait for the stream to be created on the default protocol id. require.Eventually(t, func() bool { - return streamCount == p2putils.CountStream(nodes[0].Host(), nodes[1].ID(), defaultProtocolId, network.DirOutbound) + return streamCount == p2putils.CountStream(nodes[0].Host(), nodes[1].ID(), p2putils.Protocol(defaultProtocolId), p2putils.Direction(network.DirOutbound)) }, 5*time.Second, 100*time.Millisecond, "could not create streams on time") // no stream must be created on the preferred protocol id - require.Equal(t, 0, p2putils.CountStream(thisNode.Host(), otherNode.ID(), preferredProtocolId, network.DirOutbound)) + require.Equal(t, 0, p2putils.CountStream(thisNode.Host(), otherNode.ID(), p2putils.Protocol(preferredProtocolId), p2putils.Direction(network.DirOutbound))) // checks that the number of connections is 1 despite the number of streams; i.e., all streams are created on the same connection require.Len(t, nodes[0].Host().Network().Conns(), 1) diff --git a/network/p2p/p2pnode/resourceManager_test.go b/network/p2p/p2pnode/resourceManager_test.go new file mode 100644 index 00000000000..bcef3cb8f7f --- /dev/null +++ b/network/p2p/p2pnode/resourceManager_test.go @@ -0,0 +1,416 @@ +package p2pnode_test + +import ( + "context" + "fmt" + "math" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/network" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" + mockmodule "github.com/onflow/flow-go/module/mock" + "github.com/onflow/flow-go/network/internal/p2putils" + "github.com/onflow/flow-go/network/p2p" + p2ptest "github.com/onflow/flow-go/network/p2p/test" + "github.com/onflow/flow-go/network/p2p/unicast/protocols" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestCreateStream_InboundConnResourceLimit ensures that the setting the resource limit config for +// PeerDefaultLimits.ConnsInbound restricts the number of inbound connections created from a peer to the configured value. +// NOTE: If this test becomes flaky, it indicates a violation of the single inbound connection guarantee. +// In such cases the test should not be quarantined but requires immediate resolution. +func TestCreateStream_InboundConnResourceLimit(t *testing.T) { + idProvider := mockmodule.NewIdentityProvider(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + + sporkID := unittest.IdentifierFixture() + + sender, id1 := p2ptest.NodeFixture( + t, + sporkID, + t.Name(), + idProvider, + p2ptest.WithDefaultResourceManager(), + p2ptest.WithCreateStreamRetryDelay(10*time.Millisecond)) + + receiver, id2 := p2ptest.NodeFixture( + t, + sporkID, + t.Name(), + idProvider, + p2ptest.WithDefaultResourceManager(), + p2ptest.WithCreateStreamRetryDelay(10*time.Millisecond)) + + idProvider.On("ByPeerID", sender.ID()).Return(&id1, true).Maybe() + idProvider.On("ByPeerID", receiver.ID()).Return(&id2, true).Maybe() + + p2ptest.StartNodes(t, signalerCtx, []p2p.LibP2PNode{sender, receiver}) + defer p2ptest.StopNodes(t, []p2p.LibP2PNode{sender, receiver}, cancel) + + p2ptest.LetNodesDiscoverEachOther(t, signalerCtx, []p2p.LibP2PNode{sender, receiver}, flow.IdentityList{&id1, &id2}) + + var allStreamsCreated sync.WaitGroup + // at this point both nodes have discovered each other and we can now create an + // arbitrary number of streams from sender -> receiver. This will force libp2p + // to create multiple streams concurrently and attempt to reuse the single pairwise + // connection. If more than one connection is established while creating the conccurent + // streams this indicates a bug in the libp2p PeerBaseLimitConnsInbound limit. + defaultProtocolID := protocols.FlowProtocolID(sporkID) + expectedNumOfStreams := int64(50) + for i := int64(0); i < expectedNumOfStreams; i++ { + allStreamsCreated.Add(1) + go func() { + defer allStreamsCreated.Done() + require.NoError(t, sender.Host().Connect(ctx, receiver.Host().Peerstore().PeerInfo(receiver.ID()))) + _, err := sender.Host().NewStream(ctx, receiver.ID(), defaultProtocolID) + require.NoError(t, err) + }() + } + + unittest.RequireReturnsBefore(t, allStreamsCreated.Wait, 2*time.Second, "could not create streams on time") + require.Len(t, receiver.Host().Network().ConnsToPeer(sender.ID()), 1) + actualNumOfStreams := p2putils.CountStream(sender.Host(), receiver.ID(), p2putils.Protocol(defaultProtocolID), p2putils.Direction(network.DirOutbound)) + require.Equal(t, + expectedNumOfStreams, + int64(actualNumOfStreams), + fmt.Sprintf("expected to create %d number of streams got %d", expectedNumOfStreams, actualNumOfStreams)) +} + +type testPeerLimitConfig struct { + // nodeCount is the number of nodes in the test. + nodeCount int + + // maxInboundPeerStream is the maximum number of inbound streams from a single peer to the receiver. + maxInboundPeerStream int + + // maxInboundStreamProtocol is the maximum number of inbound streams at the receiver using a specific protocol; it accumulates all streams from all senders. + maxInboundStreamProtocol int + + // maxInboundStreamPeerProtocol is the maximum number of inbound streams at the receiver from a single peer using a specific protocol. + maxInboundStreamPeerProtocol int + + // maxInboundStreamTransient is the maximum number of inbound transient streams at the receiver; it accumulates all streams from all senders across all protocols. + // transient streams are those that are not associated fully with a peer and protocol. + maxInboundStreamTransient int + + // maxInboundStreamSystem is the maximum number of inbound streams at the receiver; it accumulates all streams from all senders across all protocols. + maxInboundStreamSystem int + + // unknownProtocol when set to true will cause senders to use an unknown protocol ID when creating streams. + unknownProtocol bool +} + +// maxLimit returns the maximum limit across all limits. +func (t testPeerLimitConfig) maxLimit() int { + max := 0 + if t.maxInboundPeerStream > max && t.maxInboundPeerStream != math.MaxInt { + max = t.maxInboundPeerStream + } + if t.maxInboundStreamProtocol > max && t.maxInboundStreamProtocol != math.MaxInt { + max = t.maxInboundStreamProtocol + } + if t.maxInboundStreamPeerProtocol > max && t.maxInboundStreamPeerProtocol != math.MaxInt { + max = t.maxInboundStreamPeerProtocol + } + if t.maxInboundStreamTransient > max && t.maxInboundStreamTransient != math.MaxInt { + max = t.maxInboundStreamTransient + } + if t.maxInboundStreamSystem > max && t.maxInboundStreamSystem != math.MaxInt { + max = t.maxInboundStreamSystem + } + return max +} + +// baseCreateStreamInboundStreamResourceLimitConfig returns a testPeerLimitConfig with default values. +func baseCreateStreamInboundStreamResourceLimitConfig() *testPeerLimitConfig { + return &testPeerLimitConfig{ + nodeCount: 5, + maxInboundPeerStream: 10, + maxInboundStreamProtocol: 10, + maxInboundStreamPeerProtocol: 10, + maxInboundStreamTransient: 10, + maxInboundStreamSystem: 10, + } +} + +func TestCreateStream_DefaultConfig(t *testing.T) { + testCreateStreamInboundStreamResourceLimits(t, baseCreateStreamInboundStreamResourceLimitConfig()) +} + +func TestCreateStream_MinPeerLimit(t *testing.T) { + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundPeerStream = 1 + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MaxPeerLimit(t *testing.T) { + + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundPeerStream = math.MaxInt + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MinProtocolLimit(t *testing.T) { + // max inbound protocol is not preserved; can be partially due to count stream not counting inbound streams on a protocol + unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamProtocol = 1 + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MaxProtocolLimit(t *testing.T) { + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamProtocol = math.MaxInt + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MinPeerProtocolLimit(t *testing.T) { + // max inbound stream peer protocol is not preserved; can be partially due to count stream not counting inbound streams on a protocol + unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamPeerProtocol = 1 + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MaxPeerProtocolLimit(t *testing.T) { + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamPeerProtocol = math.MaxInt + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MinTransientLimit(t *testing.T) { + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamTransient = 1 + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MaxTransientLimit(t *testing.T) { + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamTransient = math.MaxInt + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MinSystemLimit(t *testing.T) { + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamSystem = 1 + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_MaxSystemLimit(t *testing.T) { + // max inbound stream protocol is not preserved; can be partially due to count stream not counting inbound streams on a protocol + unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamSystem = math.MaxInt + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_DefaultConfigWithUnknownProtocol(t *testing.T) { + // limits are not enforced when using an unknown protocol ID + unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.unknownProtocol = true + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_PeerLimitLessThanPeerProtocolLimit(t *testing.T) { + // the case where peer-level limit is lower than the peer-protocol-level limit. + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundPeerStream = 5 // each peer can only create 5 streams. + base.maxInboundStreamPeerProtocol = 10 // each peer can create 10 streams on a specific protocol (but should still be limited by the peer-level limit). + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_PeerLimitGreaterThanPeerProtocolLimit(t *testing.T) { + // the case where peer-level limit is higher than the peer-protocol-level limit. + // max inbound stream peer protocol is not preserved; can be partially due to count stream not counting inbound streams on a protocol + unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundPeerStream = 10 // each peer can create 10 streams. + base.maxInboundStreamPeerProtocol = 5 // each peer can create 5 streams on a specific protocol. + base.maxInboundStreamProtocol = 100 // overall limit is 100 streams on a specific protocol (across all peers). + base.maxInboundStreamTransient = 1000 // overall limit is 1000 transient streams. + base.maxInboundStreamSystem = 1000 // overall limit is 1000 system-wide streams. + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_ProtocolLimitLessThanPeerProtocolLimit(t *testing.T) { + // max inbound stream peer protocol is not preserved; can be partially due to count stream not counting inbound streams on a protocol + unittest.SkipUnless(t, + unittest.TEST_TODO, "broken test") + // the case where protocol-level limit is lower than the peer-protocol-level limit. + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamProtocol = 5 // each peer can create 5 streams on a specific protocol. + base.maxInboundStreamPeerProtocol = 10 // each peer can create 10 streams on a specific protocol (but should still be limited by the protocol-level limit). + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_ProtocolLimitGreaterThanPeerProtocolLimit(t *testing.T) { + // the case where protocol-level limit is higher than the peer-protocol-level limit. + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamProtocol = 10 // overall limit is 10 streams on a specific protocol (across all peers). + base.maxInboundStreamPeerProtocol = 5 // each peer can create 5 streams on a specific protocol. + base.maxInboundStreamTransient = 1000 // overall limit is 1000 transient streams. + base.maxInboundStreamSystem = 1000 // overall limit is 1000 system-wide streams. + testCreateStreamInboundStreamResourceLimits(t, base) +} + +func TestCreateStream_TransientLimitLessThanPeerProtocolLimit(t *testing.T) { + // the case where transient-level limit is lower than the peer-protocol-level limit. + base := baseCreateStreamInboundStreamResourceLimitConfig() + base.maxInboundStreamTransient = 5 // overall limit is 5 transient streams (across all peers). + base.maxInboundStreamPeerProtocol = 10 // each peer can create 10 streams on a specific protocol (but should still be limited by the transient-level limit). + testCreateStreamInboundStreamResourceLimits(t, base) +} + +// testCreateStreamInboundStreamResourceLimits tests the inbound stream limits for a given testPeerLimitConfig. It creates +// a number of senders and a single receiver. The receiver will have a resource manager with the given limits. +// The senders will have a resource manager with infinite limits to ensure that they can create as many streams as they want. +// The test will create a number of streams from each sender to the receiver. The test will then check that the limits are +// being enforced correctly. +// The number of streams is determined by the maxLimit() of the testPeerLimitConfig, which is the maximum limit across all limits (peer, protocol, transient, system), excluding +// the math.MaxInt limits. +func testCreateStreamInboundStreamResourceLimits(t *testing.T, cfg *testPeerLimitConfig) { + idProvider := mockmodule.NewIdentityProvider(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + + sporkID := unittest.IdentifierFixture() + + // sender nodes will have infinite stream limit to ensure that they can create as many streams as they want. + resourceManagerSnd, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)) + require.NoError(t, err) + senders, senderIds := p2ptest.NodesFixture(t, + sporkID, + t.Name(), cfg.nodeCount, + idProvider, + p2ptest.WithResourceManager(resourceManagerSnd), + p2ptest.WithCreateStreamRetryDelay(10*time.Millisecond)) + + // receiver node will run with default limits and no scaling. + limits := rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&limits) + l := limits.Scale(0, 0) + partial := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundStreamSystem), + ConnsInbound: rcmgr.LimitVal(cfg.nodeCount), + }, + Transient: rcmgr.ResourceLimits{ + ConnsInbound: rcmgr.LimitVal(cfg.nodeCount), + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundStreamTransient), + }, + ProtocolDefault: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundStreamProtocol), + }, + ProtocolPeerDefault: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundStreamPeerProtocol), + }, + PeerDefault: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundPeerStream), + }, + Conn: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundPeerStream), + }, + Stream: rcmgr.ResourceLimits{ + StreamsInbound: rcmgr.LimitVal(cfg.maxInboundPeerStream), + }, + } + l = partial.Build(l) + resourceManagerRcv, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(l)) + require.NoError(t, err) + receiver, id2 := p2ptest.NodeFixture(t, + sporkID, + t.Name(), + idProvider, + p2ptest.WithResourceManager(resourceManagerRcv), + p2ptest.WithCreateStreamRetryDelay(10*time.Millisecond)) + + for i, sender := range senders { + idProvider.On("ByPeerID", sender.ID()).Return(senderIds[i], true).Maybe() + } + idProvider.On("ByPeerID", receiver.ID()).Return(&id2, true).Maybe() + + nodes := append(senders, receiver) + ids := append(senderIds, &id2) + + p2ptest.StartNodes(t, signalerCtx, nodes) + defer p2ptest.StopNodes(t, nodes, cancel) + + p2ptest.LetNodesDiscoverEachOther(t, signalerCtx, nodes, ids) + + var allStreamsCreated sync.WaitGroup + + protocolID := protocols.FlowProtocolID(sporkID) + if cfg.unknownProtocol { + protocolID = protocols.FlowProtocolID(unittest.IdentifierFixture()) + } + + loadLimit := cfg.maxLimit() + require.Greaterf(t, loadLimit, 0, "test limit must be greater than 0; got %d", loadLimit) + + streamListMu := sync.Mutex{} // mutex to protect the streamsList. + streamsList := make([]network.Stream, 0) // list of all streams created to avoid garbage collection. + for sIndex := range senders { + for i := 0; i < loadLimit; i++ { + allStreamsCreated.Add(1) + go func(sIndex int) { + defer allStreamsCreated.Done() + sender := senders[sIndex] + s, err := sender.Host().NewStream(ctx, receiver.ID(), protocolID) + if err != nil { + // we don't care about the error here; as we are trying to break a limit; so we expect some of the stream creations to fail. + return + } + + require.NotNil(t, s) + streamListMu.Lock() + streamsList = append(streamsList, s) + streamListMu.Unlock() + }(sIndex) + } + } + + unittest.RequireReturnsBefore(t, allStreamsCreated.Wait, 2*time.Second, "could not create streams on time") + + // transient sanity-check + require.NoError(t, resourceManagerRcv.ViewTransient(func(scope network.ResourceScope) error { + // number of in-transient streams must be less than or equal to the max transient limit + require.LessOrEqual(t, int64(scope.Stat().NumStreamsInbound), int64(cfg.maxInboundStreamTransient)) + + // number of in-transient streams must be less than or equal the total number of streams created. + require.LessOrEqual(t, int64(scope.Stat().NumStreamsInbound), int64(len(streamsList))) + return nil + })) + + // system-wide limit sanity-check + require.NoError(t, resourceManagerRcv.ViewSystem(func(scope network.ResourceScope) error { + require.LessOrEqual(t, int64(scope.Stat().NumStreamsInbound), int64(cfg.maxInboundStreamSystem), "system-wide limit is not being enforced") + return nil + })) + + totalInboundStreams := 0 + for _, sender := range senders { + actualNumOfStreams := p2putils.CountStream(receiver.Host(), sender.ID(), p2putils.Direction(network.DirInbound)) + // number of inbound streams must be less than or equal to the peer-level limit for each sender. + require.LessOrEqual(t, int64(actualNumOfStreams), int64(cfg.maxInboundPeerStream)) + require.LessOrEqual(t, int64(actualNumOfStreams), int64(cfg.maxInboundStreamPeerProtocol)) + totalInboundStreams += actualNumOfStreams + } + // sanity check; the total number of inbound streams must be less than or equal to the system-wide limit. + // TODO: this must be a hard equal check; but falls short; to be shared with libp2p community. + // Failing at this line means the system-wide limit is not being enforced. + require.LessOrEqual(t, totalInboundStreams, cfg.maxInboundStreamSystem) + // sanity check; the total number of inbound streams must be less than or equal to the protocol-level limit. + require.LessOrEqual(t, totalInboundStreams, cfg.maxInboundStreamProtocol) +} diff --git a/network/p2p/test/fixtures.go b/network/p2p/test/fixtures.go index 19bbe92217d..9e9b8e4c8e8 100644 --- a/network/p2p/test/fixtures.go +++ b/network/p2p/test/fixtures.go @@ -50,13 +50,13 @@ const ( // the test is run in parallel with other tests. Hence, no further increase of the timeout is // expected to be necessary. Any failure to start a node within this timeout is likely to be // caused by a bug in the code. - libp2pNodeStartupTimeout = 5 * time.Second + libp2pNodeStartupTimeout = 10 * time.Second // libp2pNodeStartupTimeout is the timeout for starting a libp2p node in tests. Note that the // timeout has been selected to be large enough to allow for the node to start up on a CI even when // the test is run in parallel with other tests. Hence, no further increase of the timeout is // expected to be necessary. Any failure to start a node within this timeout is likely to be // caused by a bug in the code. - libp2pNodeShutdownTimeout = 5 * time.Second + libp2pNodeShutdownTimeout = 10 * time.Second ) // NetworkingKeyFixtures is a test helper that generates a ECDSA flow key pair. @@ -122,7 +122,9 @@ func NodeFixture( opt(parameters) } - identity := unittest.IdentityFixture(unittest.WithNetworkingKey(parameters.Key.PublicKey()), unittest.WithAddress(parameters.Address), unittest.WithRole(parameters.Role)) + identity := unittest.IdentityFixture(unittest.WithNetworkingKey(parameters.Key.PublicKey()), + unittest.WithAddress(parameters.Address), + unittest.WithRole(parameters.Role)) logger = parameters.Logger.With().Hex("node_id", logging.ID(identity.NodeID)).Logger() @@ -386,6 +388,14 @@ func WithDefaultResourceManager() NodeFixtureParameterOption { } } +// WithResourceManager sets the resource manager to the provided resource manager. +// Otherwise, it uses the resource manager provided by the test (the infinite resource manager). +func WithResourceManager(resourceManager network.ResourceManager) NodeFixtureParameterOption { + return func(p *NodeFixtureParameters) { + p.ResourceManager = resourceManager + } +} + func WithUnicastHandlerFunc(handler network.StreamHandler) NodeFixtureParameterOption { return func(p *NodeFixtureParameters) { p.HandlerFunc = handler @@ -655,7 +665,11 @@ func EnsurePubsubMessageExchange( for i := 0; i < count; i++ { // creates a unique message to be published by the node payload := messageFactory() - outgoingMessageScope, err := message.NewOutgoingScope(flow.IdentifierList{unittest.IdentifierFixture()}, topic, payload, unittest.NetworkCodec().Encode, message.ProtocolTypePubSub) + outgoingMessageScope, err := message.NewOutgoingScope(flow.IdentifierList{unittest.IdentifierFixture()}, + topic, + payload, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) require.NoError(t, err) require.NoError(t, node.Publish(ctx, outgoingMessageScope)) @@ -681,7 +695,14 @@ func EnsurePubsubMessageExchange( // - count: the number of messages to exchange from `sender` to `receiver`. // - messageFactory: a function that creates a unique message to be published by the node. func EnsurePubsubMessageExchangeFromNode( - t *testing.T, ctx context.Context, sender p2p.LibP2PNode, receiverNode p2p.LibP2PNode, receiverIdentifier flow.Identifier, topic channels.Topic, count int, messageFactory func() interface{}, + t *testing.T, + ctx context.Context, + sender p2p.LibP2PNode, + receiverNode p2p.LibP2PNode, + receiverIdentifier flow.Identifier, + topic channels.Topic, + count int, + messageFactory func() interface{}, ) { _, err := sender.Subscribe(topic, validator.TopicValidator(unittest.Logger(), unittest.AllowAllPeerFilter())) require.NoError(t, err) @@ -695,7 +716,11 @@ func EnsurePubsubMessageExchangeFromNode( for i := 0; i < count; i++ { // creates a unique message to be published by the node payload := messageFactory() - outgoingMessageScope, err := message.NewOutgoingScope(flow.IdentifierList{receiverIdentifier}, topic, payload, unittest.NetworkCodec().Encode, message.ProtocolTypePubSub) + outgoingMessageScope, err := message.NewOutgoingScope(flow.IdentifierList{receiverIdentifier}, + topic, + payload, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) require.NoError(t, err) require.NoError(t, sender.Publish(ctx, outgoingMessageScope)) @@ -738,7 +763,14 @@ func EnsureNotConnectedBetweenGroups(t *testing.T, ctx context.Context, groupA [ // - count: the number of messages to exchange from each node. // - messageFactory: a function that creates a unique message to be published by the node. func EnsureNoPubsubMessageExchange( - t *testing.T, ctx context.Context, from []p2p.LibP2PNode, to []p2p.LibP2PNode, toIdentifiers flow.IdentifierList, topic channels.Topic, count int, messageFactory func() interface{}, + t *testing.T, + ctx context.Context, + from []p2p.LibP2PNode, + to []p2p.LibP2PNode, + toIdentifiers flow.IdentifierList, + topic channels.Topic, + count int, + messageFactory func() interface{}, ) { subs := make([]p2p.Subscription, len(to)) tv := validator.TopicValidator(unittest.Logger(), unittest.AllowAllPeerFilter())