Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spv: Move getheaders stage to global syncer instead of per-peer #2300

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions p2p/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,11 @@ func (rp *RemotePeer) Err() error {
return rp.err
}

// Done returns a channel that is closed once the peer disconnects.
func (rp *RemotePeer) Done() <-chan struct{} {
return rp.errc
}

// RemoteAddr returns the remote address of the peer's TCP connection.
func (rp *RemotePeer) RemoteAddr() net.Addr {
return rp.c.RemoteAddr()
Expand Down
11 changes: 11 additions & 0 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ var _ wallet.NetworkBackend = (*Syncer)(nil)

func pickAny(*p2p.RemotePeer) bool { return true }

// pickForGetHeaders returns a function to use in waitForRemotes which selects
// peers that may have headers that are more recent than the passed tipHeight.
func pickForGetHeaders(tipHeight int32) func(rp *p2p.RemotePeer) bool {
return func(rp *p2p.RemotePeer) bool {
// We are interested in this peer's headers if they announced a
// height greater than the current tip height and if we haven't
// yet fetched all the headers that it announced.
return rp.InitialHeight() > tipHeight && rp.LastHeight() < rp.InitialHeight()
}
}

// Blocks implements the Blocks method of the wallet.Peer interface.
func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) {
for {
Expand Down
182 changes: 81 additions & 101 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ type Syncer struct {
remoteAvailable chan struct{}
remotesMu sync.Mutex

// missingCfiltersFetched is closed when the wallet's missing cfilters
// have been fetched.
missingCfiltersFetched chan struct{}
// headersFetched is closed when the initial getheaders loop has
// finished fetching headers.
headersFetched chan struct{}

// Data filters
//
Expand All @@ -75,10 +75,6 @@ type Syncer struct {
sidechains wallet.SidechainForest
sidechainMu sync.Mutex

currentLocators []*chainhash.Hash
locatorGeneration uint
locatorMu sync.Mutex

// Holds all potential callbacks used to notify clients
notifications *Notifications

Expand Down Expand Up @@ -131,7 +127,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer {
lp: lp,
mempoolAdds: make(chan *chainhash.Hash),

missingCfiltersFetched: make(chan struct{}),
headersFetched: make(chan struct{}),
}
}

Expand Down Expand Up @@ -333,12 +329,6 @@ func (s *Syncer) Run(ctx context.Context) error {
log.Infof("Transactions synced through block %v height %d", &tipHash, tipHeight)
}

locators, err := s.wallet.BlockLocators(ctx, nil)
if err != nil {
return err
}
s.currentLocators = locators

s.lp.AddrManager().Start()
defer func() {
err := s.lp.AddrManager().Stop()
Expand Down Expand Up @@ -389,7 +379,19 @@ func (s *Syncer) Run(ctx context.Context) error {
}
s.fetchMissingCfiltersFinished()
log.Debugf("Fetched all missing cfilters")
close(s.missingCfiltersFetched)

// Next: fetch headers and cfilters up to mainchain tip.
s.fetchHeadersStart()
log.Debugf("Fetching headers and CFilters...")
err = s.getHeaders(ctx)
if err != nil {
return err
}
s.fetchHeadersFinished()

// Signal that startup fetching of headers has completed.
close(s.headersFetched)

return nil
})

Expand Down Expand Up @@ -1287,13 +1289,6 @@ func (s *Syncer) handleBlockAnnouncements(ctx context.Context, rp *p2p.RemotePee
return err
}

if len(bestChain) != 0 {
s.locatorMu.Lock()
s.currentLocators = nil
s.locatorGeneration++
s.locatorMu.Unlock()
}

// Log connected blocks.
for _, n := range bestChain {
log.Infof("Connected block %v, height %d, %d wallet transaction(s)",
Expand Down Expand Up @@ -1343,55 +1338,63 @@ func (s *Syncer) disconnectStragglers(height int32) {
// locators.
var hashStop chainhash.Hash

// getHeaders iteratively fetches headers from rp using the latest locators.
// Returns when no more headers are available. A sendheaders message is pushed
// to the peer when there are no more headers to fetch.
func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
var locators []*chainhash.Hash
var generation uint
var err error
s.locatorMu.Lock()
locators = s.currentLocators
generation = s.locatorGeneration
if len(locators) == 0 {
locators, err = s.wallet.BlockLocators(ctx, nil)
// getHeaders fetches headers from peers until the wallet is up to date with
// all connected peers. This is part of the startup sync process.
func (s *Syncer) getHeaders(ctx context.Context) error {

cnet := s.wallet.ChainParams().Net

startTime := time.Now()

nextbatch:
for ctx.Err() == nil {
tipHash, tipHeight := s.wallet.MainChainTip(ctx)

// Determine if there are any peers from which to request newer
// headers.
rp, err := s.waitForRemote(ctx, pickForGetHeaders(tipHeight), false)
if err != nil {
s.locatorMu.Unlock()
return err
}
s.currentLocators = locators
s.locatorGeneration++
}
s.locatorMu.Unlock()

var lastHeight int32
cnet := s.wallet.ChainParams().Net
if rp == nil {
// All done.
log.Infof("Initial sync to block %s at height %d completed in %s",
tipHash, tipHeight, time.Since(startTime).Round(time.Second))
return nil
}

for {
headers, err := rp.Headers(ctx, locators, &hashStop)
// Request headers from the selected peer.
locators, err := s.wallet.BlockLocators(ctx, nil)
if err != nil {
return err
}
headers, err := rp.Headers(ctx, locators, &hashStop)
if err != nil {
continue nextbatch
}

if len(headers) == 0 {
// Ensure that the peer provided headers through the height
// advertised during handshake.
if lastHeight < rp.InitialHeight() {
if rp.LastHeight() < rp.InitialHeight() {
// Peer may not have provided any headers if our own locators
// were up to date. Compare the best locator hash with the
// advertised height.
h, err := s.wallet.BlockHeader(ctx, locators[0])
if err == nil && int32(h.Height) < rp.InitialHeight() {
return errors.E(errors.Protocol, "peer did not provide "+
err := errors.E(errors.Protocol, "peer did not provide "+
"headers through advertised height")
rp.Disconnect(err)
continue nextbatch
}
}

return rp.SendHeaders(ctx)
// Try to pick a different peer with a higher advertised
// height or check there are no such peers (thus we're
// done with fetching headers for initial sync).
continue nextbatch
}

lastHeight = int32(headers[len(headers)-1].Height)

nodes := make([]*wallet.BlockNode, len(headers))
for i := range headers {
// Determine the hash of the header. It is safe to use
Expand Down Expand Up @@ -1423,7 +1426,11 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
if err != nil {
s.sidechainMu.Unlock()
rp.Disconnect(err)
return err
if !errors.Is(err, context.Canceled) {
log.Warnf("Disconnecting from %v due to header "+
"validation error: %v", rp, err)
}
continue nextbatch
}
s.sidechainMu.Unlock()

Expand All @@ -1449,7 +1456,8 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
}
err = g.Wait()
if err != nil {
return err
rp.Disconnect(err)
continue nextbatch
}

var added int
Expand All @@ -1466,22 +1474,7 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
if added == 0 {
s.sidechainMu.Unlock()

s.locatorMu.Lock()
if s.locatorGeneration > generation {
locators = s.currentLocators
}
if len(locators) == 0 {
locators, err = s.wallet.BlockLocators(ctx, nil)
if err != nil {
s.locatorMu.Unlock()
return err
}
s.currentLocators = locators
s.locatorGeneration++
generation = s.locatorGeneration
}
s.locatorMu.Unlock()
continue
continue nextbatch
}
s.fetchHeadersProgress(headers[len(headers)-1])
log.Debugf("Fetched %d new header(s) ending at height %d from %v",
Expand All @@ -1490,17 +1483,19 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
bestChain, err := s.wallet.EvaluateBestChain(ctx, &s.sidechains)
if err != nil {
s.sidechainMu.Unlock()
return err
rp.Disconnect(err)
continue nextbatch
}
if len(bestChain) == 0 {
s.sidechainMu.Unlock()
jrick marked this conversation as resolved.
Show resolved Hide resolved
continue
continue nextbatch
}

prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
if err != nil {
s.sidechainMu.Unlock()
return err
rp.Disconnect(err)
continue nextbatch
}

if len(prevChain) != 0 {
Expand All @@ -1523,38 +1518,22 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
// Peers should not be significantly behind the new tip.
s.setRequiredHeight(int32(tip.Header.Height))
s.disconnectStragglers(int32(tip.Header.Height))

// Generate new locators
s.locatorMu.Lock()
locators, err = s.wallet.BlockLocators(ctx, nil)
if err != nil {
s.locatorMu.Unlock()
return err
}
s.currentLocators = locators
s.locatorGeneration++
s.locatorMu.Unlock()
}

return ctx.Err()
}

func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error {
// Continue with fetching headers only after missing cfilters have
// been fetched.

select {
case <-ctx.Done():
return ctx.Err()
case <-s.missingCfiltersFetched:
}

// Fetch any unseen headers from the peer.
s.fetchHeadersStart()
log.Debugf("Fetching headers from %v", rp.RemoteAddr())
err := s.getHeaders(ctx, rp)
if err != nil {
return err
case <-s.headersFetched:
case <-rp.Done():
return rp.Err()
}
s.fetchHeadersFinished()

var err error
if atomic.CompareAndSwapUint32(&s.atomicCatchUpTryLock, 0, 1) {
err = func() error {
rescanPoint, err := s.wallet.RescanPoint(ctx)
Expand Down Expand Up @@ -1631,15 +1610,16 @@ func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error {
log.Errorf("Cannot load unmined transactions for resending: %v", err)
return nil
}
if len(unminedTxs) == 0 {
return nil
}
err = rp.PublishTransactions(ctx, unminedTxs...)
if err != nil {
// TODO: Transactions should be removed if this is a double spend.
log.Errorf("Failed to resent one or more unmined transactions: %v", err)
if len(unminedTxs) > 0 {
err = rp.PublishTransactions(ctx, unminedTxs...)
if err != nil {
// TODO: Transactions should be removed if this is a double spend.
log.Errorf("Failed to resent one or more unmined transactions: %v", err)
}
}
return nil

// Ask peer to send any new headers.
return rp.SendHeaders(ctx)
}

// handleMempool handles eviction from the local mempool of non-wallet-backed
Expand Down
Loading