diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f6d53c7c..de8d1eb97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The following emojis are used to highlight certain changes: - `routing/http/client`: optional address and protocol filter parameters from [IPIP-484](https://github.com/ipfs/specs/pull/484) use human-readable `,` instead of `%2C`. [#688](https://github.com/ipfs/boxo/pull/688) - `bitswap/client` Cleanup live wants when wants are canceled. This prevents live wants from continuing to get rebroadcasted even after the wants are canceled. [#690](https://github.com/ipfs/boxo/pull/690) +- Fix problem adding invalid CID to exhausted wants list resulting in possible performance issue. [#692](https://github.com/ipfs/boxo/pull/692) ### Security diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index fac72f7cd..edea20b9c 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -93,7 +93,7 @@ type MessageQueue struct { // Dont touch any of these variables outside of run loop sender bsnet.MessageSender - rebroadcastIntervalLk sync.RWMutex + rebroadcastIntervalLk sync.Mutex rebroadcastInterval time.Duration rebroadcastTimer *clock.Timer // For performance reasons we just clear out the fields of the message @@ -389,9 +389,9 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) { // Startup starts the processing of messages and rebroadcasting. func (mq *MessageQueue) Startup() { - mq.rebroadcastIntervalLk.RLock() + mq.rebroadcastIntervalLk.Lock() mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval) - mq.rebroadcastIntervalLk.RUnlock() + mq.rebroadcastIntervalLk.Unlock() go mq.runQueue() } @@ -422,7 +422,7 @@ func (mq *MessageQueue) runQueue() { } var workScheduled time.Time - for mq.ctx.Err() == nil { + for { select { case <-mq.rebroadcastTimer.C: mq.rebroadcastWantlist() @@ -471,9 +471,9 @@ func (mq *MessageQueue) runQueue() { // Periodically resend the list of wants to the peer func (mq *MessageQueue) rebroadcastWantlist() { - mq.rebroadcastIntervalLk.RLock() + mq.rebroadcastIntervalLk.Lock() mq.rebroadcastTimer.Reset(mq.rebroadcastInterval) - mq.rebroadcastIntervalLk.RUnlock() + mq.rebroadcastIntervalLk.Unlock() // If some wants were transferred from the rebroadcast list if mq.transferRebroadcastWants() { diff --git a/bitswap/client/internal/notifications/notifications.go b/bitswap/client/internal/notifications/notifications.go index 499a61c42..cd0816161 100644 --- a/bitswap/client/internal/notifications/notifications.go +++ b/bitswap/client/internal/notifications/notifications.go @@ -69,12 +69,13 @@ func (ps *impl) Shutdown() { // corresponding to |keys|. func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block { blocksCh := make(chan blocks.Block, len(keys)) - valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking if len(keys) == 0 { close(blocksCh) return blocksCh } + valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking + // prevent shutdown ps.lk.RLock() defer ps.lk.RUnlock() diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index 25cdd605f..78a1d4c88 100644 --- a/bitswap/client/internal/peermanager/peermanager.go +++ b/bitswap/client/internal/peermanager/peermanager.go @@ -42,7 +42,7 @@ type PeerManager struct { createPeerQueue PeerQueueFactory ctx context.Context - psLk sync.RWMutex + psLk sync.Mutex sessions map[uint64]Session peerSessions map[peer.ID]map[uint64]struct{} diff --git a/bitswap/client/internal/peermanager/peerwantmanager.go b/bitswap/client/internal/peermanager/peerwantmanager.go index 0bc4732ca..e9fdfbb46 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager.go +++ b/bitswap/client/internal/peermanager/peerwantmanager.go @@ -1,8 +1,8 @@ package peermanager import ( - "bytes" "fmt" + "strings" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p/core/peer" @@ -158,8 +158,6 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) { // sendWants only sends the peer the want-blocks and want-haves that have not // already been sent to it. func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { - fltWantBlks := make([]cid.Cid, 0, len(wantBlocks)) - fltWantHvs := make([]cid.Cid, 0, len(wantHaves)) // Get the existing want-blocks and want-haves for the peer pws, ok := pwm.peerWants[p] @@ -169,6 +167,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves return } + fltWantBlks := make([]cid.Cid, 0, len(wantBlocks)) + // Iterate over the requested want-blocks for _, c := range wantBlocks { // If the want-block hasn't been sent to the peer @@ -198,6 +198,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves pwm.reverseIndexAdd(c, p) } + fltWantHvs := make([]cid.Cid, 0, len(wantHaves)) + // Iterate over the requested want-haves for _, c := range wantHaves { // If we've already broadcasted this want, don't bother with a @@ -450,7 +452,7 @@ func (pwm *peerWantManager) getWants() []cid.Cid { } func (pwm *peerWantManager) String() string { - var b bytes.Buffer + var b strings.Builder for p, ws := range pwm.peerWants { b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len())) for _, c := range ws.wantHaves.Keys() { diff --git a/bitswap/client/internal/session/peerresponsetracker.go b/bitswap/client/internal/session/peerresponsetracker.go index 25c10fe87..f96424742 100644 --- a/bitswap/client/internal/session/peerresponsetracker.go +++ b/bitswap/client/internal/session/peerresponsetracker.go @@ -31,8 +31,6 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID { return "" } - rnd := rand.Float64() - // Find the total received blocks for all candidate peers total := 0 for _, p := range peers { @@ -41,6 +39,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID { // Choose one of the peers with a chance proportional to the number // of blocks received from that peer + rnd := rand.Float64() counted := 0.0 for _, p := range peers { counted += float64(prt.getPeerCount(p)) / float64(total) @@ -52,8 +51,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID { // We shouldn't get here unless there is some weirdness with floating point // math that doesn't quite cover the whole range of peers in the for loop // so just choose the last peer. - index := len(peers) - 1 - return peers[index] + return peers[len(peers)-1] } // getPeerCount returns the number of times the peer was first to send us a diff --git a/bitswap/client/internal/session/sessionwants.go b/bitswap/client/internal/session/sessionwants.go index ace8cd952..97c3e7800 100644 --- a/bitswap/client/internal/session/sessionwants.go +++ b/bitswap/client/internal/session/sessionwants.go @@ -56,8 +56,12 @@ func (sw *sessionWants) GetNextWants() []cid.Cid { // limit) currentLiveCount := len(sw.liveWants) toAdd := sw.broadcastLimit - currentLiveCount + liveSize := min(toAdd, sw.toFetch.Len()) + if liveSize == 0 { + return nil + } - var live []cid.Cid + live := make([]cid.Cid, 0, liveSize) for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- { c := sw.toFetch.Pop() live = append(live, c) @@ -117,6 +121,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) cleaned = append(cleaned, c) } } + clear(sw.liveWantsOrder[len(cleaned):]) // GC cleared items sw.liveWantsOrder = cleaned } @@ -127,7 +132,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) // live want CIDs up to the broadcast limit. func (sw *sessionWants) PrepareBroadcast() []cid.Cid { now := time.Now() - live := make([]cid.Cid, 0, len(sw.liveWants)) + live := make([]cid.Cid, 0, min(len(sw.liveWants), sw.broadcastLimit)) for _, c := range sw.liveWantsOrder { if _, ok := sw.liveWants[c]; ok { // No response was received for the want, so reset the sent time diff --git a/bitswap/client/internal/session/sessionwantsender.go b/bitswap/client/internal/session/sessionwantsender.go index 1beefeb94..338150ec3 100644 --- a/bitswap/client/internal/session/sessionwantsender.go +++ b/bitswap/client/internal/session/sessionwantsender.go @@ -161,8 +161,7 @@ func (sws *sessionWantSender) Cancel(ks []cid.Cid) { // Update is called when the session receives a message with incoming blocks // or HAVE / DONT_HAVE func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { - hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0 - if !hasUpdate { + if len(ks) == 0 && len(haves) == 0 && len(dontHaves) == 0 { return } @@ -349,8 +348,7 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) { } // Create the want info - wi := newWantInfo(sws.peerRspTrkr) - sws.wants[c] = wi + sws.wants[c] = newWantInfo(sws.peerRspTrkr) // For each available peer, register any information we know about // whether the peer has the block @@ -481,7 +479,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU // (because it may be the last peer who hadn't sent a DONT_HAVE for a CID) if len(newlyUnavailable) > 0 { // Collect all pending wants - wants = make([]cid.Cid, len(sws.wants)) + wants = make([]cid.Cid, 0, len(sws.wants)) for c := range sws.wants { wants = append(wants, c) } diff --git a/bitswap/client/internal/sessionmanager/sessionmanager.go b/bitswap/client/internal/sessionmanager/sessionmanager.go index a75a3f769..0d2b24330 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager.go @@ -57,7 +57,7 @@ type SessionManager struct { notif notifications.PubSub // Sessions - sessLk sync.RWMutex + sessLk sync.Mutex sessions map[uint64]Session // Session Index @@ -159,13 +159,13 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid // Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) { - sm.sessLk.RLock() + sm.sessLk.Lock() if sm.sessions == nil { // check if SessionManager was shutdown - sm.sessLk.RUnlock() + sm.sessLk.Unlock() return } sess, ok := sm.sessions[id] - sm.sessLk.RUnlock() + sm.sessLk.Unlock() if ok { sess.ReceiveFrom(p, blks, haves, dontHaves)