From efe1ac02dda727d22258752c1e88a7c42c976168 Mon Sep 17 00:00:00 2001
From: Morgan Fainberg
Date: Mon, 4 Mar 2024 12:04:31 -0800
Subject: [PATCH] Update Buffer to use ChunkSize

Update the Buffer downloader to use ChunkSize instead of MinChunkSize.
Options now contains only ChunkSize, since the root and multifile
commands no longer reference MinChunkSize.

This change is part of simplifying the chunk-size logic; it also makes
buffer reuse, the next phase of this work, significantly easier.
---
 cmd/multifile/multifile.go              |  4 +--
 cmd/root/root.go                        |  6 ++--
 pkg/download/buffer.go                  | 25 +++++++---------
 pkg/download/buffer_unit_test.go        | 38 ++++++++++++-------------
 pkg/download/common.go                  |  4 +--
 pkg/download/consistent_hashing_test.go | 14 ++++-----
 pkg/download/options.go                 |  6 ----
 7 files changed, 44 insertions(+), 53 deletions(-)

diff --git a/cmd/multifile/multifile.go b/cmd/multifile/multifile.go
index 0cb461e..0fc5d0b 100644
--- a/cmd/multifile/multifile.go
+++ b/cmd/multifile/multifile.go
@@ -97,7 +97,7 @@ func maxConcurrentFiles() int {
 }
 
 func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
-	minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
+	chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
 	if err != nil {
 		return err
 	}
@@ -119,7 +119,7 @@ func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
 	}
 	downloadOpts := download.Options{
 		MaxConcurrency: viper.GetInt(config.OptConcurrency),
-		MinChunkSize:   int64(minChunkSize),
+		ChunkSize:      int64(chunkSize),
 		Client:         clientOpts,
 	}
 	pgetOpts := pget.Options{
diff --git a/cmd/root/root.go b/cmd/root/root.go
index 00466bf..40a4294 100644
--- a/cmd/root/root.go
+++ b/cmd/root/root.go
@@ -126,7 +126,7 @@ func persistentFlags(cmd *cobra.Command) error {
 	cmd.PersistentFlags().IntVar(&concurrency, config.OptMaxChunks, runtime.GOMAXPROCS(0)*4, "Maximum number of chunks for a given file")
 	cmd.PersistentFlags().Duration(config.OptConnTimeout, 5*time.Second, "Timeout for establishing a connection, format is , e.g. 10s")
 	cmd.PersistentFlags().StringVarP(&chunkSize, config.OptChunkSize, "m", "125M", "Chunk size (in bytes) to use when downloading a file (e.g. 10M)")
-	cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, "16M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
+	cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, "125M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
 	cmd.PersistentFlags().BoolP(config.OptForce, "f", false, "OptForce download, overwriting existing file")
 	cmd.PersistentFlags().StringSlice(config.OptResolve, []string{}, "OptResolve hostnames to specific IPs")
 	cmd.PersistentFlags().IntP(config.OptRetries, "r", 5, "Number of retries when attempting to retrieve a file")
@@ -202,7 +202,7 @@ func runRootCMD(cmd *cobra.Command, args []string) error {
 // rootExecute is the main function of the program and encapsulates the general logic
 // returns any/all errors to the caller.
 func rootExecute(ctx context.Context, urlString, dest string) error {
-	minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
+	chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
 	if err != nil {
 		return fmt.Errorf("error parsing minimum chunk size: %w", err)
 	}
@@ -223,7 +223,7 @@ func rootExecute(ctx context.Context, urlString, dest string) error {
 
 	downloadOpts := download.Options{
 		MaxConcurrency: viper.GetInt(config.OptConcurrency),
-		MinChunkSize:   int64(minChunkSize),
+		ChunkSize:      int64(chunkSize),
 		Client:         clientOpts,
 	}
 
diff --git a/pkg/download/buffer.go b/pkg/download/buffer.go
index c75e7ed..f1da310 100644
--- a/pkg/download/buffer.go
+++ b/pkg/download/buffer.go
@@ -36,10 +36,10 @@ func GetBufferMode(opts Options) *BufferMode {
 	}
 }
 
-func (m *BufferMode) minChunkSize() int64 {
-	minChunkSize := m.MinChunkSize
+func (m *BufferMode) chunkSize() int64 {
+	minChunkSize := m.ChunkSize
 	if minChunkSize == 0 {
-		return defaultMinChunkSize
+		return defaultChunkSize
 	}
 	return minChunkSize
 }
@@ -61,14 +61,14 @@ type firstReqResult struct {
 func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, error) {
 	logger := logging.GetLogger()
 
-	br := newBufferedReader(m.minChunkSize())
+	br := newBufferedReader(m.chunkSize())
 
 	firstReqResultCh := make(chan firstReqResult)
 	m.queue.submit(func() {
 		m.sem.Go(func() error {
 			defer close(firstReqResultCh)
 			defer br.done()
-			firstChunkResp, err := m.DoRequest(ctx, 0, m.minChunkSize()-1, url)
+			firstChunkResp, err := m.DoRequest(ctx, 0, m.chunkSize()-1, url)
 			if err != nil {
 				br.err = err
 				firstReqResultCh <- firstReqResult{err: err}
@@ -105,25 +105,22 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
 	fileSize := firstReqResult.fileSize
 	trueURL := firstReqResult.trueURL
 
-	if fileSize <= m.minChunkSize() {
+	if fileSize <= m.chunkSize() {
 		// we only need a single chunk: just download it and finish
 		return br, fileSize, nil
 	}
 
-	remainingBytes := fileSize - m.minChunkSize()
-	numChunks := int(remainingBytes / m.minChunkSize())
+	remainingBytes := fileSize - m.chunkSize()
+	numChunks := int(remainingBytes / m.chunkSize())
 	// Number of chunks can never be 0
 	if numChunks <= 0 {
 		numChunks = 1
 	}
-	if numChunks > m.maxConcurrency() {
-		numChunks = m.maxConcurrency()
-	}
 
-	readersCh := make(chan io.Reader, m.maxConcurrency()+1)
+	readersCh := make(chan io.Reader, numChunks+1)
 	readersCh <- br
 
-	startOffset := m.minChunkSize()
+	startOffset := m.chunkSize()
 
 	chunkSize := remainingBytes / int64(numChunks)
 	if chunkSize < 0 {
@@ -146,7 +143,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
 			end = fileSize - 1
 		}
 
-		br := newBufferedReader(end - start + 1)
+		br := newBufferedReader(m.chunkSize())
 		readersCh <- br
 
 		m.sem.Go(func() error {
diff --git a/pkg/download/buffer_unit_test.go b/pkg/download/buffer_unit_test.go
index e9b7d00..2c94ecc 100644
--- a/pkg/download/buffer_unit_test.go
+++ b/pkg/download/buffer_unit_test.go
@@ -52,56 +52,56 @@ func TestFileToBufferChunkCountExceedsMaxChunks(t *testing.T) {
 	opts := Options{
 		Client: client.Options{},
 	}
-	// Ensure that the math generally works out as such for this test case where minChunkSize is < 0.5* contentSize
-	// (contentSize - minChunkSize) / minChunkSize < maxChunks
+	// Ensure that the math generally works out as such for this test case where chunkSize is < 0.5* contentSize
+	// (contentSize - chunkSize) / chunkSize < maxChunks
 	// This ensures that we're always testing the case where the number of chunks exceeds the maxChunks
-	// Additional cases added to validate various cases where the final chunk is less than minChunkSize
+	// Additional cases added to validate various cases where the final chunk is less than chunkSize
 	tc := []struct {
 		name           string
 		maxConcurrency int
-		minChunkSize   int64
+		chunkSize      int64
 	}{
-		// In these first cases we will never have more than 2 chunks as the minChunkSize is greater than 0.5*contentSize
+		// In these first cases we will never have more than 2 chunks as the chunkSize is greater than 0.5*contentSize
 		{
-			name:           "minChunkSize greater than contentSize",
-			minChunkSize:   contentSize + 1,
+			name:           "chunkSize greater than contentSize",
+			chunkSize:      contentSize + 1,
 			maxConcurrency: 1,
 		},
 		{
-			name:           "minChunkSize equal to contentSize",
-			minChunkSize:   contentSize,
+			name:           "chunkSize equal to contentSize",
+			chunkSize:      contentSize,
 			maxConcurrency: 1,
 		},
 		{
-			name:           "minChunkSize less than contentSize",
-			minChunkSize:   contentSize - 1,
+			name:           "chunkSize less than contentSize",
+			chunkSize:      contentSize - 1,
 			maxConcurrency: 2,
 		},
 		{
-			name:           "minChunkSize is 3/4 contentSize",
-			minChunkSize:   int64(float64(contentSize) * 0.75),
+			name:           "chunkSize is 3/4 contentSize",
+			chunkSize:      int64(float64(contentSize) * 0.75),
 			maxConcurrency: 2,
 		},
 		{
 			// This is an exceptional case where we only need a single additional chunk beyond the default "get content size"
 			// request.
-			name:           "minChunkSize is 1/2 contentSize",
-			minChunkSize:   int64(float64(contentSize) * 0.5),
+			name:           "chunkSize is 1/2 contentSize",
+			chunkSize:      int64(float64(contentSize) * 0.5),
 			maxConcurrency: 2,
 		},
 		// These test cases cover a few scenarios of downloading where the maxChunks will force a re-calculation of
 		// the chunkSize to ensure that we don't exceed the maxChunks.
 		{
 			// remainder will result in 3 chunks, max-chunks is 2
-			name:           "minChunkSize is 1/4 contentSize",
-			minChunkSize:   int64(float64(contentSize) * 0.25),
+			name:           "chunkSize is 1/4 contentSize",
+			chunkSize:      int64(float64(contentSize) * 0.25),
 			maxConcurrency: 2,
 		},
 		{
 			// humanize.KByte = 1024, remainder will result in 1024/10 = 102 chunks, max-chunks is set to 25
 			// resulting in a chunkSize of 41
 			name:           "many chunks, low maxChunks",
-			minChunkSize:   10,
+			chunkSize:      10,
 			maxConcurrency: 25,
 		},
 	}
@@ -109,7 +109,7 @@ func TestFileToBufferChunkCountExceedsMaxChunks(t *testing.T) {
 	for _, tc := range tc {
 		t.Run(tc.name, func(t *testing.T) {
 			opts.MaxConcurrency = tc.maxConcurrency
-			opts.MinChunkSize = tc.minChunkSize
+			opts.ChunkSize = tc.chunkSize
 			bufferMode := GetBufferMode(opts)
 			path, _ := url.JoinPath(server.URL, testFilePath)
 			download, size, err := bufferMode.Fetch(context.Background(), path)
diff --git a/pkg/download/common.go b/pkg/download/common.go
index b66c257..0814df6 100644
--- a/pkg/download/common.go
+++ b/pkg/download/common.go
@@ -1,11 +1,11 @@
 package download
 
 import (
-	"github.com/dustin/go-humanize"
 	"regexp"
+
+	"github.com/dustin/go-humanize"
 )
 
-const defaultMinChunkSize = 16 * humanize.MiByte
 const defaultChunkSize = 125 * humanize.MiByte
 
 var contentRangeRegexp = regexp.MustCompile(`^bytes .*/([0-9]+)$`)
diff --git a/pkg/download/consistent_hashing_test.go b/pkg/download/consistent_hashing_test.go
index c71d8cb..2751971 100644
--- a/pkg/download/consistent_hashing_test.go
+++ b/pkg/download/consistent_hashing_test.go
@@ -303,7 +303,7 @@ func TestConsistentHashingPathBased(t *testing.T) {
 			opts := download.Options{
 				Client:               client.Options{},
 				MaxConcurrency:       tc.concurrency,
-				MinChunkSize:         tc.chunkSize,
+				ChunkSize:            tc.chunkSize,
 				CacheHosts:           hostnames[0:tc.numCacheHosts],
 				CacheableURIPrefixes: makeCacheableURIPrefixes(fmt.Sprintf("http://%s", hostname)),
 				CacheUsePathProxy:    true,
@@ -336,7 +336,7 @@ func TestConsistentHashRetries(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{Transport: mockTransport},
 		MaxConcurrency:       8,
-		MinChunkSize:         1,
+		ChunkSize:            1,
 		CacheHosts:           hostnames,
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            1,
@@ -371,7 +371,7 @@ func TestConsistentHashRetriesMissingHostname(t *testing.T) {
 			Transport: mockTransport,
 		},
 		MaxConcurrency:       8,
-		MinChunkSize:         1,
+		ChunkSize:            1,
 		CacheHosts:           hostnames,
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            1,
@@ -405,7 +405,7 @@ func TestConsistentHashRetriesTwoHosts(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{Transport: mockTransport},
 		MaxConcurrency:       8,
-		MinChunkSize:         1,
+		ChunkSize:            1,
 		CacheHosts:           hostnames,
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://testing.replicate.delivery"),
 		SliceSize:            1,
@@ -432,7 +432,7 @@ func TestConsistentHashingHasFallback(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{Transport: mockTransport},
 		MaxConcurrency:       8,
-		MinChunkSize:         2,
+		ChunkSize:            2,
 		CacheHosts:           []string{""}, // simulate a single unavailable cache host
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            3,
@@ -525,7 +525,7 @@ func TestConsistentHashingFileFallback(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{},
 		MaxConcurrency:       8,
-		MinChunkSize:         2,
+		ChunkSize:            2,
 		CacheHosts:           []string{url.Host},
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            3,
@@ -587,7 +587,7 @@ func TestConsistentHashingChunkFallback(t *testing.T) {
 	opts := download.Options{
 		Client:               client.Options{},
 		MaxConcurrency:       8,
-		MinChunkSize:         3,
+		ChunkSize:            3,
 		CacheHosts:           []string{url.Host},
 		CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
 		SliceSize:            3,
diff --git a/pkg/download/options.go b/pkg/download/options.go
index f9b5ff1..ed994d5 100644
--- a/pkg/download/options.go
+++ b/pkg/download/options.go
@@ -16,12 +16,6 @@ type Options struct {
 	// See https://nginx.org/en/docs/http/ngx_http_slice_module.html
 	SliceSize int64
 
-	// Minimum number of bytes per chunk. If set to zero, 16 MiB will be
-	// used.
-	//
-	// Deprecated: use ChunkSize instead
-	MinChunkSize int64
-
 	// Number of bytes per chunk. If set to zero, 125 MiB will be used.
 	ChunkSize int64
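
For reference, here is a minimal standalone sketch of the chunk arithmetic that the patched buffer.go now performs. The planChunks helper and the example sizes are illustrative assumptions, not code from pget; they only mirror how the new logic derives the chunk count and per-chunk size from a single ChunkSize value, with the maxConcurrency clamp on numChunks removed.

package main

import "fmt"

// planChunks mirrors the arithmetic in the updated buffer.go: the first
// request covers the first chunkSize bytes, and the remaining bytes are
// split into roughly chunkSize-sized pieces, with no cap derived from
// maxConcurrency. Illustrative only; not part of the patch.
func planChunks(fileSize, chunkSize int64) (numChunks int, perChunk int64) {
	if fileSize <= chunkSize {
		// A single request already covers the whole file.
		return 1, fileSize
	}
	remaining := fileSize - chunkSize
	numChunks = int(remaining / chunkSize)
	if numChunks <= 0 {
		// The number of chunks can never be 0.
		numChunks = 1
	}
	// Re-derive the per-chunk size so the remainder is split evenly
	// across the chosen number of chunks.
	perChunk = remaining / int64(numChunks)
	return numChunks, perChunk
}

func main() {
	// With a 1 GiB file and the 125 MiB default chunk size, the bytes left
	// after the first request are split into 7 additional chunks.
	n, size := planChunks(1<<30, 125<<20)
	fmt.Println(n, size)
}

Under this scheme the readers channel in Fetch is sized to numChunks+1 rather than maxConcurrency+1, matching the hunk above: one slot for the initial buffered reader plus one per additional chunk.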