Skip to content

Commit

Permalink
Make sure we use the correct sized buffer
Browse files Browse the repository at this point in the history
The buffer may be smaller, lean on content length to ensure we're sizing
the buffer correctly in the deliver and resume calls.
  • Loading branch information
tempusfrangit committed Jun 6, 2024
1 parent edf9621 commit 17a0361
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/download/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(firstChunkResp.Request, buf[n:], m.Client, int64(n))
n, err = resumeDownload(firstChunkResp.Request, buf[n:contentLength], m.Client, int64(n))
}
firstChunk.Deliver(buf[0:n], err)
})
Expand Down Expand Up @@ -154,7 +154,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(resp.Request, buf[n:], m.Client, int64(n))
n, err = resumeDownload(resp.Request, buf[n:contentLength], m.Client, int64(n))
}
chunk.Deliver(buf[0:n], err)
})
Expand Down
12 changes: 7 additions & 5 deletions pkg/download/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,30 @@ var (
errInvalidContentRange = errors.New("invalid content range")
)

func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient, bytesReceived int64) (*http.Response, error) {
func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient, bytesReceived int64) (int, error) {
var startByte int
logger := logging.GetLogger()

var resumeCount = 1
var initialBytesReceived = bytesReceived
var totalBytesReceived = bytesReceived

for {
var n int
if err := updateRangeRequestHeader(req, bytesReceived); err != nil {
return nil, err
return int(totalBytesReceived), err
}

resp, err := client.Do(req)
if err != nil {
return nil, err
return int(totalBytesReceived), err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusPartialContent {
return nil, fmt.Errorf("expected status code %d, got %d", http.StatusPartialContent, resp.StatusCode)
return int(totalBytesReceived), fmt.Errorf("expected status code %d, got %d", http.StatusPartialContent, resp.StatusCode)
}
n, err = io.ReadFull(resp.Body, buffer[startByte:])
totalBytesReceived += int64(n)
if err == io.ErrUnexpectedEOF {
bytesReceived = int64(n)
startByte += n
Expand All @@ -58,7 +60,7 @@ func resumeDownload(req *http.Request, buffer []byte, client client.HTTPClient,
Msg("Resuming Chunk Download")
continue
}
return nil, err
return int(totalBytesReceived), err

}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/download/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ func TestResumeDownload(t *testing.T) {
},
}

_, err = resumeDownload(req, buffer[tt.bytesReceived:], mockClient, tt.bytesReceived)
totalBytesReceived, err := resumeDownload(req, buffer[tt.bytesReceived:], mockClient, tt.bytesReceived)
if tt.expectedError != nil {
assert.Error(t, err)
assert.Equal(t, tt.expectedError.Error(), err.Error())
} else {
assert.NoError(t, err)
assert.Equal(t, len(tt.expectedOutput), totalBytesReceived)
assert.Equal(t, tt.expectedOutput, buffer[:len(tt.expectedOutput)])
}
assert.Equal(t, tt.expectedCalls, mockClient.callCount.Load(), "Unexpected number of HTTP client calls")
Expand Down
4 changes: 2 additions & 2 deletions pkg/download/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (m *ConsistentHashingMode) Fetch(ctx context.Context, urlString string) (io
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(firstChunkResp.Request, buf[n:], m.Client, int64(n))
_, err = resumeDownload(firstChunkResp.Request, buf[n:contentLength], m.Client, int64(n))
}
firstChunk.Deliver(buf[0:n], err)
})
Expand Down Expand Up @@ -231,7 +231,7 @@ func (m *ConsistentHashingMode) downloadRemainingChunks(ctx context.Context, url
logger.Warn().
Int("connection_interrupted_at_byte", n).
Msg("Resuming Chunk Download")
_, err = resumeDownload(resp.Request, buf[n:], m.Client, int64(n))
_, err = resumeDownload(resp.Request, buf[n:contentLength], m.Client, int64(n))
}
chunk.Deliver(buf[0:n], err)
})
Expand Down

0 comments on commit 17a0361

Please sign in to comment.