Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: option to not read size of blocks for want-have requests #672

Merged
merged 10 commits into from
Sep 27, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes:

### Added

* `boxo/bitswap/server`:
* A new `WithReplaceHasWithBlockMaxSize(n)` option can be used with `bitswap.New`. It sets the maximum size of a block in bytes up to which we will replace a want-have with a want-block. Setting a size of 0 disables this want-have replacement and means that block sizes are not read for want-have requests.

### Changed

### Removed
Expand Down
8 changes: 8 additions & 0 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@
return Option{server.WithTaskComparator(comparator)}
}

// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up
// to which we will replace a want-have with a want-block. Setting a size of 0
// disables this want-have replacement and means that block sizes are not read
// for want-have requests.
func WithReplaceHasWithBlockMaxSize(maxSize int) Option {
return Option{server.WithReplaceHasWithBlockMaxSize(maxSize)}

Check warning on line 79 in bitswap/options.go

View check run for this annotation

Codecov / codecov/patch

bitswap/options.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}

func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return Option{client.ProviderSearchDelay(newProvSearchDelay)}
}
Expand Down
36 changes: 36 additions & 0 deletions bitswap/server/internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,42 @@
return res, nil
}

func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) {
if len(ks) == 0 {
return nil, nil
}

Check warning on line 127 in bitswap/server/internal/decision/blockstoremanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/blockstoremanager.go#L126-L127

Added lines #L126 - L127 were not covered by tests
hasBlocks := make([]bool, len(ks))

var count atomic.Int32
err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) {
has, err := bsm.bs.Has(ctx, c)
if err != nil {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
return
}

Check warning on line 137 in bitswap/server/internal/decision/blockstoremanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/blockstoremanager.go#L134-L137

Added lines #L134 - L137 were not covered by tests
if has {
hasBlocks[i] = true
count.Add(1)
}
})
if err != nil {
return nil, err
}

Check warning on line 145 in bitswap/server/internal/decision/blockstoremanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/blockstoremanager.go#L144-L145

Added lines #L144 - L145 were not covered by tests
results := count.Load()
if results == 0 {
return nil, nil
}

res := make(map[cid.Cid]struct{}, results)
for i, ok := range hasBlocks {
if ok {
res[ks[i]] = struct{}{}
}
}
return res, nil
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
if len(ks) == 0 {
return nil, nil
Expand Down
15 changes: 4 additions & 11 deletions bitswap/server/internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes, err := bsm.getBlockSizes(ctx, cids)
hasBlocks, err := bsm.hasBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
if len(hasBlocks) != len(blks)-1 {
t.Fatal("Wrong response length")
}

for _, c := range cids {
expSize := len(exp[c].RawData())
size, ok := sizes[c]

// Only the last key should be missing
_, ok := hasBlocks[c]
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in sizes map")
}
} else {
if !ok {
t.Fatal("Block should be in sizes map")
}
if size != expSize {
t.Fatal("Block has wrong size")
t.Fatal("Block should be in hasBlocks")
}
}
}
Expand Down
131 changes: 79 additions & 52 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@
// on their behalf.
queuedTagWeight = 10

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock = 1024
// defaultReplaceHasWithBlockMaxSize is the default maximum size of the
// block in bytes up to which we will replace a want-have with a want-block
defaultReplaceHasWithBlockMaxSize = 1024
)

// Envelope contains a message for a Peer.
Expand Down Expand Up @@ -202,9 +202,9 @@

targetMessageSize int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// replaceHasWithBlockMaxSize is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
replaceHasWithBlockMaxSize int

sendDontHaves bool

Expand Down Expand Up @@ -343,6 +343,14 @@
}
}

// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up
// to which we will replace a want-have with a want-block.
func WithReplaceHasWithBlockMaxSize(maxSize int) Option {
return func(e *Engine) {
e.replaceHasWithBlockMaxSize = maxSize
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
Expand All @@ -369,32 +377,14 @@
}

// NewEngine creates a new block sending engine for the given block store.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
// work already outstanding.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxBlockSizeReplaceHasWithBlock,
opts...,
)
}

func newEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
opts ...Option,
) *Engine {
e := &Engine{
scoreLedger: NewDefaultScoreLedger(),
Expand All @@ -404,7 +394,7 @@
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
replaceHasWithBlockMaxSize: defaultReplaceHasWithBlockMaxSize,
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
self: self,
Expand Down Expand Up @@ -445,6 +435,12 @@

e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)

if e.replaceHasWithBlockMaxSize == 0 {
log.Info("Replace WantHave with WantBlock is disabled")
} else {
log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.replaceHasWithBlockMaxSize)
}

return e
}

Expand Down Expand Up @@ -689,16 +685,39 @@
return true
}

noReplace := e.replaceHasWithBlockMaxSize == 0

// Get block sizes for unique CIDs.
wantKs := cid.NewSet()
wantKs := make([]cid.Cid, 0, len(wants))
var haveKs []cid.Cid
for _, entry := range wants {
wantKs.Add(entry.Cid)
if noReplace && entry.WantType == pb.Message_Wantlist_Have {
haveKs = append(haveKs, entry.Cid)
} else {
wantKs = append(wantKs, entry.Cid)
}
}
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}
if len(haveKs) != 0 {
hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}

Check warning on line 710 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L708-L710

Added lines #L708 - L710 were not covered by tests
if len(hasBlocks) != 0 {
if blockSizes == nil {
blockSizes = make(map[cid.Cid]int, len(hasBlocks))
}
for blkCid := range hasBlocks {
blockSizes[blkCid] = 0
fmt.Println(" block cid:", blkCid)
gammazero marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

e.lock.Lock()

Expand All @@ -707,20 +726,7 @@
}

var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
}
wants, overflow = e.filterOverflow(p, wants, overflow)

if len(overflow) != 0 {
log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
Expand Down Expand Up @@ -764,7 +770,7 @@
sendDontHave(entry)
}

// For each want-have / want-block
// For each want-block
for _, entry := range wants {
c := entry.Cid
blockSize, found := blockSizes[c]
Expand All @@ -776,7 +782,10 @@
continue
}
// The block was found, add it to the queue
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

// Check if this is a want-block or a have-block that can be converted
// to a want-block.
isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)

Expand Down Expand Up @@ -810,6 +819,25 @@
return false
}

func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if len(wants) == 0 {
return wants, overflow
}

filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
return filteredWants, overflow
}

// handleOverflow processes incoming wants that could not be addded to the peer
// ledger without exceeding the peer want limit. These are handled by trying to
// make room by canceling existing wants for which there is no block. If this
Expand Down Expand Up @@ -913,17 +941,17 @@
continue
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

// Do not take more wants that can be handled.
if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
wants = append(wants, et)
Expand Down Expand Up @@ -1057,8 +1085,7 @@
// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
isWantBlock := wantType == pb.Message_Wantlist_Block
return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
return wantType == pb.Message_Wantlist_Block || blockSize <= e.replaceHasWithBlockMaxSize
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Expand Down
10 changes: 2 additions & 8 deletions bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,8 @@ func newEngineForTesting(
maxReplaceSize int,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxReplaceSize,
opts...,
)
opts = append(opts, WithReplaceHasWithBlockMaxSize(maxReplaceSize))
return NewEngine(ctx, bs, peerTagger, self, opts...)
}

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@
}
}

// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up
// to which we will replace a want-have with a want-block. Setting a size of 0
// disables this want-have replacement and means that block sizes are not read
// for want-have requests.
func WithReplaceHasWithBlockMaxSize(maxSize int) Option {
if maxSize < 0 {
maxSize = 0
}
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, decision.WithReplaceHasWithBlockMaxSize(maxSize))
}

Check warning on line 264 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L258-L264

Added lines #L258 - L264 were not covered by tests
}

// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
Expand Down