Skip to content

Commit

Permalink
TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Nov 18, 2023
1 parent b568a58 commit 2599f37
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions unixfs/feather/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type downloader struct {
client *Client
remainingAttempts uint
stream io.Closer
hasRetries bool
gotOneBlock bool
}

type Client struct {
Expand Down Expand Up @@ -119,12 +121,14 @@ func (client *Client) DownloadFile(c cid.Cid) (io.ReadCloser, error) {
state: []region{{c: normalizeCidv0(c)}},
buf: *bufio.NewReaderSize(nil, maxElementSize*2),
remainingAttempts: attempts,
hasRetries: client.retries != 0,
}

return d, nil
}

func (d *downloader) startStream(todo region) error {
d.gotOneBlock = false
req, err := http.NewRequest("GET", d.client.hostname+todo.c.String()+"?dag-scope=entity", bytes.NewReader(nil))
if err != nil {
return err
Expand Down Expand Up @@ -265,6 +269,9 @@ func (d *downloader) next(todo region) ([]byte, error) {
var errStartStream, errRead error
for {
if d.stream == nil {
if !d.hasRetries && errRead == io.EOF {
return nil, fmt.Errorf("gateway terminated too early, still want: %s", cidStringTruncate(c))
}
if attempts := d.remainingAttempts; attempts != math.MaxUint {
if attempts == 0 {
return nil, fmt.Errorf("could not download next block: %w", errors.Join(errRead, errStartStream))
Expand All @@ -286,10 +293,15 @@ func (d *downloader) next(todo region) ([]byte, error) {
// readBlockFromStream must perform hash verification on the input.
// The slice returned only has to be valid between two readBlockFromStream and Close calls.
// Implementations should reuse buffers to avoid allocations.
func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) {
func (d *downloader) readBlockFromStream(expectedCid cid.Cid) (_ []byte, rErr error) {
itemLenU, err := binary.ReadUvarint(&d.buf)
if err != nil {
switch err {
case io.EOF:
return nil, err
case nil:
break
default:
return nil, fmt.Errorf("reading next block length: %w", err)
}
if itemLenU > maxBlockSize+maxCidSize {
return nil, fmt.Errorf("item size (%d) for %s exceed maxBlockSize+maxCidSize (%d)", itemLenU, cidStringTruncate(expectedCid), maxBlockSize+maxCidSize)
Expand All @@ -298,6 +310,7 @@ func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) {

cidLen, cidFound, err := cid.CidFromReader(&d.buf)
if err != nil {
err = eofWouldBeUnexpected(err)
return nil, fmt.Errorf("trying to read %s failed to read cid: %w", cidStringTruncate(expectedCid), err)
}
if cidLen > maxCidSize {
Expand All @@ -314,10 +327,7 @@ func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) {
}
data, err := d.buf.Peek(blockSize)
if err != nil {
if err == io.EOF {
// don't show io.EOF in case peeking is too short
err = io.ErrUnexpectedEOF
}
err = eofWouldBeUnexpected(err)
return nil, fmt.Errorf("peeking at block data for %s verification: %w", cidStringTruncate(expectedCid), err)
}
_, err = d.buf.Discard(len(data))
Expand All @@ -338,6 +348,13 @@ func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) {
return data, nil
}

func eofWouldBeUnexpected(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}

func (d *downloader) Close() error {
if s := d.stream; s != nil {
d.stream = nil
Expand Down

0 comments on commit 2599f37

Please sign in to comment.