Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move initial rescan to global syncer loop #2301

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 5 additions & 27 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*
if err := ctx.Err(); err != nil {
return nil, err
}
rp, err := s.pickRemote(pickAny)
rp, err := s.waitForRemote(ctx, pickAny, true)
if err != nil {
return nil, err
}
blocks, err := rp.Blocks(ctx, blockHashes)
if err != nil {
log.Debugf("unable to fetch blocks from %v: %v", rp, err)
continue
}
return blocks, nil
Expand Down Expand Up @@ -88,24 +89,6 @@ func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash)
}
}

// Headers implements the Headers method of the wallet.Peer interface.
func (s *Syncer) Headers(ctx context.Context, blockLocators []*chainhash.Hash, hashStop *chainhash.Hash) ([]*wire.BlockHeader, error) {
for {
if err := ctx.Err(); err != nil {
return nil, err
}
rp, err := s.pickRemote(pickAny)
if err != nil {
return nil, err
}
hs, err := rp.Headers(ctx, blockLocators, hashStop)
if err != nil {
continue
}
return hs, nil
}
}

func (s *Syncer) String() string {
// This method is part of the wallet.Peer interface and will typically
// specify the remote address of the peer. Since the syncer can encompass
Expand Down Expand Up @@ -242,23 +225,18 @@ FilterLoop:
wg.Wait()

if len(fmatches) != 0 {
var rp *p2p.RemotePeer
PickPeer:
for {
if err := ctx.Err(); err != nil {
return err
}
if rp == nil {
var err error
rp, err = s.pickRemote(pickAny)
if err != nil {
return err
}
rp, err := s.waitForRemote(ctx, pickAny, true)
if err != nil {
return err
}

blocks, err := rp.Blocks(ctx, fmatches)
if err != nil {
rp = nil
continue PickPeer
}

Expand Down
180 changes: 80 additions & 100 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ const reqSvcs = wire.SFNodeNetwork
// protocol using Simplified Payment Verification (SPV) with compact filters.
type Syncer struct {
// atomics
atomicCatchUpTryLock uint32 // CAS (entered=1) to perform discovery/rescan
atomicWalletSynced uint32 // CAS (synced=1) when wallet syncing complete
atomicWalletSynced uint32 // CAS (synced=1) when wallet syncing complete

wallet *wallet.Wallet
lp *p2p.LocalPeer

// Protected by atomicCatchUpTryLock
// discoverAccounts is true if the initial sync should perform account
// discovery. Only used during initial sync.
discoverAccounts bool
loadedFilters bool

persistentPeers []string

Expand All @@ -53,9 +52,8 @@ type Syncer struct {
remoteAvailable chan struct{}
remotesMu sync.Mutex

// headersFetched is closed when the initial getheaders loop has
// finished fetching headers.
headersFetched chan struct{}
// initialSyncDone is closed when the initial sync loop has finished.
initialSyncDone chan struct{}

// Data filters
//
Expand Down Expand Up @@ -126,8 +124,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer {
seenTxs: lru.NewCache[chainhash.Hash](2000),
lp: lp,
mempoolAdds: make(chan *chainhash.Hash),

headersFetched: make(chan struct{}),
initialSyncDone: make(chan struct{}),
}
}

Expand Down Expand Up @@ -191,16 +188,6 @@ func (s *Syncer) GetRemotePeers() map[string]*p2p.RemotePeer {
return remotes
}

// unsynced checks the atomic that controls wallet syncness and if previously
// synced, updates to unsynced and notifies the callback, if set.
func (s *Syncer) unsynced() {
if atomic.CompareAndSwapUint32(&s.atomicWalletSynced, 1, 0) &&
s.notifications != nil &&
s.notifications.Synced != nil {
s.notifications.Synced(false)
}
}

// peerConnected updates the notification for peer count, if set.
func (s *Syncer) peerConnected(remotesCount int, addr string) {
if s.notifications != nil && s.notifications.PeerConnected != nil {
Expand Down Expand Up @@ -389,9 +376,14 @@ func (s *Syncer) Run(ctx context.Context) error {
}
s.fetchHeadersFinished()

// Signal that startup fetching of headers has completed.
close(s.headersFetched)
// Finally: Perform the initial rescan over the received blocks.
err = s.initialSyncRescan(ctx)
if err != nil {
return err
}

// Signal that the initial sync has completed.
close(s.initialSyncDone)
return nil
})

Expand Down Expand Up @@ -479,7 +471,7 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string) {
}()

// Perform peer startup.
err = s.startupSync(ctx, rp)
err = s.peerStartup(ctx, rp)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Warnf("Unable to complete startup sync with peer %v: %v", raddr, err)
Expand Down Expand Up @@ -562,18 +554,6 @@ func (s *Syncer) forRemotes(f func(rp *p2p.RemotePeer) error) error {
return nil
}

func (s *Syncer) pickRemote(pick func(*p2p.RemotePeer) bool) (*p2p.RemotePeer, error) {
defer s.remotesMu.Unlock()
s.remotesMu.Lock()

for _, rp := range s.remotes {
if pick(rp) {
return rp, nil
}
}
return nil, errors.E(errors.NoPeers)
}

// waitForAnyRemote blocks until there is one or more remote peers available or
// the context is canceled.
//
Expand Down Expand Up @@ -1523,83 +1503,83 @@ nextbatch:
return ctx.Err()
}

func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error {

select {
case <-ctx.Done():
return ctx.Err()
case <-s.headersFetched:
case <-rp.Done():
return rp.Err()
// initialSyncRescan performs account and address discovery and rescans blocks
// during the initial syncer operation.
func (s *Syncer) initialSyncRescan(ctx context.Context) error {
rescanPoint, err := s.wallet.RescanPoint(ctx)
if err != nil {
return err
}
if rescanPoint == nil {
// The wallet is already up to date with transactions in all
// blocks. Load the data filters to check for transactions in
// future blocks received via sendheaders.
log.Debugf("Skipping rescanning due to rescanPoint == nil")
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}

var err error
if atomic.CompareAndSwapUint32(&s.atomicCatchUpTryLock, 0, 1) {
err = func() error {
rescanPoint, err := s.wallet.RescanPoint(ctx)
if err != nil {
return err
}
if rescanPoint == nil {
if !s.loadedFilters {
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}
s.loadedFilters = true
}

s.synced()

return nil
}
// RescanPoint is != nil so we are not synced to the peer and
// check to see if it was previously synced
s.unsynced()

s.discoverAddressesStart()
err = s.wallet.DiscoverActiveAddresses(ctx, rp, rescanPoint, s.discoverAccounts, s.wallet.GapLimit())
if err != nil {
return err
}
s.discoverAddressesFinished()
s.discoverAccounts = false
s.synced()
return nil
}

err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}
s.loadedFilters = true
// Perform address/account discovery.
gapLimit := s.wallet.GapLimit()
log.Debugf("Starting address discovery (discoverAccounts=%v, gapLimit=%d, rescanPoint=%v)",
s.discoverAccounts, gapLimit, rescanPoint)
s.discoverAddressesStart()
err = s.wallet.DiscoverActiveAddresses(ctx, s, rescanPoint, s.discoverAccounts, gapLimit)
if err != nil {
return err
}
s.discoverAddressesFinished()

s.rescanStart()
// Prepare the filters with the list of addresses to watch for.
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}

rescanBlock, err := s.wallet.BlockHeader(ctx, rescanPoint)
if err != nil {
return err
}
progress := make(chan wallet.RescanProgress, 1)
go s.wallet.RescanProgressFromHeight(ctx, s, int32(rescanBlock.Height), progress)
// Start the rescan asynchronously.
rescanBlock, err := s.wallet.BlockHeader(ctx, rescanPoint)
if err != nil {
return err
}
progress := make(chan wallet.RescanProgress, 1)
go s.wallet.RescanProgressFromHeight(ctx, s, int32(rescanBlock.Height), progress)

for p := range progress {
if p.Err != nil {
return p.Err
}
s.rescanProgress(p.ScannedThrough)
}
s.rescanFinished()
// Read the rescan progress.
s.rescanStart()
for p := range progress {
if p.Err != nil {
return p.Err
}
s.rescanProgress(p.ScannedThrough)
}
s.rescanFinished()

s.synced()
// Wallet is now synced.
log.Debugf("Wallet considered synced")
s.synced()
return nil
}

return nil
}()
atomic.StoreUint32(&s.atomicCatchUpTryLock, 0)
if err != nil {
return err
}
// peerStartup performs initial startup operations with a recently connected
// peer.
func (s *Syncer) peerStartup(ctx context.Context, rp *p2p.RemotePeer) error {
// Only continue with peer startup after the initial sync process
// has completed.
select {
case <-ctx.Done():
return ctx.Err()
case <-rp.Done():
return rp.Err()
case <-s.initialSyncDone:
}

if rp.Pver() >= wire.InitStateVersion {
err = s.GetInitState(ctx, rp)
err := s.GetInitState(ctx, rp)
if err != nil {
log.Errorf("Failed to get init state", err)
}
Expand Down
1 change: 0 additions & 1 deletion wallet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type FilterProof = struct {
type Peer interface {
Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error)
CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]FilterProof, error)
Headers(ctx context.Context, blockLocators []*chainhash.Hash, hashStop *chainhash.Hash) ([]*wire.BlockHeader, error)
PublishTransactions(ctx context.Context, txs ...*wire.MsgTx) error
}

Expand Down
3 changes: 0 additions & 3 deletions wallet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ func (mockNetwork) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([
func (mockNetwork) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]FilterProof, error) {
return nil, nil
}
func (mockNetwork) Headers(ctx context.Context, blockLocators []*chainhash.Hash, hashStop *chainhash.Hash) ([]*wire.BlockHeader, error) {
return nil, nil
}
func (mockNetwork) PublishTransactions(ctx context.Context, txs ...*wire.MsgTx) error { return nil }
func (mockNetwork) LoadTxFilter(ctx context.Context, reload bool, addrs []stdaddr.Address, outpoints []wire.OutPoint) error {
return nil
Expand Down
Loading