Update Buffer to use ChunkSize
Update the Buffer downloader to use ChunkSize instead of MinChunkSize.
Options now contains only ChunkSize, as the root and multifile commands
no longer reference MinChunkSize.

This change is a step toward simplifying the chunk-size logic. It also
makes the next phase, reusing buffers, significantly easier.
tempusfrangit committed Mar 4, 2024
1 parent d700109 commit efe1ac0
Showing 7 changed files with 44 additions and 53 deletions.
4 changes: 2 additions & 2 deletions cmd/multifile/multifile.go
@@ -97,7 +97,7 @@ func maxConcurrentFiles() int {
 }

 func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
-	minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
+	chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
 	if err != nil {
 		return err
 	}
@@ -119,7 +119,7 @@ func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
 	}
 	downloadOpts := download.Options{
 		MaxConcurrency: viper.GetInt(config.OptConcurrency),
-		MinChunkSize:   int64(minChunkSize),
+		ChunkSize:      int64(chunkSize),
 		Client:         clientOpts,
 	}
 	pgetOpts := pget.Options{
6 changes: 3 additions & 3 deletions cmd/root/root.go
@@ -126,7 +126,7 @@ func persistentFlags(cmd *cobra.Command) error {
 	cmd.PersistentFlags().IntVar(&concurrency, config.OptMaxChunks, runtime.GOMAXPROCS(0)*4, "Maximum number of chunks for a given file")
 	cmd.PersistentFlags().Duration(config.OptConnTimeout, 5*time.Second, "Timeout for establishing a connection, format is <number><unit>, e.g. 10s")
 	cmd.PersistentFlags().StringVarP(&chunkSize, config.OptChunkSize, "m", "125M", "Chunk size (in bytes) to use when downloading a file (e.g. 10M)")
-	cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, "16M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
+	cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, "125M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
 	cmd.PersistentFlags().BoolP(config.OptForce, "f", false, "OptForce download, overwriting existing file")
 	cmd.PersistentFlags().StringSlice(config.OptResolve, []string{}, "OptResolve hostnames to specific IPs")
 	cmd.PersistentFlags().IntP(config.OptRetries, "r", 5, "Number of retries when attempting to retrieve a file")
@@ -202,7 +202,7 @@ func runRootCMD(cmd *cobra.Command, args []string) error {
 // rootExecute is the main function of the program and encapsulates the general logic
 // returns any/all errors to the caller.
 func rootExecute(ctx context.Context, urlString, dest string) error {
-	minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
+	chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
 	if err != nil {
 		return fmt.Errorf("error parsing minimum chunk size: %w", err)
 	}
@@ -223,7 +223,7 @@ func rootExecute(ctx context.Context, urlString, dest string) error {

 	downloadOpts := download.Options{
 		MaxConcurrency: viper.GetInt(config.OptConcurrency),
-		MinChunkSize:   int64(minChunkSize),
+		ChunkSize:      int64(chunkSize),
 		Client:         clientOpts,
 	}

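The chunk-size flags above are human-readable strings; rootExecute and multifileExecute turn them into byte counts with go-humanize before building download.Options. A minimal standalone sketch of that parsing step (not repository code; the input strings are illustrative):

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// This is the same ParseBytes call the commands make on the flag value;
	// it accepts both SI ("125MB") and IEC ("125MiB") style suffixes.
	for _, s := range []string{"125M", "125MiB", "16M"} {
		n, err := humanize.ParseBytes(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %d bytes\n", s, n)
	}
}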
25 changes: 11 additions & 14 deletions pkg/download/buffer.go
@@ -36,10 +36,10 @@ func GetBufferMode(opts Options) *BufferMode {
 	}
 }

-func (m *BufferMode) minChunkSize() int64 {
-	minChunkSize := m.MinChunkSize
+func (m *BufferMode) chunkSize() int64 {
+	minChunkSize := m.ChunkSize
 	if minChunkSize == 0 {
-		return defaultMinChunkSize
+		return defaultChunkSize
 	}
 	return minChunkSize
 }
@@ -61,14 +61,14 @@ type firstReqResult struct {
 func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, error) {
 	logger := logging.GetLogger()

-	br := newBufferedReader(m.minChunkSize())
+	br := newBufferedReader(m.chunkSize())

 	firstReqResultCh := make(chan firstReqResult)
 	m.queue.submit(func() {
 		m.sem.Go(func() error {
 			defer close(firstReqResultCh)
 			defer br.done()
-			firstChunkResp, err := m.DoRequest(ctx, 0, m.minChunkSize()-1, url)
+			firstChunkResp, err := m.DoRequest(ctx, 0, m.chunkSize()-1, url)
 			if err != nil {
 				br.err = err
 				firstReqResultCh <- firstReqResult{err: err}
@@ -105,25 +105,22 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
 	fileSize := firstReqResult.fileSize
 	trueURL := firstReqResult.trueURL

-	if fileSize <= m.minChunkSize() {
+	if fileSize <= m.chunkSize() {
 		// we only need a single chunk: just download it and finish
 		return br, fileSize, nil
 	}

-	remainingBytes := fileSize - m.minChunkSize()
-	numChunks := int(remainingBytes / m.minChunkSize())
+	remainingBytes := fileSize - m.chunkSize()
+	numChunks := int(remainingBytes / m.chunkSize())
 	// Number of chunks can never be 0
 	if numChunks <= 0 {
 		numChunks = 1
 	}
-	if numChunks > m.maxConcurrency() {
-		numChunks = m.maxConcurrency()
-	}

-	readersCh := make(chan io.Reader, m.maxConcurrency()+1)
+	readersCh := make(chan io.Reader, numChunks+1)
 	readersCh <- br

-	startOffset := m.minChunkSize()
+	startOffset := m.chunkSize()

 	chunkSize := remainingBytes / int64(numChunks)
 	if chunkSize < 0 {
@@ -146,7 +143,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
 		end = fileSize - 1
 	}

-	br := newBufferedReader(end - start + 1)
+	br := newBufferedReader(m.chunkSize())
 	readersCh <- br

 	m.sem.Go(func() error {
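To make the splitting arithmetic above concrete: the first request covers the initial ChunkSize bytes, and the remainder is divided into roughly equal ranges. With the maxConcurrency clamp removed, the number of ranges now follows directly from ChunkSize, and in-flight requests are bounded by the semaphore (m.sem) instead. A standalone sketch that mirrors the new range computation (an illustration, not repository code):

package main

import "fmt"

// planChunks mirrors the post-change range computation in BufferMode.Fetch:
// one initial request of chunkSize bytes, then the remainder divided into
// roughly equal ranges, with the last range absorbing any leftover bytes.
func planChunks(fileSize, chunkSize int64) [][2]int64 {
	if fileSize <= chunkSize {
		return [][2]int64{{0, fileSize - 1}} // a single request covers the file
	}
	ranges := [][2]int64{{0, chunkSize - 1}} // the first request
	remaining := fileSize - chunkSize
	numChunks := remaining / chunkSize
	if numChunks <= 0 {
		numChunks = 1
	}
	size := remaining / numChunks // rebalanced per-range size
	start := chunkSize
	for i := int64(0); i < numChunks; i++ {
		end := start + size - 1
		if i == numChunks-1 {
			end = fileSize - 1 // last range absorbs the remainder
		}
		ranges = append(ranges, [2]int64{start, end})
		start = end + 1
	}
	return ranges
}

func main() {
	// A 1 GiB file with the 125 MiB default yields the first request plus
	// seven follow-up range requests.
	for _, r := range planChunks(1<<30, 125<<20) {
		fmt.Printf("bytes=%d-%d\n", r[0], r[1])
	}
}

Note also that each follow-up range now gets a buffer of m.chunkSize() bytes rather than one sized exactly to its range; uniform buffer sizes are what make the buffer reuse mentioned in the commit message practical.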
38 changes: 19 additions & 19 deletions pkg/download/buffer_unit_test.go
@@ -52,64 +52,64 @@ func TestFileToBufferChunkCountExceedsMaxChunks(t *testing.T) {
 	opts := Options{
 		Client: client.Options{},
 	}
-	// Ensure that the math generally works out as such for this test case where minChunkSize is < 0.5* contentSize
-	// (contentSize - minChunkSize) / minChunkSize < maxChunks
+	// Ensure that the math generally works out as such for this test case where chunkSize is < 0.5* contentSize
+	// (contentSize - chunkSize) / chunkSize < maxChunks
 	// This ensures that we're always testing the case where the number of chunks exceeds the maxChunks
-	// Additional cases added to validate various cases where the final chunk is less than minChunkSize
+	// Additional cases added to validate various cases where the final chunk is less than chunkSize
 	tc := []struct {
 		name           string
 		maxConcurrency int
-		minChunkSize   int64
+		chunkSize      int64
 	}{
-		// In these first cases we will never have more than 2 chunks as the minChunkSize is greater than 0.5*contentSize
+		// In these first cases we will never have more than 2 chunks as the chunkSize is greater than 0.5*contentSize
 		{
-			name:           "minChunkSize greater than contentSize",
-			minChunkSize:   contentSize + 1,
+			name:           "chunkSize greater than contentSize",
+			chunkSize:      contentSize + 1,
 			maxConcurrency: 1,
 		},
 		{
-			name:           "minChunkSize equal to contentSize",
-			minChunkSize:   contentSize,
+			name:           "chunkSize equal to contentSize",
+			chunkSize:      contentSize,
 			maxConcurrency: 1,
 		},
 		{
-			name:           "minChunkSize less than contentSize",
-			minChunkSize:   contentSize - 1,
+			name:           "chunkSize less than contentSize",
+			chunkSize:      contentSize - 1,
 			maxConcurrency: 2,
 		},
 		{
-			name:           "minChunkSize is 3/4 contentSize",
-			minChunkSize:   int64(float64(contentSize) * 0.75),
+			name:           "chunkSize is 3/4 contentSize",
+			chunkSize:      int64(float64(contentSize) * 0.75),
 			maxConcurrency: 2,
 		},
 		{
 			// This is an exceptional case where we only need a single additional chunk beyond the default "get content size"
 			// request.
-			name:           "minChunkSize is 1/2 contentSize",
-			minChunkSize:   int64(float64(contentSize) * 0.5),
+			name:           "chunkSize is 1/2 contentSize",
+			chunkSize:      int64(float64(contentSize) * 0.5),
 			maxConcurrency: 2,
 		},
 		// These test cases cover a few scenarios of downloading where the maxChunks will force a re-calculation of
 		// the chunkSize to ensure that we don't exceed the maxChunks.
 		{
 			// remainder will result in 3 chunks, max-chunks is 2
-			name:           "minChunkSize is 1/4 contentSize",
-			minChunkSize:   int64(float64(contentSize) * 0.25),
+			name:           "chunkSize is 1/4 contentSize",
+			chunkSize:      int64(float64(contentSize) * 0.25),
 			maxConcurrency: 2,
 		},
 		{
 			// humanize.KByte = 1024, remainder will result in 1024/10 = 102 chunks, max-chunks is set to 25
 			// resulting in a chunkSize of 41
 			name:           "many chunks, low maxChunks",
-			minChunkSize:   10,
+			chunkSize:      10,
 			maxConcurrency: 25,
 		},
 	}

 	for _, tc := range tc {
 		t.Run(tc.name, func(t *testing.T) {
 			opts.MaxConcurrency = tc.maxConcurrency
-			opts.MinChunkSize = tc.minChunkSize
+			opts.ChunkSize = tc.chunkSize
 			bufferMode := GetBufferMode(opts)
 			path, _ := url.JoinPath(server.URL, testFilePath)
 			download, size, err := bufferMode.Fetch(context.Background(), path)
4 changes: 2 additions & 2 deletions pkg/download/common.go
@@ -1,11 +1,11 @@
 package download

 import (
-	"github.com/dustin/go-humanize"
 	"regexp"

+	"github.com/dustin/go-humanize"
 )

-const defaultMinChunkSize = 16 * humanize.MiByte
+const defaultChunkSize = 125 * humanize.MiByte

 var contentRangeRegexp = regexp.MustCompile(`^bytes .*/([0-9]+)$`)
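A quick sanity check of the new default, assuming only go-humanize's documented size constants (MiByte is the binary megabyte, 1024 * 1024 bytes):

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	const defaultChunkSize = 125 * humanize.MiByte // mirrors pkg/download/common.go
	fmt.Println(int64(defaultChunkSize))                   // 131072000 bytes
	fmt.Println(humanize.IBytes(uint64(defaultChunkSize))) // "125 MiB"
}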
14 changes: 7 additions & 7 deletions pkg/download/consistent_hashing_test.go
@@ -303,7 +303,7 @@ func TestConsistentHashingPathBased(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{},
 		MaxConcurrency:       tc.concurrency,
-		MinChunkSize:         tc.chunkSize,
+		ChunkSize:            tc.chunkSize,
 		CacheHosts:           hostnames[0:tc.numCacheHosts],
 		CacheableURIPrefixes: makeCacheableURIPrefixes(fmt.Sprintf("http://%s", hostname)),
 		CacheUsePathProxy:    true,
@@ -336,7 +336,7 @@ func TestConsistentHashRetries(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{Transport: mockTransport},
 		MaxConcurrency:       8,
-		MinChunkSize:         1,
+		ChunkSize:            1,
 		CacheHosts:           hostnames,
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            1,
@@ -371,7 +371,7 @@ func TestConsistentHashRetriesMissingHostname(t *testing.T) {
 			Transport: mockTransport,
 		},
 		MaxConcurrency:       8,
-		MinChunkSize:         1,
+		ChunkSize:            1,
 		CacheHosts:           hostnames,
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            1,
@@ -405,7 +405,7 @@ func TestConsistentHashRetriesTwoHosts(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{Transport: mockTransport},
 		MaxConcurrency:       8,
-		MinChunkSize:         1,
+		ChunkSize:            1,
 		CacheHosts:           hostnames,
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://testing.replicate.delivery"),
 		SliceSize:            1,
@@ -432,7 +432,7 @@ func TestConsistentHashingHasFallback(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{Transport: mockTransport},
 		MaxConcurrency:       8,
-		MinChunkSize:         2,
+		ChunkSize:            2,
 		CacheHosts:           []string{""}, // simulate a single unavailable cache host
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            3,
@@ -525,7 +525,7 @@ func TestConsistentHashingFileFallback(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{},
 		MaxConcurrency:       8,
-		MinChunkSize:         2,
+		ChunkSize:            2,
 		CacheHosts:           []string{url.Host},
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            3,
@@ -587,7 +587,7 @@ func TestConsistentHashingChunkFallback(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{},
 		MaxConcurrency:       8,
-		MinChunkSize:         3,
+		ChunkSize:            3,
 		CacheHosts:           []string{url.Host},
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            3,
6 changes: 0 additions & 6 deletions pkg/download/options.go
@@ -16,12 +16,6 @@ type Options struct {
 	// See https://nginx.org/en/docs/http/ngx_http_slice_module.html
 	SliceSize int64

-	// Minimum number of bytes per chunk. If set to zero, 16 MiB will be
-	// used.
-	//
-	// Deprecated: use ChunkSize instead
-	MinChunkSize int64
-
 	// Number of bytes per chunk. If set to zero, 125 MiB will be used.
 	ChunkSize int64

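After this commit a caller sets ChunkSize directly; MinChunkSize is removed rather than merely deprecated. A minimal sketch of the caller side (the import paths are inferred from the repository layout and the literal values are hypothetical):

package main

import (
	"github.com/dustin/go-humanize"

	// import paths assumed from the repository layout
	"github.com/replicate/pget/pkg/client"
	"github.com/replicate/pget/pkg/download"
)

func main() {
	opts := download.Options{
		MaxConcurrency: 4,                    // hypothetical setting
		ChunkSize:      64 * humanize.MiByte, // zero falls back to the 125 MiB default
		Client:         client.Options{},
	}
	_ = download.GetBufferMode(opts)
}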
