Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/protocols/handler: add packet sending condition, prevent send small packets frequently; #1776

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eth

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/gopool"
Expand All @@ -27,7 +28,9 @@ import (
const (
// This is the target size for the packs of transactions or announcements. A
// pack can get larger than this if a single transactions exceeds this size.
maxTxPacketSize = 100 * 1024
maxTxPacketSize = 100 * 1024
minTxPacketSize = 1 * 1024
sendPacketTimeout = 300 * time.Millisecond
)

// blockPropagation is a block propagation event, waiting for its turn in the
Expand Down Expand Up @@ -136,14 +139,15 @@ func (p *Peer) broadcastTransactions() {
// node internals and at the same time rate limits queued data.
func (p *Peer) announceTransactions() {
var (
queue []common.Hash // Queue of hashes to announce as transaction stubs
done chan struct{} // Non-nil if background announcer is running
fail = make(chan error, 1) // Channel used to receive network error
failed bool // Flag whether a send failed, discard everything onward
queue []common.Hash // Queue of hashes to announce as transaction stubs
done chan struct{} // Non-nil if background announcer is running
fail = make(chan error, 1) // Channel used to receive network error
failed bool // Flag whether a send failed, discard everything onward
lastSentTime = time.Now()
)
for {
// If there's no in-flight announce running, check if a new one is needed
if done == nil && len(queue) > 0 {
if done == nil && triggerPacketSending(len(queue)*common.HashLength, lastSentTime) {
// Pile transaction hashes until we reach our allowed network limit
var (
count int
Expand All @@ -170,6 +174,7 @@ func (p *Peer) announceTransactions() {
close(done)
//p.Log().Trace("Sent transaction announcements", "count", len(pending))
})
lastSentTime = time.Now()
}
}
// Transfer goroutine may or may not have been started, listen for events
Expand Down Expand Up @@ -200,3 +205,16 @@ func (p *Peer) announceTransactions() {
}
}
}

// triggerPacketSending if packet reach minTxPacketSize or sendPacketTimeout, it will trigger packet sending
// to prevent only small packets sent frequently in network
func triggerPacketSending(estimateSize int, lastSentTime time.Time) bool {
if estimateSize >= minTxPacketSize {
return true
}

if time.Since(lastSentTime) >= sendPacketTimeout && estimateSize > 0 {
return true
}
return false
}