diff --git a/pkg/download/buffer.go b/pkg/download/buffer.go index b3b0af5..84b5137 100644 --- a/pkg/download/buffer.go +++ b/pkg/download/buffer.go @@ -82,6 +82,9 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e contentLength := firstChunkResp.ContentLength n, err := io.ReadFull(firstChunkResp.Body, buf[0:contentLength]) if err == io.ErrUnexpectedEOF { + logger.Warn(). + Int("connection_interrupted_at_byte", n). + Msg("Resuming Chunk Download") _, err = resumeDownload(firstChunkResp.Request, buf[n-1:], m.Client, int64(n)) } firstChunk.Deliver(buf[0:n], err) @@ -148,6 +151,9 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e contentLength := resp.ContentLength n, err := io.ReadFull(resp.Body, buf[0:contentLength]) if err == io.ErrUnexpectedEOF { + logger.Warn(). + Int("connection_interrupted_at_byte", n). + Msg("Resuming Chunk Download") _, err = resumeDownload(resp.Request, buf[n-1:], m.Client, int64(n)) } chunk.Deliver(buf[0:n], err) diff --git a/pkg/download/common.go b/pkg/download/common.go index ac2fc55..79b57f4 100644 --- a/pkg/download/common.go +++ b/pkg/download/common.go @@ -12,6 +12,7 @@ import ( "github.com/dustin/go-humanize" "github.com/replicate/pget/pkg/client" + "github.com/replicate/pget/pkg/logging" ) const defaultChunkSize = 125 * humanize.MiByte @@ -26,11 +27,17 @@ var ( func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient, bytesReceived int64) (*http.Response, error) { var startByte int + logger := logging.GetLogger() + + var resumeCount = 1 + var initialBytesReceived = bytesReceived + for { var n int if err := updateRangeRequestHeader(req, bytesReceived); err != nil { return nil, err } + resp, err := client.Do(req) if err != nil { return nil, err @@ -41,7 +48,14 @@ func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient, } n, err = io.ReadFull(resp.Body, buffer[startByte:]) if err == io.ErrUnexpectedEOF { - startByte = n + bytesReceived = int64(n) + startByte += n + resumeCount++ + logger.Warn(). + Int("connection_interrupted_at_byte", n). + Int("resume_count", resumeCount). + Int64("total_bytes_received", initialBytesReceived+int64(startByte)). + Msg("Resuming Chunk Download") continue } return nil, err @@ -77,11 +91,12 @@ func updateRangeRequestHeader(req *http.Request, receivedBytes int64) error { } start = start + receivedBytes + newRangeHeader := fmt.Sprintf("bytes=%d-%d", start, end) + if start > end { - return fmt.Errorf("%w: %s", errInvalidContentRange, rangeHeader) + return fmt.Errorf("%w: %s", errInvalidContentRange, newRangeHeader) } - newRangeHeader := fmt.Sprintf("bytes=%d-%d", start, end) req.Header.Set("Range", newRangeHeader) return nil diff --git a/pkg/download/common_test.go b/pkg/download/common_test.go index ac50b78..1643fc2 100644 --- a/pkg/download/common_test.go +++ b/pkg/download/common_test.go @@ -6,12 +6,12 @@ import ( "io" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) type mockHTTPClient struct { @@ -94,7 +94,7 @@ func TestResumeDownload(t *testing.T) { } if tt.name == "multi-pass download" { switch req.Header.Get("Range") { - case "bytes=16-19": + case "bytes=15-19": return &http.Response{ StatusCode: http.StatusPartialContent, Body: io.NopCloser(bytes.NewReader([]byte("56789"))), diff --git a/pkg/download/consistent_hashing.go b/pkg/download/consistent_hashing.go index 8dca9d0..63a074e 100644 --- a/pkg/download/consistent_hashing.go +++ b/pkg/download/consistent_hashing.go @@ -117,6 +117,9 @@ func (m *ConsistentHashingMode) Fetch(ctx context.Context, urlString string) (io contentLength := firstChunkResp.ContentLength n, err := io.ReadFull(firstChunkResp.Body, buf[0:contentLength]) if err == io.ErrUnexpectedEOF { + logger.Warn(). + Int("connection_interrupted_at_byte", n). + Msg("Resuming Chunk Download") _, err = resumeDownload(firstChunkResp.Request, buf[n-1:], m.Client, int64(n)) } firstChunk.Deliver(buf[0:n], err) @@ -225,6 +228,9 @@ func (m *ConsistentHashingMode) downloadRemainingChunks(ctx context.Context, url contentLength := resp.ContentLength n, err := io.ReadFull(resp.Body, buf[0:contentLength]) if err == io.ErrUnexpectedEOF { + logger.Warn(). + Int("connection_interrupted_at_byte", n). + Msg("Resuming Chunk Download") _, err = resumeDownload(resp.Request, buf[n-1:], m.Client, int64(n)) } chunk.Deliver(buf[0:n], err) diff --git a/pkg/download/work_queue.go b/pkg/download/work_queue.go index ca9c7e2..3956284 100644 --- a/pkg/download/work_queue.go +++ b/pkg/download/work_queue.go @@ -33,7 +33,7 @@ func (q *priorityWorkQueue) submitHigh(w work) { func (q *priorityWorkQueue) start() { for i := 0; i < q.concurrency; i++ { - go q.run(make([]byte, 0, q.bufSize)) + go q.run(make([]byte, q.bufSize)) } }