Skip to content

Commit

Permalink
eth/protocols/eth: add JustifiedNumber into StatusPacket
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 committed Sep 13, 2024
1 parent 282aee5 commit cbb9f3e
Show file tree
Hide file tree
Showing 15 changed files with 157 additions and 69 deletions.
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruni
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
func (s *Ethereum) Merger() *consensus.Merger { return s.merger }
func (s *Ethereum) SyncMode() downloader.SyncMode {
mode, _ := s.handler.chainSync.modeAndLocalHead()
mode, _, _ := s.handler.chainSync.modeAndLocalHead()
return mode
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
mode := d.getMode()

// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
latest, _, _ := p.peer.Head()
fetch := 1
if mode == SnapSync {
fetch = 2 // head + pivot headers
Expand Down
9 changes: 5 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@ type downloadTesterPeer struct {
func (dlp *downloadTesterPeer) MarkLagging() {
}

// Head constructs a function to retrieve a peer's current head hash
// and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
// Head constructs a function to retrieve a peer's current head hash,
// justifiedNumber and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *uint64, *big.Int) {
head := dlp.chain.CurrentBlock()
return head.Hash(), dlp.chain.GetTd(head.Hash(), head.Number.Uint64())
justifiedNumber := dlp.chain.GetJustifiedNumber(head)
return head.Hash(), &justifiedNumber, dlp.chain.GetTd(head.Hash(), head.Number.Uint64())
}

func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type peerConnection struct {

// Peer encapsulates the methods required to synchronise with a remote full peer.
type Peer interface {
Head() (common.Hash, *big.Int)
Head() (common.Hash, *uint64, *big.Int)
MarkLagging()
RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
Expand Down
15 changes: 8 additions & 7 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {

// Execute the Ethereum handshake
var (
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
justifiedNumber = h.chain.GetJustifiedNumber(head)
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
)
forkID := forkid.NewID(h.chain.Config(), genesis, number, head.Time)
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
if err := peer.Handshake(h.networkID, justifiedNumber, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down Expand Up @@ -920,7 +921,7 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) {
headBlock := h.chain.CurrentBlock()
currentTD := h.chain.GetTd(headBlock.Hash(), headBlock.Number.Uint64())
for _, peer := range peers {
_, peerTD := peer.Head()
_, _, peerTD := peer.Head()
deltaTD := new(big.Int).Abs(new(big.Int).Sub(currentTD, peerTD))
if deltaTD.Cmp(big.NewInt(deltaTdThreshold)) < 1 && peer.bscExt != nil {
voteMap[peer] = vote
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_bsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func testSendVotes(t *testing.T, protocol uint) {
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -227,7 +227,7 @@ func testRecvVotes(t *testing.T, protocol uint) {
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
}

Expand Down
38 changes: 32 additions & 6 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)

Expand Down Expand Up @@ -135,13 +136,38 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPa
// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
var (
trueHead = block.ParentHash()
trueTD = new(big.Int).Sub(td, block.Difficulty())
trueHead = block.ParentHash()
trueJustifiedNumber *uint64
trueTD = new(big.Int).Sub(td, block.Difficulty())
)
// Update the peer's total difficulty if better than the previous
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD)
h.chainSync.handlePeerEvent()
if trueHeadHeader := h.chain.GetHeaderByHash(trueHead); trueHeadHeader != nil {
// If the trueHeadHeader is not found in the local chain, GetJustifiedNumber will return 0.
// Ignore cases where GetJustifiedNumber actually returns 0, as this only occurs in a self-test environment.
if tmp := h.chain.GetJustifiedNumber(trueHeadHeader); tmp != 0 {
trueJustifiedNumber = &tmp
}
}
// Update the peer's justifiedNumber and total difficulty if better than the previous
if _, justifiedNumber, td := peer.Head(); trueJustifiedNumber != nil && justifiedNumber != nil {
if *trueJustifiedNumber > *justifiedNumber ||
(*trueJustifiedNumber == *justifiedNumber && trueTD.Cmp(td) > 0) {
peer.SetHead(trueHead, trueJustifiedNumber, trueTD)
h.chainSync.handlePeerEvent()
log.Trace("handleBlockBroadcast|SetHead", "justifiedNumber", *justifiedNumber, "td", td.Uint64(), "trueHead", trueHead, "trueJustifiedNumber", *trueJustifiedNumber, "trueTD", trueTD.Uint64())
}
} else {
// back to behavior without fast finality
if trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueJustifiedNumber, trueTD)
h.chainSync.handlePeerEvent()

if trueJustifiedNumber != nil {
log.Trace("handleBlockBroadcast|SetHead", "trueHead", trueHead, "trueJustifiedNumber", nil, "trueTD", trueTD.Uint64(), "td", td.Uint64())
} else {
log.Trace("handleBlockBroadcast|SetHead", "trueHead", trueHead, "trueJustifiedNumber", *trueJustifiedNumber, "trueTD", trueTD.Uint64(), "td", td.Uint64())
}
}
}

return nil
}
10 changes: 5 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand Down Expand Up @@ -419,7 +419,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := sink.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -636,7 +636,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Wait a bit for the above handlers to start
time.Sleep(100 * time.Millisecond)

if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
if err := sinkPeer.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
go eth.Handle(sink, sinkPeer)
Expand Down Expand Up @@ -709,7 +709,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
if err := sink.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -811,7 +811,7 @@ func TestOptionMaxPeersPerIP(t *testing.T) {
t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}(tryNum)

if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// make sure runEthPeer execute one by one.
Expand Down
50 changes: 42 additions & 8 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"sync"
"time"

"github.com/cometbft/cometbft/libs/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/protocols/trust"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)

Expand Down Expand Up @@ -515,24 +517,56 @@ func (ps *peerSet) snapLen() int {
return ps.snapPeers
}

// peerWithHighestTD retrieves the known peer with the currently highest total
// difficulty, but below the given PoS switchover threshold.
func (ps *peerSet) peerWithHighestTD() *eth.Peer {
// peerWithHighestHead retrieves the known peer with the currently highest head
func (ps *peerSet) peerWithHighestHead() *eth.Peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

var (
bestPeer *eth.Peer
bestTd *big.Int
)
var knowJustifiedPeers, notKnowJustifiedPeers []*ethPeer
for _, p := range ps.peers {
if p.Lagging() {
continue
}
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
if _, justifiedNumber, td := p.Head(); justifiedNumber != nil {
knowJustifiedPeers = append(knowJustifiedPeers, p)
log.Trace("peerWithHighestHead", "id", p.Peer.ID(), "justifiedNumber", *justifiedNumber, "td", td.Uint64())
} else {
notKnowJustifiedPeers = append(notKnowJustifiedPeers, p)
log.Trace("peerWithHighestHead", "id", p.Peer.ID(), "td", td.Uint64())
}
}

var (
bestPeer *eth.Peer
bestJustified *uint64
bestTd *big.Int
randUint = rand.Uint()
)
for _, p := range knowJustifiedPeers {
_, justifiedNumber, td := p.Head()
if bestPeer == nil {
bestPeer, bestJustified, bestTd = p.Peer, justifiedNumber, td
} else if *justifiedNumber > *bestJustified {
bestPeer, bestJustified = p.Peer, justifiedNumber
if td.Cmp(bestTd) > 0 {
bestTd = td // may be not equal `to bestPeer.td`
}
} else if *justifiedNumber == *bestJustified {
if td.Cmp(bestTd) > 0 || (td.Cmp(bestTd) == 0 && randUint%2 == 0) {
bestPeer, bestTd = p.Peer, td
}
}
}
// if some nodes does not have justified number, back to behavior without fast finality
for _, p := range notKnowJustifiedPeers {
if _, _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 || (td.Cmp(bestTd) == 0 && randUint%2 == 0) {
bestPeer, bestTd = p.Peer, td
}
}
if bestPeer != nil {
log.Trace("peerWithHighestHead|selected", "id", bestPeer.Peer.ID(), "td", bestTd.Uint64())
}

return bestPeer
}

Expand Down
12 changes: 9 additions & 3 deletions eth/protocols/eth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (
)

// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// network IDs, justifiedNumber, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, justifiedNumber uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)

Expand All @@ -51,6 +51,8 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
Head: head,
Genesis: genesis,
ForkID: forkID,
// Step 2
// JustifiedNumber: &justifiedNumber,
})
})
gopool.Submit(func() {
Expand All @@ -70,7 +72,11 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
return p2p.DiscReadTimeout
}
}
p.td, p.head = status.TD, status.Head
p.td, p.justifiedNumber, p.head = status.TD, status.JustifiedNumber, status.Head
// Step 3
// if p.justifiedNumber == nil {
// return errors.New("nil justifiedNumber when Handshake")
// }

if p.version >= ETH68 {
var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc
Expand Down
10 changes: 5 additions & 5 deletions eth/protocols/eth/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ func testHandshake(t *testing.T, protocol uint) {
want: errNoStatusMsg,
},
{
code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID},
code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID, nil},
want: errProtocolVersionMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID},
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID, nil},
want: errNetworkIDMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID},
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID, nil},
want: errGenesisMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}},
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}, nil},
want: errForkIDRejected,
},
}
Expand All @@ -80,7 +80,7 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data)

err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
err := peer.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {
Expand Down
26 changes: 18 additions & 8 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type Peer struct {
version uint // Protocol version negotiated
statusExtension *UpgradeStatusExtension

lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
justifiedNumber *uint64 // Latest advertised justified block number
td *big.Int // Latest advertised head block total difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -160,21 +161,30 @@ func (p *Peer) MarkLagging() {
p.lagging = true
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
// Head retrieves the current head hash, justifiedNumber and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, justifiedNumber *uint64, td *big.Int) {
p.lock.RLock()
defer p.lock.RUnlock()

copy(hash[:], p.head[:])
return hash, new(big.Int).Set(p.td)
if p.justifiedNumber != nil {
tmp := *p.justifiedNumber
justifiedNumber = &tmp
}
return hash, justifiedNumber, new(big.Int).Set(p.td)
}

// SetHead updates the head hash and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
// SetHead updates the head hash, justifiedNumber and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, justifiedNumber *uint64, td *big.Int) {
p.lock.Lock()
defer p.lock.Unlock()
p.lagging = false
copy(p.head[:], hash[:])
p.justifiedNumber = nil
if justifiedNumber != nil {
tmp := *justifiedNumber
p.justifiedNumber = &tmp
}
p.td.Set(td)
}

Expand Down
Loading

0 comments on commit cbb9f3e

Please sign in to comment.