From d3a1ed80f75088a75477991bc1c8970e52bab8fe Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 7 Jun 2023 11:40:39 +0200 Subject: [PATCH] Parse indexed deals for data segment index This makes boost aware of the format in https://github.com/filecoin-project/go-data-segment/ for new deals. This is the same logic as in filecoin-project/lotus#10674. --- go.mod | 9 +++-- go.sum | 10 +++-- piecedirectory/piecedirectory.go | 68 +++++++++++++++++++++++++++----- 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 98654e407..b5b193739 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,8 @@ require ( github.com/filecoin-project/go-commp-utils v0.1.4 github.com/filecoin-project/go-data-transfer v1.15.4-boost github.com/filecoin-project/go-fil-commcid v0.1.0 - github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 + github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 + github.com/filecoin-project/go-fil-markets v1.28.3 github.com/filecoin-project/go-jsonrpc v0.3.1 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 @@ -106,7 +107,7 @@ require ( go.uber.org/fx v1.19.3 go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.10.0 - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 + golang.org/x/exp v0.0.0-20230418202329-0354be287a23 golang.org/x/sync v0.2.0 golang.org/x/text v0.10.0 golang.org/x/tools v0.9.1 @@ -212,7 +213,6 @@ require ( github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-ds-badger2 v0.1.3 // indirect - github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-measure v0.2.0 // indirect github.com/ipfs/go-fs-lock v0.0.7 // indirect github.com/ipfs/go-ipfs-cmds v0.9.0 // indirect @@ -335,10 +335,11 @@ require ( require ( github.com/filecoin-project/boost-gfm v1.26.7 github.com/filecoin-project/boost-graphsync v0.13.6 + github.com/filecoin-project/go-data-segment v0.0.0-20230605095649-5d01fdd3e4a1 
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 - github.com/filecoin-project/go-fil-markets v1.28.3 github.com/filecoin-project/lotus v1.23.2-0.20230622154405-168d022018ce github.com/ipfs/boxo v0.10.1 + github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/kubo v0.21.0-rc1 github.com/ipni/go-libipni v0.0.8 github.com/schollz/progressbar/v3 v3.13.1 diff --git a/go.sum b/go.sum index 902027ad7..af713f25d 100644 --- a/go.sum +++ b/go.sum @@ -363,6 +363,8 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o= github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= +github.com/filecoin-project/go-data-segment v0.0.0-20230605095649-5d01fdd3e4a1 h1:yPNfH/4vjeLiip2N1QDN64kg+TFd9duY9kPIEKx2ujU= +github.com/filecoin-project/go-data-segment v0.0.0-20230605095649-5d01fdd3e4a1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4= github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0= github.com/filecoin-project/go-data-transfer v1.15.4-boost/go.mod h1:S5Es9uoD+3TveYyGjxZInAF6mSQtRjNzezV7Y7Sh8X0= github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc= @@ -372,8 +374,8 @@ github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BB github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88OqLYEo6roi+GiIeOh8= github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= -github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 
h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= -github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= +github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 h1:HYIUugzjq78YvV3vC6rL95+SfC/aSTVSnZSZiDV5pCk= +github.com/filecoin-project/go-fil-commp-hashhash v0.2.0/go.mod h1:VH3fAFOru4yyWar4626IoS5+VGE8SfZiBODJLUigEo4= github.com/filecoin-project/go-fil-markets v1.28.3 h1:2cFu7tLZYrfNz4LnxjgERaVD7k5+Wwp0H76mnnTGPBk= github.com/filecoin-project/go-fil-markets v1.28.3/go.mod h1:eryxo/oVgIxaR5g5CNr9PlvZOi+u/bak0IsPL/PT1hk= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= @@ -2103,8 +2105,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20210615023648-acb5c1269671/go.mod h1:DVyR6MI7P4kEQgvZJSj1fQGrWIi2RzIrfYWycwheUAc= golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8/go.mod h1:DVyR6MI7P4kEQgvZJSj1fQGrWIi2RzIrfYWycwheUAc= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230418202329-0354be287a23 h1:4NKENAGIctmZYLK9W+X1kDK8ObBFqOSCJM6WE7CvkJY= +golang.org/x/exp v0.0.0-20230418202329-0354be287a23/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index b11f782a7..2a5730e93 100644 --- 
a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/shared/tracing" bdtypes "github.com/filecoin-project/boostd-data/svc/types" + "github.com/filecoin-project/go-data-segment/datasegment" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/markets/dagstore" "github.com/hashicorp/go-multierror" @@ -200,13 +201,35 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid return fmt.Errorf("getting reader over piece %s: %w", pieceCid, err) } - // Iterate over all the blocks in the piece to extract the index records + // Try to parse data as containing a data segment index log.Debugw("add index: read index", "pieceCid", pieceCid) + recs, err := parseShardWithDataSegmentIndex(ctx, pieceCid, int64(dealInfo.PieceLength), reader) + if err != nil { + log.Debugw("add index: data segment check failed. falling back to car", "pieceCid", pieceCid, "err", err) + // Iterate over all the blocks in the piece to extract the index records + recs, err = parseRecordsFromCar(reader) + if err != nil { + return fmt.Errorf("for piece %s: %w", pieceCid, err) + } + } + + // Add mh => piece index to store: "which piece contains the multihash?" + // Add mh => offset index to store: "what is the offset of the multihash within the piece?" + log.Debugw("add index: store index in local index directory", "pieceCid", pieceCid) + if err := ps.store.AddIndex(ctx, pieceCid, recs, true); err != nil { + return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err) + } + + return nil +} + +func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { + // Iterate over all the blocks in the piece to extract the index records recs := make([]model.Record, 0) opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)} blockReader, err := carv2.NewBlockReader(reader, opts...) 
if err != nil { - return fmt.Errorf("getting block reader over piece %s: %w", pieceCid, err) + return nil, fmt.Errorf("getting block reader over piece: %w", err) } blockMetadata, err := blockReader.SkipNext() @@ -222,17 +245,44 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid blockMetadata, err = blockReader.SkipNext() } if !errors.Is(err, io.EOF) { - return fmt.Errorf("generating index for piece %s: %w", pieceCid, err) + return nil, fmt.Errorf("generating index for piece: %w", err) + } + return recs, nil +} + +func parseShardWithDataSegmentIndex(ctx context.Context, pieceCid cid.Cid, size int64, r types.SectionReader) ([]model.Record, error) { + ps := abi.UnpaddedPieceSize(size).Padded() + dsis := datasegment.DataSegmentIndexStartOffset(ps) + if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { + return nil, fmt.Errorf("could not seek to data segment index: %w", err) + } + dataSegments, err := datasegment.ParseDataSegmentIndex(r) + if err != nil { + return nil, fmt.Errorf("could not parse data segment index: %w", err) + } + segments, err := dataSegments.ValidEntries() + if err != nil { + return nil, fmt.Errorf("could not calculate valid entries: %w", err) + } + if len(segments) == 0 { + return nil, fmt.Errorf("no data segments found") } - // Add mh => piece index to store: "which piece contains the multihash?" - // Add mh => offset index to store: "what is the offset of the multihash within the piece?" 
- log.Debugw("add index: store index in local index directory", "pieceCid", pieceCid) - if err := ps.store.AddIndex(ctx, pieceCid, recs, true); err != nil { - return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err) + recs := make([]model.Record, 0) + for _, s := range segments { + segOffset := s.UnpaddedOffest() + segSize := s.UnpaddedLength() + + lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) + subRecs, err := parseRecordsFromCar(lr) + if err != nil { + log.Debugw("Unexpected index format on generation in shard", "piece", pieceCid, "offset", segOffset) + continue + } + recs = append(recs, subRecs...) } - return nil + return recs, nil } // BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal