-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhanced MultiReader #168
Enhanced MultiReader #168
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package download | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
|
@@ -14,6 +15,7 @@ import ( | |
|
||
"github.com/replicate/pget/pkg/client" | ||
"github.com/replicate/pget/pkg/logging" | ||
"github.com/replicate/pget/pkg/multireader" | ||
) | ||
|
||
const defaultMinChunkSize = 16 * humanize.MiByte | ||
|
@@ -65,19 +67,34 @@ type firstReqResult struct { | |
err error | ||
} | ||
|
||
func readBody(reader *multireader.BufferedReader, resp *http.Response) error { | ||
expectedBytes := resp.ContentLength | ||
_ = reader.SetSize(expectedBytes) | ||
n, err := reader.ReadFrom(resp.Body) | ||
if errors.Is(err, io.EOF) { | ||
reader.Err = fmt.Errorf("error reading response for %s: %w", resp.Request.URL.String(), err) | ||
return reader.Err | ||
} | ||
if n != expectedBytes { | ||
reader.Err = fmt.Errorf("downloaded %d bytes instead of %d for %s", n, expectedBytes, resp.Request.URL.String()) | ||
return reader.Err | ||
} | ||
return nil | ||
} | ||
|
||
func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, error) { | ||
logger := logging.GetLogger() | ||
|
||
br := newBufferedReader(m.minChunkSize()) | ||
br := multireader.NewBufferedReader(m.minChunkSize()) | ||
|
||
firstReqResultCh := make(chan firstReqResult) | ||
m.queue.submit(func() { | ||
m.sem.Go(func() error { | ||
defer close(firstReqResultCh) | ||
defer br.done() | ||
defer br.Done() | ||
firstChunkResp, err := m.DoRequest(ctx, 0, m.minChunkSize()-1, url) | ||
if err != nil { | ||
br.err = err | ||
br.Err = err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused by this. Are you using the |
||
firstReqResultCh <- firstReqResult{err: err} | ||
return err | ||
} | ||
|
@@ -95,8 +112,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e | |
return err | ||
} | ||
firstReqResultCh <- firstReqResult{fileSize: fileSize, trueURL: trueURL} | ||
|
||
return br.downloadBody(firstChunkResp) | ||
return readBody(br, firstChunkResp) | ||
}) | ||
}) | ||
|
||
|
@@ -127,7 +143,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e | |
numChunks = m.maxConcurrency() | ||
} | ||
|
||
readersCh := make(chan io.Reader, m.maxConcurrency()+1) | ||
readersCh := make(chan *multireader.BufferedReader, m.maxConcurrency()+1) | ||
readersCh <- br | ||
|
||
startOffset := m.minChunkSize() | ||
|
@@ -153,23 +169,23 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e | |
end = fileSize - 1 | ||
} | ||
|
||
br := newBufferedReader(end - start + 1) | ||
br := multireader.NewBufferedReader(end - start + 1) | ||
readersCh <- br | ||
|
||
m.sem.Go(func() error { | ||
defer br.done() | ||
defer br.Done() | ||
resp, err := m.DoRequest(ctx, start, end, trueURL) | ||
if err != nil { | ||
br.err = err | ||
br.Err = err | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
return br.downloadBody(resp) | ||
return readBody(br, resp) | ||
}) | ||
} | ||
}) | ||
|
||
return newChanMultiReader(readersCh), fileSize, nil | ||
return multireader.NewMultiReader(readersCh), fileSize, nil | ||
} | ||
|
||
func (m *BufferMode) DoRequest(ctx context.Context, start, end int64, trueURL string) (*http.Response, error) { | ||
|
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I must be missing some important context. Why do we need our own custom type/implementation if this is what we're doing with it. This appears to be reading the entire body into a
bytes.Buffer
in one go and then checking the length. At that point how is it different from:(By the by,
BufferedReader.ReadFrom
effectively delegates tobytes.Buffer.ReadFrom
, which means it cannot returnio.EOF
.)(Oh, and this code also just completely ignores the case where
err != nil
, which doesn't seem safe.)