
fixed chunk size #173

Merged: 13 commits, Mar 6, 2024
4 changes: 2 additions & 2 deletions cmd/multifile/multifile.go
@@ -97,7 +97,7 @@ func maxConcurrentFiles() int {
}

func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
if err != nil {
return err
}
@@ -119,7 +119,7 @@ func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
}
downloadOpts := download.Options{
MaxConcurrency: viper.GetInt(config.OptConcurrency),
MinChunkSize: int64(minChunkSize),
ChunkSize: int64(chunkSize),
Client: clientOpts,
}
pgetOpts := pget.Options{
6 changes: 3 additions & 3 deletions cmd/root/root.go
@@ -126,7 +126,7 @@ func persistentFlags(cmd *cobra.Command) error {
cmd.PersistentFlags().IntVar(&concurrency, config.OptMaxChunks, runtime.GOMAXPROCS(0)*4, "Maximum number of chunks for a given file")
cmd.PersistentFlags().Duration(config.OptConnTimeout, 5*time.Second, "Timeout for establishing a connection, format is <number><unit>, e.g. 10s")
cmd.PersistentFlags().StringVarP(&chunkSize, config.OptChunkSize, "m", "125M", "Chunk size (in bytes) to use when downloading a file (e.g. 10M)")
cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, "16M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, "125M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
cmd.PersistentFlags().BoolP(config.OptForce, "f", false, "OptForce download, overwriting existing file")
cmd.PersistentFlags().StringSlice(config.OptResolve, []string{}, "OptResolve hostnames to specific IPs")
cmd.PersistentFlags().IntP(config.OptRetries, "r", 5, "Number of retries when attempting to retrieve a file")
@@ -202,7 +202,7 @@ func runRootCMD(cmd *cobra.Command, args []string) error {
// rootExecute is the main function of the program and encapsulates the general logic
// returns any/all errors to the caller.
func rootExecute(ctx context.Context, urlString, dest string) error {
minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
Contributor Author:

Suggested change:
- chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
+ chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptChunkSize))

We should still check MinimumChunkSize and adjust ChunkSize upwards to satisfy it if it's set.
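A minimal sketch of that suggestion, as a hypothetical helper (`effectiveChunkSize` is illustrative, not pget code): take the configured chunk size, but bump it upwards if a legacy minimum-chunk-size setting demands more.

```go
package main

import "fmt"

// effectiveChunkSize is a hypothetical helper: honor a legacy
// minimum-chunk-size setting by adjusting the chunk size upwards to
// satisfy it, rather than treating the two as independent knobs.
func effectiveChunkSize(chunkSize, minChunkSize int64) int64 {
	if minChunkSize > chunkSize {
		return minChunkSize
	}
	return chunkSize
}

func main() {
	// A deployment that pinned min chunk size to 500 keeps 500-byte chunks
	// even though the configured chunk size is only 125.
	fmt.Println(effectiveChunkSize(125, 500))
}
```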

Member:

I'm not clear why we are bothering to keep min chunk size. It seems... odd? Let's just specify the chunk size we want. Having two controls for the same thing is how we got into the mess of too many ways to control concurrency.

If someone wants a chunk size of 1 byte... it's silly, but it's fine.

Contributor Author:

Because pget is already deployed to many models which set it.

If they set min chunk size to 500MiB, we should respect that and not have a 125MiB chunk size.

Member (@tempusfrangit, Mar 5, 2024):

I had wired up min chunk size and chunk size to the same value behind the scenes, so either flag can be set, but with the min chunk size flag deprecated.

The exception is in prod where our value would continue to override (since we set with the config map).

The only caveat is that if you specify both, the last one processed wins.

I see that I may not have been clear about that change. Totally on me; I'll do better at calling those things out in the future.
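The "last one processed wins" behavior can be sketched with the standard library's flag package (cobra's StringVar wiring behaves analogously); `parseChunkFlags` and the flag names here are illustrative, not pget's actual code:

```go
package main

import (
	"flag"
	"fmt"
)

// parseChunkFlags is an illustrative sketch: two flag names backed by the
// same variable, so whichever flag appears last on the command line wins.
func parseChunkFlags(args []string) string {
	var chunkSize string
	fs := flag.NewFlagSet("pget", flag.ContinueOnError)
	fs.StringVar(&chunkSize, "chunk-size", "125M", "chunk size")
	fs.StringVar(&chunkSize, "minimum-chunk-size", "125M", "deprecated alias")
	fs.Parse(args)
	return chunkSize
}

func main() {
	// Both flags given: the later one overwrites the shared variable.
	fmt.Println(parseChunkFlags([]string{"--chunk-size=64M", "--minimum-chunk-size=500M"}))
}
```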

Contributor Author:

Oops, yes I missed that. However:

  • we should not refer to the deprecated name on this line, so my suggestion above still stands
  • we wire both args up to the chunkSize variable, but we never read that variable, so it's not obvious to me that it matters (instead we fetch with viper). I don't have a good mental model for what happens here; I'm not sure why we bind a variable if we're never reading it?
  • pflag has docs on how to alias flag names; maybe we should do that instead?

Member:

The aliasing is a bit weird with viper involved. Generally using an XXXVarP type instead of XXXP in cobra is the easiest way to alias as it backs to a pointer. I'm really ok either way. Your suggestions do absolutely apply in either case.

Contributor Author:

Why do we have viper, again? Is it so we can configure via environment variables? It's all a bit magic, I don't understand it, and I don't find the documentation helpful on the detail (like this variable aliasing thing; I don't think the documentation defines the behaviour here).

Member:

Viper is for the ENV binding to the flags. It's a lot of magic.

Variable aliasing results in the last processed element winning. I'll make sure the two arguments cannot both be used, which should address the concerns. The ENV bits are a bit more work; I'll make sure that's also clear (or at least documented) behavior.

if err != nil {
return fmt.Errorf("error parsing minimum chunk size: %w", err)
}
@@ -223,7 +223,7 @@ func rootExecute(ctx context.Context, urlString, dest string) error {

downloadOpts := download.Options{
MaxConcurrency: viper.GetInt(config.OptConcurrency),
MinChunkSize: int64(minChunkSize),
ChunkSize: int64(chunkSize),
Client: clientOpts,
}

25 changes: 11 additions & 14 deletions pkg/download/buffer.go
@@ -36,10 +36,10 @@ func GetBufferMode(opts Options) *BufferMode {
}
}

func (m *BufferMode) minChunkSize() int64 {
minChunkSize := m.MinChunkSize
func (m *BufferMode) chunkSize() int64 {
minChunkSize := m.ChunkSize
if minChunkSize == 0 {
return defaultMinChunkSize
return defaultChunkSize
}
return minChunkSize
}
@@ -61,14 +61,14 @@ type firstReqResult struct {
func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, error) {
logger := logging.GetLogger()

br := newBufferedReader(m.minChunkSize())
br := newBufferedReader(m.chunkSize())

firstReqResultCh := make(chan firstReqResult)
m.queue.submit(func() {
m.sem.Go(func() error {
defer close(firstReqResultCh)
defer br.done()
firstChunkResp, err := m.DoRequest(ctx, 0, m.minChunkSize()-1, url)
firstChunkResp, err := m.DoRequest(ctx, 0, m.chunkSize()-1, url)
if err != nil {
br.err = err
firstReqResultCh <- firstReqResult{err: err}
@@ -105,25 +105,22 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
fileSize := firstReqResult.fileSize
trueURL := firstReqResult.trueURL

if fileSize <= m.minChunkSize() {
if fileSize <= m.chunkSize() {
// we only need a single chunk: just download it and finish
return br, fileSize, nil
}

remainingBytes := fileSize - m.minChunkSize()
numChunks := int(remainingBytes / m.minChunkSize())
remainingBytes := fileSize - m.chunkSize()
numChunks := int(remainingBytes / m.chunkSize())
// Number of chunks can never be 0
if numChunks <= 0 {
numChunks = 1
}
if numChunks > m.maxConcurrency() {
numChunks = m.maxConcurrency()
}

readersCh := make(chan io.Reader, m.maxConcurrency()+1)
readersCh := make(chan io.Reader, numChunks+1)
readersCh <- br

startOffset := m.minChunkSize()
startOffset := m.chunkSize()

chunkSize := remainingBytes / int64(numChunks)
if chunkSize < 0 {
@@ -146,7 +143,7 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
end = fileSize - 1
}

br := newBufferedReader(end - start + 1)
br := newBufferedReader(m.chunkSize())
readersCh <- br

m.sem.Go(func() error {
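The chunk-planning arithmetic in Fetch above can be sketched as a standalone function (`planChunks` is a hypothetical name; it mirrors the visible diff under the assumption that the chunk count is still clamped to max concurrency):

```go
package main

import "fmt"

// planChunks is a hypothetical helper mirroring BufferMode.Fetch: the first
// chunkSize bytes are already being fetched, and the remainder is split into
// at least one and at most maxConcurrency further chunks, recalculating the
// per-chunk size upward when the clamp kicks in.
func planChunks(fileSize, chunkSize int64, maxConcurrency int) (numChunks int, perChunk int64) {
	remaining := fileSize - chunkSize
	numChunks = int(remaining / chunkSize)
	// Number of chunks can never be 0.
	if numChunks <= 0 {
		numChunks = 1
	}
	if numChunks > maxConcurrency {
		numChunks = maxConcurrency
	}
	perChunk = remaining / int64(numChunks)
	return numChunks, perChunk
}

func main() {
	// 1024-byte file, 10-byte chunks, 25 max chunks: the remainder would
	// naively need 101 chunks, so the per-chunk size grows instead.
	n, size := planChunks(1024, 10, 25)
	fmt.Println(n, size)
}
```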
38 changes: 19 additions & 19 deletions pkg/download/buffer_unit_test.go
@@ -52,64 +52,64 @@ func TestFileToBufferChunkCountExceedsMaxChunks(t *testing.T) {
opts := Options{
Client: client.Options{},
}
// Ensure that the math generally works out as such for this test case where minChunkSize is < 0.5* contentSize
// (contentSize - minChunkSize) / minChunkSize < maxChunks
// Ensure that the math generally works out as such for this test case where chunkSize is < 0.5* contentSize
// (contentSize - chunkSize) / chunkSize < maxChunks
// This ensures that we're always testing the case where the number of chunks exceeds the maxChunks
// Additional cases added to validate various cases where the final chunk is less than minChunkSize
// Additional cases added to validate various cases where the final chunk is less than chunkSize
tc := []struct {
name string
maxConcurrency int
minChunkSize int64
chunkSize int64
}{
// In these first cases we will never have more than 2 chunks as the minChunkSize is greater than 0.5*contentSize
// In these first cases we will never have more than 2 chunks as the chunkSize is greater than 0.5*contentSize
{
name: "minChunkSize greater than contentSize",
minChunkSize: contentSize + 1,
name: "chunkSize greater than contentSize",
chunkSize: contentSize + 1,
maxConcurrency: 1,
},
{
name: "minChunkSize equal to contentSize",
minChunkSize: contentSize,
name: "chunkSize equal to contentSize",
chunkSize: contentSize,
maxConcurrency: 1,
},
{
name: "minChunkSize less than contentSize",
minChunkSize: contentSize - 1,
name: "chunkSize less than contentSize",
chunkSize: contentSize - 1,
maxConcurrency: 2,
},
{
name: "minChunkSize is 3/4 contentSize",
minChunkSize: int64(float64(contentSize) * 0.75),
name: "chunkSize is 3/4 contentSize",
chunkSize: int64(float64(contentSize) * 0.75),
maxConcurrency: 2,
},
{
// This is an exceptional case where we only need a single additional chunk beyond the default "get content size"
// request.
name: "minChunkSize is 1/2 contentSize",
minChunkSize: int64(float64(contentSize) * 0.5),
name: "chunkSize is 1/2 contentSize",
chunkSize: int64(float64(contentSize) * 0.5),
maxConcurrency: 2,
},
// These test cases cover a few scenarios of downloading where the maxChunks will force a re-calculation of
// the chunkSize to ensure that we don't exceed the maxChunks.
{
// remainder will result in 3 chunks, max-chunks is 2
name: "minChunkSize is 1/4 contentSize",
minChunkSize: int64(float64(contentSize) * 0.25),
name: "chunkSize is 1/4 contentSize",
chunkSize: int64(float64(contentSize) * 0.25),
maxConcurrency: 2,
},
{
// humanize.KByte = 1024, remainder will result in 1024/10 = 102 chunks, max-chunks is set to 25
// resulting in a chunkSize of 41
name: "many chunks, low maxChunks",
minChunkSize: 10,
chunkSize: 10,
maxConcurrency: 25,
},
}

for _, tc := range tc {
t.Run(tc.name, func(t *testing.T) {
opts.MaxConcurrency = tc.maxConcurrency
opts.MinChunkSize = tc.minChunkSize
opts.ChunkSize = tc.chunkSize
bufferMode := GetBufferMode(opts)
path, _ := url.JoinPath(server.URL, testFilePath)
download, size, err := bufferMode.Fetch(context.Background(), path)
4 changes: 2 additions & 2 deletions pkg/download/common.go
@@ -1,11 +1,11 @@
package download

import (
"github.com/dustin/go-humanize"
"regexp"

"github.com/dustin/go-humanize"
)

const defaultMinChunkSize = 16 * humanize.MiByte
const defaultChunkSize = 125 * humanize.MiByte

var contentRangeRegexp = regexp.MustCompile(`^bytes .*/([0-9]+)$`)
14 changes: 7 additions & 7 deletions pkg/download/consistent_hashing_test.go
@@ -303,7 +303,7 @@ func TestConsistentHashingPathBased(t *testing.T) {
opts := download.Options{
Client: client.Options{},
MaxConcurrency: tc.concurrency,
MinChunkSize: tc.chunkSize,
ChunkSize: tc.chunkSize,
CacheHosts: hostnames[0:tc.numCacheHosts],
CacheableURIPrefixes: makeCacheableURIPrefixes(fmt.Sprintf("http://%s", hostname)),
CacheUsePathProxy: true,
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestConsistentHashRetries(t *testing.T) {
opts := download.Options{
Client: client.Options{Transport: mockTransport},
MaxConcurrency: 8,
MinChunkSize: 1,
ChunkSize: 1,
CacheHosts: hostnames,
CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
SliceSize: 1,
@@ -371,7 +371,7 @@ func TestConsistentHashRetriesMissingHostname(t *testing.T) {
Transport: mockTransport,
},
MaxConcurrency: 8,
MinChunkSize: 1,
ChunkSize: 1,
CacheHosts: hostnames,
CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
SliceSize: 1,
@@ -405,7 +405,7 @@ func TestConsistentHashRetriesTwoHosts(t *testing.T) {
opts := download.Options{
Client: client.Options{Transport: mockTransport},
MaxConcurrency: 8,
MinChunkSize: 1,
ChunkSize: 1,
CacheHosts: hostnames,
CacheableURIPrefixes: makeCacheableURIPrefixes("http://testing.replicate.delivery"),
SliceSize: 1,
@@ -432,7 +432,7 @@ func TestConsistentHashingHasFallback(t *testing.T) {
opts := download.Options{
Client: client.Options{Transport: mockTransport},
MaxConcurrency: 8,
MinChunkSize: 2,
ChunkSize: 2,
CacheHosts: []string{""}, // simulate a single unavailable cache host
CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
SliceSize: 3,
@@ -525,7 +525,7 @@ func TestConsistentHashingFileFallback(t *testing.T) {
opts := download.Options{
Client: client.Options{},
MaxConcurrency: 8,
MinChunkSize: 2,
ChunkSize: 2,
CacheHosts: []string{url.Host},
CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
SliceSize: 3,
@@ -587,7 +587,7 @@ func TestConsistentHashingChunkFallback(t *testing.T) {
opts := download.Options{
Client: client.Options{},
MaxConcurrency: 8,
MinChunkSize: 3,
ChunkSize: 3,
CacheHosts: []string{url.Host},
CacheableURIPrefixes: makeCacheableURIPrefixes("http://fake.replicate.delivery"),
SliceSize: 3,
6 changes: 0 additions & 6 deletions pkg/download/options.go
@@ -16,12 +16,6 @@ type Options struct {
// See https://nginx.org/en/docs/http/ngx_http_slice_module.html
SliceSize int64

// Minimum number of bytes per chunk. If set to zero, 16 MiB will be
// used.
//
// Deprecated: use ChunkSize instead
MinChunkSize int64

// Number of bytes per chunk. If set to zero, 125 MiB will be used.
ChunkSize int64

Expand Down