diff --git a/CHANGELOG.md b/CHANGELOG.md index 472c0aad2..ee78a8b19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes: ### Added +- `boxo/bitswap/server`: + - A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672) - `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) ### Changed diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index f5511cc7a..b30bcc87f 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -37,4 +37,7 @@ const ( // RebroadcastDelay is the default delay to trigger broadcast of // random CIDs in the wantlist. RebroadcastDelay = time.Minute + + // DefaultWantHaveReplaceSize is the replacement threshold in bytes used when WithWantHaveReplaceSize is not set. + DefaultWantHaveReplaceSize = 1024 ) diff --git a/bitswap/options.go b/bitswap/options.go index 11e89fdf9..6a98b27db 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -71,6 +71,13 @@ func WithTaskComparator(comparator server.TaskComparator) Option { return Option{server.WithTaskComparator(comparator)} } +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which the bitswap server will replace a WantHave with a WantBlock response. +// See [server.WithWantHaveReplaceSize] for details. 
+func WithWantHaveReplaceSize(size int) Option { + return Option{server.WithWantHaveReplaceSize(size)} +} + func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { return Option{client.ProviderSearchDelay(newProvSearchDelay)} } diff --git a/bitswap/server/internal/decision/blockstoremanager.go b/bitswap/server/internal/decision/blockstoremanager.go index aa16b3126..d4c0f4254 100644 --- a/bitswap/server/internal/decision/blockstoremanager.go +++ b/bitswap/server/internal/decision/blockstoremanager.go @@ -121,6 +121,45 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) ( return res, nil } +// hasBlocks checks the blockstore for each of the given CIDs and returns the +// set of CIDs that are present. A lookup that fails is logged and its CID is +// left out of the result rather than failing the whole call. +func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) { + if len(ks) == 0 { + return nil, nil + } + hasBlocks := make([]bool, len(ks)) + + var count atomic.Int32 + err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) { + has, err := bsm.bs.Has(ctx, c) + if err != nil { + // Note: this isn't a fatal error. We shouldn't abort the request + log.Errorf("blockstore.Has(%s) error: %s", c, err) + return + } + if has { + hasBlocks[i] = true + count.Add(1) + } + }) + if err != nil { + return nil, err + } + results := count.Load() + if results == 0 { + return nil, nil + } + + res := make(map[cid.Cid]struct{}, results) + for i, ok := range hasBlocks { + if ok { + res[ks[i]] = struct{}{} + } + } + return res, nil +} + func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) { if len(ks) == 0 { return nil, nil diff --git a/bitswap/server/internal/decision/blockstoremanager_test.go b/bitswap/server/internal/decision/blockstoremanager_test.go index f65c88e83..2f2b7b23f 100644 --- a/bitswap/server/internal/decision/blockstoremanager_test.go +++ b/bitswap/server/internal/decision/blockstoremanager_test.go @@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) { cids = append(cids, b.Cid()) } - sizes, err := bsm.getBlockSizes(ctx, cids) + hasBlocks, 
err := bsm.hasBlocks(ctx, cids) if err != nil { t.Fatal(err) } - if len(sizes) != len(blks)-1 { + if len(hasBlocks) != len(blks)-1 { t.Fatal("Wrong response length") } - for _, c := range cids { - expSize := len(exp[c].RawData()) - size, ok := sizes[c] - - // Only the last key should be missing + _, ok := hasBlocks[c] if c.Equals(cids[len(cids)-1]) { if ok { t.Fatal("Non-existent block should not be in sizes map") } } else { if !ok { - t.Fatal("Block should be in sizes map") - } - if size != expSize { - t.Fatal("Block has wrong size") + t.Fatal("Block should be in hasBlocks") } } } diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 1174c94c0..5e4463e33 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -77,10 +77,6 @@ const ( // queuedTagWeight is the default weight for peers that have work queued // on their behalf. queuedTagWeight = 10 - - // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in - // bytes up to which we will replace a want-have with a want-block - maxBlockSizeReplaceHasWithBlock = 1024 ) // Envelope contains a message for a Peer. @@ -202,9 +198,9 @@ type Engine struct { targetMessageSize int - // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in - // bytes up to which we will replace a want-have with a want-block - maxBlockSizeReplaceHasWithBlock int + // wantHaveReplaceSize is the maximum size of the block in bytes up to + // which to replace a WantHave with a WantBlock. + wantHaveReplaceSize int sendDontHaves bool @@ -343,6 +339,14 @@ func WithSetSendDontHave(send bool) Option { } } +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which to replace a WantHave with a WantBlock response. 
+func WithWantHaveReplaceSize(size int) Option { + return func(e *Engine) { + e.wantHaveReplaceSize = size + } +} + // wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { return func(a, b *peertask.QueueTask) bool { @@ -369,32 +373,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { } // NewEngine creates a new block sending engine for the given block store. -// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum -// work already outstanding. +// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer +// more tasks if it has some maximum work already outstanding. func NewEngine( ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, opts ...Option, -) *Engine { - return newEngine( - ctx, - bs, - peerTagger, - self, - maxBlockSizeReplaceHasWithBlock, - opts..., - ) -} - -func newEngine( - ctx context.Context, - bs bstore.Blockstore, - peerTagger PeerTagger, - self peer.ID, - maxReplaceSize int, - opts ...Option, ) *Engine { e := &Engine{ scoreLedger: NewDefaultScoreLedger(), @@ -404,7 +390,7 @@ func newEngine( outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), ticker: time.NewTicker(time.Millisecond * 100), - maxBlockSizeReplaceHasWithBlock: maxReplaceSize, + wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, @@ -445,6 +431,12 @@ func newEngine( e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...) 
+ if e.wantHaveReplaceSize == 0 { + log.Info("Replace WantHave with WantBlock is disabled") + } else { + log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize) + } + return e } @@ -689,16 +681,38 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } + noReplace := e.wantHaveReplaceSize == 0 + // Get block sizes for unique CIDs. - wantKs := cid.NewSet() + wantKs := make([]cid.Cid, 0, len(wants)) + var haveKs []cid.Cid for _, entry := range wants { - wantKs.Add(entry.Cid) + if noReplace && entry.WantType == pb.Message_Wantlist_Have { + haveKs = append(haveKs, entry.Cid) + } else { + wantKs = append(wantKs, entry.Cid) + } } - blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys()) + blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs) if err != nil { log.Info("aborting message processing", err) return false } + if len(haveKs) != 0 { + hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs) + if err != nil { + log.Infow("aborting message processing", "err", err) + return false + } + if len(hasBlocks) != 0 { + if blockSizes == nil { + blockSizes = make(map[cid.Cid]int, len(hasBlocks)) + } + for blkCid := range hasBlocks { + blockSizes[blkCid] = 0 + } + } + } e.lock.Lock() @@ -707,20 +721,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } var overflow []bsmsg.Entry - if len(wants) != 0 { - filteredWants := wants[:0] // shift inplace - for _, entry := range wants { - if !e.peerLedger.Wants(p, entry.Entry) { - // Cannot add entry because it would exceed size limit. - overflow = append(overflow, entry) - continue - } - filteredWants = append(filteredWants, entry) - } - // Clear truncated entries - early GC. 
- clear(wants[len(filteredWants):]) - wants = filteredWants - } + wants, overflow = e.filterOverflow(p, wants, overflow) if len(overflow) != 0 { log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow)) @@ -764,7 +765,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap sendDontHave(entry) } - // For each want-have / want-block + // For each want-have / want-block entry for _, entry := range wants { c := entry.Cid blockSize, found := blockSizes[c] @@ -776,7 +777,10 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap continue } // The block was found, add it to the queue - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) + + // Send the block for a want-block, or for a want-have small enough to be + // replaced. A size of 0 marks an entry that was only checked with Has. + isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize) log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock) @@ -810,6 +814,25 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return false } +func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { + if len(wants) == 0 { + return wants, overflow + } + + filteredWants := wants[:0] // shift inplace + for _, entry := range wants { + if !e.peerLedger.Wants(p, entry.Entry) { + // Cannot add entry because it would exceed size limit. + overflow = append(overflow, entry) + continue + } + filteredWants = append(filteredWants, entry) + } + // Clear truncated entries - early GC. + clear(wants[len(filteredWants):]) + return filteredWants, overflow +} + // handleOverflow processes incoming wants that could not be addded to the peer // ledger without exceeding the peer want limit. These are handled by trying to // make room by canceling existing wants for which there is no block. 
If this @@ -913,17 +936,17 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] continue } + if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { + denials = append(denials, et) + continue + } + if et.WantType == pb.Message_Wantlist_Have { log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c) } else { log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c) } - if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { - denials = append(denials, et) - continue - } - // Do not take more wants that can be handled. if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) { wants = append(wants, et) @@ -1057,8 +1080,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) { // If the want is a want-have, and it's below a certain size, send the full // block (instead of sending a HAVE) func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool { - isWantBlock := wantType == pb.Message_Wantlist_Block - return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock + return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize } func (e *Engine) numBytesSentTo(p peer.ID) uint64 { diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 593bbde0f..5cc1375c7 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -188,17 +188,11 @@ func newEngineForTesting( bs blockstore.Blockstore, peerTagger PeerTagger, self peer.ID, - maxReplaceSize int, + wantHaveReplaceSize int, opts ...Option, ) *Engine { - return newEngine( - ctx, - bs, - peerTagger, - self, - maxReplaceSize, - opts..., - ) + opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize)) + return NewEngine(ctx, bs, peerTagger, self, opts...) 
} func TestOutboxClosedWhenEngineClosed(t *testing.T) { diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 85651a5ef..46d29a8fc 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -251,6 +251,38 @@ func HasBlockBufferSize(count int) Option { } } +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which the bitswap server will replace a WantHave with a WantBlock response. +// +// Behavior: +// - If size > 0: The server may send full blocks instead of just confirming possession +// for blocks up to the specified size. +// - If size = 0: WantHave replacement is disabled entirely. This allows the server to +// skip reading block sizes during WantHave request processing, which can be more +// efficient if the data storage bills "possession" checks and "reads" differently. +// +// Performance considerations: +// - Enabling replacement (size > 0) may reduce network round-trips but requires +// checking block sizes for each WantHave request to decide if replacement should occur. +// - Disabling replacement (size = 0) optimizes server performance by avoiding +// block size checks, potentially reducing infrastructure costs if possession checks +// are less expensive than full reads. +// +// It defaults to [defaults.DefaultWantHaveReplaceSize] +// and the value may change in future releases. +// +// Use this option to set explicit behavior to balance between network +// efficiency, server performance, and potential storage cost optimizations +// based on your specific use case and storage backend. +func WithWantHaveReplaceSize(size int) Option { + if size < 0 { + size = 0 + } + return func(bs *Server) { + bs.engineOptions = append(bs.engineOptions, decision.WithWantHaveReplaceSize(size)) + } +} + // WantlistForPeer returns the currently understood list of blocks requested by a // given peer. func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {