Skip to content

Commit

Permalink
Address slice issues
Browse files Browse the repository at this point in the history
Address issues with setting a zero-sized slice for the resumable
download. Since we are explicit about the download sizes and we assume
the buffers can be reused, there is no reason not to create the
buffers at their actual size.

This commit also cleans up some edge cases in the resume code.
  • Loading branch information
tempusfrangit committed Jun 6, 2024
1 parent 97a8eb3 commit 7205752
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 6 deletions.
6 changes: 6 additions & 0 deletions pkg/download/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
contentLength := firstChunkResp.ContentLength
n, err := io.ReadFull(firstChunkResp.Body, buf[0:contentLength])
if err == io.ErrUnexpectedEOF {
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(firstChunkResp.Request, buf[n-1:], m.Client, int64(n))
}
firstChunk.Deliver(buf[0:n], err)
Expand Down Expand Up @@ -148,6 +151,9 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
contentLength := resp.ContentLength
n, err := io.ReadFull(resp.Body, buf[0:contentLength])
if err == io.ErrUnexpectedEOF {
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(resp.Request, buf[n-1:], m.Client, int64(n))
}
chunk.Deliver(buf[0:n], err)
Expand Down
21 changes: 18 additions & 3 deletions pkg/download/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/dustin/go-humanize"

"github.com/replicate/pget/pkg/client"
"github.com/replicate/pget/pkg/logging"
)

const defaultChunkSize = 125 * humanize.MiByte
Expand All @@ -26,11 +27,17 @@ var (

func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient, bytesReceived int64) (*http.Response, error) {
var startByte int
logger := logging.GetLogger()

var resumeCount = 1
var initialBytesReceived = bytesReceived

for {
var n int
if err := updateRangeRequestHeader(req, bytesReceived); err != nil {
return nil, err
}

resp, err := client.Do(req)
if err != nil {
return nil, err
Expand All @@ -41,7 +48,14 @@ func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient,
}
n, err = io.ReadFull(resp.Body, buffer[startByte:])
if err == io.ErrUnexpectedEOF {
startByte = n
bytesReceived = int64(n)
startByte += n
resumeCount++
logger.Warn().
Int("connection_interrupted_at_byte", n).
Int("resume_count", resumeCount).
Int64("total_bytes_received", initialBytesReceived+int64(startByte)).
Msg("Resuming Chunk Download")
continue
}
return nil, err
Expand Down Expand Up @@ -77,11 +91,12 @@ func updateRangeRequestHeader(req *http.Request, receivedBytes int64) error {
}

start = start + receivedBytes
newRangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)

if start > end {
return fmt.Errorf("%w: %s", errInvalidContentRange, rangeHeader)
return fmt.Errorf("%w: %s", errInvalidContentRange, newRangeHeader)
}

newRangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
req.Header.Set("Range", newRangeHeader)

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/download/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"io"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type mockHTTPClient struct {
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestResumeDownload(t *testing.T) {
}
if tt.name == "multi-pass download" {
switch req.Header.Get("Range") {
case "bytes=16-19":
case "bytes=15-19":
return &http.Response{
StatusCode: http.StatusPartialContent,
Body: io.NopCloser(bytes.NewReader([]byte("56789"))),
Expand Down
6 changes: 6 additions & 0 deletions pkg/download/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (m *ConsistentHashingMode) Fetch(ctx context.Context, urlString string) (io
contentLength := firstChunkResp.ContentLength
n, err := io.ReadFull(firstChunkResp.Body, buf[0:contentLength])
if err == io.ErrUnexpectedEOF {
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(firstChunkResp.Request, buf[n-1:], m.Client, int64(n))
}
firstChunk.Deliver(buf[0:n], err)
Expand Down Expand Up @@ -225,6 +228,9 @@ func (m *ConsistentHashingMode) downloadRemainingChunks(ctx context.Context, url
contentLength := resp.ContentLength
n, err := io.ReadFull(resp.Body, buf[0:contentLength])
if err == io.ErrUnexpectedEOF {
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(resp.Request, buf[n-1:], m.Client, int64(n))
}
chunk.Deliver(buf[0:n], err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/download/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (q *priorityWorkQueue) submitHigh(w work) {

func (q *priorityWorkQueue) start() {
for i := 0; i < q.concurrency; i++ {
go q.run(make([]byte, 0, q.bufSize))
go q.run(make([]byte, q.bufSize))
}
}

Expand Down

0 comments on commit 7205752

Please sign in to comment.