From 2599f37a0c1a8df492b439413be77a3cabe699e7 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Sat, 18 Nov 2023 15:54:34 +0300 Subject: [PATCH] TODO --- unixfs/feather/entry.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/unixfs/feather/entry.go b/unixfs/feather/entry.go index 93dd813be..f87a44f39 100644 --- a/unixfs/feather/entry.go +++ b/unixfs/feather/entry.go @@ -47,6 +47,8 @@ type downloader struct { client *Client remainingAttempts uint stream io.Closer + hasRetries bool + gotOneBlock bool } type Client struct { @@ -119,12 +121,14 @@ func (client *Client) DownloadFile(c cid.Cid) (io.ReadCloser, error) { state: []region{{c: normalizeCidv0(c)}}, buf: *bufio.NewReaderSize(nil, maxElementSize*2), remainingAttempts: attempts, + hasRetries: client.retries != 0, } return d, nil } func (d *downloader) startStream(todo region) error { + d.gotOneBlock = false req, err := http.NewRequest("GET", d.client.hostname+todo.c.String()+"?dag-scope=entity", bytes.NewReader(nil)) if err != nil { return err @@ -265,6 +269,9 @@ func (d *downloader) next(todo region) ([]byte, error) { var errStartStream, errRead error for { if d.stream == nil { + if !d.hasRetries && errRead == io.EOF { + return nil, fmt.Errorf("gateway terminated too early, still want: %s", cidStringTruncate(c)) + } if attempts := d.remainingAttempts; attempts != math.MaxUint { if attempts == 0 { return nil, fmt.Errorf("could not download next block: %w", errors.Join(errRead, errStartStream)) @@ -286,10 +293,15 @@ func (d *downloader) next(todo region) ([]byte, error) { // readBlockFromStream must perform hash verification on the input. // The slice returned only has to be valid between two readBlockFromStream and Close calls. // Implementations should reuse buffers to avoid allocations. -func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) { +func (d *downloader) readBlockFromStream(expectedCid cid.Cid) (_ []byte, rErr error) { itemLenU, err := binary.ReadUvarint(&d.buf) - if err != nil { + switch err { + case io.EOF: return nil, err + case nil: + break + default: + return nil, fmt.Errorf("reading next block length: %w", err) } if itemLenU > maxBlockSize+maxCidSize { return nil, fmt.Errorf("item size (%d) for %s exceed maxBlockSize+maxCidSize (%d)", itemLenU, cidStringTruncate(expectedCid), maxBlockSize+maxCidSize) @@ -298,6 +310,7 @@ func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) { cidLen, cidFound, err := cid.CidFromReader(&d.buf) if err != nil { + err = eofWouldBeUnexpected(err) return nil, fmt.Errorf("trying to read %s failed to read cid: %w", cidStringTruncate(expectedCid), err) } if cidLen > maxCidSize { @@ -314,10 +327,7 @@ func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) { } data, err := d.buf.Peek(blockSize) if err != nil { - if err == io.EOF { - // don't show io.EOF in case peeking is too short - err = io.ErrUnexpectedEOF - } + err = eofWouldBeUnexpected(err) return nil, fmt.Errorf("peeking at block data for %s verification: %w", cidStringTruncate(expectedCid), err) } _, err = d.buf.Discard(len(data)) @@ -338,6 +348,13 @@ func (d *downloader) readBlockFromStream(expectedCid cid.Cid) ([]byte, error) { return data, nil } +func eofWouldBeUnexpected(err error) error { + if err == io.EOF { + return io.ErrUnexpectedEOF + } + return err +} + func (d *downloader) Close() error { if s := d.stream; s != nil { d.stream = nil