
fixed chunk size #173

Merged
merged 13 commits on Mar 6, 2024
20 changes: 12 additions & 8 deletions README.md
@@ -100,10 +100,10 @@ https://example.com/music.mp3 /local/path/to/music.mp3
- Type `Integer`

### Global Command-Line Options
- `--max-chunks`
- Maximum number of chunks for downloading a given file
- Type: `Integer`
- Default: `4 * runtime.NumCPU()`
- `--concurrency`
- Maximum number of chunks to download in parallel for a given file
- Type: `Integer`
- Default: `4 * runtime.NumCPU()`
- `--connect-timeout`
- Timeout for establishing a connection, format is <number><unit>, e.g. 10s
- Type: `Duration`
@@ -116,10 +116,10 @@ https://example.com/music.mp3 /local/path/to/music.mp3
- Log level (debug, info, warn, error)
- Type: `string`
- Default: `info`
- `-m`, `--minimum-chunk-size string`
- Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)
- `-m`, `--chunk-size string`
- Chunk size (in bytes) to use when downloading a file (e.g. 10M)
- Type: `string`
- Default: `16M`
- Default: `125M`
- `--resolve`
- Resolve hostnames to specific IPs, can be specified multiple times, format <hostname>:<port>:<ip> (e.g. example.com:443:127.0.0.1)
- Type: `string`
@@ -133,10 +133,14 @@ https://example.com/music.mp3 /local/path/to/music.mp3
- Default: `false`

#### Deprecated
- `--concurrency` (deprecated, use `--max-chunks` instead)
- `--max-chunks` (deprecated, use `--concurrency` instead)
- Maximum number of chunks for downloading a given file
- Type: `Integer`
- Default: `4 * runtime.NumCPU()`
- `-m`, `--minimum-chunk-size string` (deprecated, use `--chunk-size` instead)
- Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)
- Type: `string`
- Default: `16M`

## Error Handling

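The size strings accepted by `--chunk-size` (and by the deprecated `--minimum-chunk-size`) are parsed with `humanize.ParseBytes`, as the Go diffs below show. A minimal standalone sketch, not part of this PR, of how a few such values are interpreted:

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

// Prints how go-humanize interprets a few size strings. "M" is read as a
// decimal megabyte (1,000,000 bytes); binary sizes use the "MiB" suffix.
func main() {
	for _, s := range []string{"10M", "16MiB", "125M"} {
		n, err := humanize.ParseBytes(s)
		if err != nil {
			fmt.Println("invalid size:", s, err)
			continue
		}
		fmt.Printf("%-6s -> %d bytes\n", s, n)
	}
}
```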
4 changes: 2 additions & 2 deletions cmd/multifile/multifile.go
@@ -97,7 +97,7 @@ func maxConcurrentFiles() int {
}

func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptChunkSize))
if err != nil {
return err
}
@@ -119,7 +119,7 @@ func multifileExecute(ctx context.Context, manifest pget.Manifest) error {
}
downloadOpts := download.Options{
MaxConcurrency: viper.GetInt(config.OptConcurrency),
MinChunkSize: int64(minChunkSize),
ChunkSize: int64(chunkSize),
Client: clientOpts,
}
pgetOpts := pget.Options{
63 changes: 46 additions & 17 deletions cmd/root/root.go
@@ -18,6 +18,7 @@ import (
"github.com/replicate/pget/pkg/client"
"github.com/replicate/pget/pkg/config"
"github.com/replicate/pget/pkg/download"
"github.com/replicate/pget/pkg/logging"
)

const rootLongDesc = `
@@ -42,6 +43,9 @@ efficient file extractor, providing a streamlined solution for fetching and unpa

var concurrency int
var pidFile *cli.PIDFile
var chunkSize string

const chunkSizeDefault = "125M"

func GetCommand() *cobra.Command {
cmd := &cobra.Command{
@@ -50,7 +54,6 @@ func GetCommand() *cobra.Command {
Long: rootLongDesc,
PersistentPreRunE: rootPersistentPreRunEFunc,
PersistentPostRunE: rootPersistentPostRunEFunc,
PreRun: rootCmdPreRun,
RunE: runRootCMD,
Args: validateArgs,
Example: ` pget https://example.com/file.tar ./target-dir`,
@@ -67,6 +70,7 @@ func GetCommand() *cobra.Command {
fmt.Println(err)
os.Exit(1)
}

err = viper.BindPFlags(cmd.Flags())
if err != nil {
fmt.Println(err)
@@ -101,6 +105,7 @@ func pidFlock(pidFilePath string) error {
}

func rootPersistentPreRunEFunc(cmd *cobra.Command, args []string) error {
logger := logging.GetLogger()
if err := config.PersistentStartupProcessFlags(); err != nil {
return err
}
@@ -109,6 +114,38 @@ func rootPersistentPreRunEFunc(cmd *cobra.Command, args []string) error {
return err
}
}

// Handle chunk size flags (deprecation and overwriting where needed)
//
// Expected behavior for the chunk size flags:
// * If either CLI option is set, use that value
// * If both are set, emit an error
// * If neither is set, fall back to the environment variables:
// ** If PGET_CHUNK_SIZE is set, use that value
// ** If PGET_CHUNK_SIZE is not set but PGET_MINIMUM_CHUNK_SIZE is, use PGET_MINIMUM_CHUNK_SIZE
// NOTE: the PGET_MINIMUM_CHUNK_SIZE value is simply written over the PGET_CHUNK_SIZE key,
// and a warning message is emitted
// ** If both PGET_CHUNK_SIZE and PGET_MINIMUM_CHUNK_SIZE are set, use PGET_CHUNK_SIZE
// and emit a warning message
// * If neither environment variable is set, use the default value

changedMin := cmd.PersistentFlags().Changed(config.OptMinimumChunkSize)
changedChunk := cmd.PersistentFlags().Changed(config.OptChunkSize)
if changedMin && changedChunk {
return fmt.Errorf("--minimum-chunk-size and --chunk-size cannot be used at the same time, use --chunk-size instead")
} else if !(changedMin && changedChunk) {
Review comment from the PR author:

Suggested change (replace the first line with the second):
} else if !(changedMin && changedChunk) {
} else {

in this house, we accept the principle of the excluded middle, so this condition is unnecessary.

Review comment from the PR author:

in fact, because the if block returns, we don't need an else at all
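A minimal runnable sketch of the shape the reviewer is suggesting; `changedMin` and `changedChunk` stand in for the cobra `Changed()` checks in the real code, and the fallback body is reduced to a placeholder:

```go
package main

import (
	"errors"
	"fmt"
)

// resolveChunkFlags sketches the suggested control flow: the conflicting-flags
// case returns early, so no else branch is needed before the fallback logic.
func resolveChunkFlags(changedMin, changedChunk bool) error {
	if changedMin && changedChunk {
		return errors.New("--minimum-chunk-size and --chunk-size cannot be used at the same time, use --chunk-size instead")
	}
	// Fall through: at most one flag was set explicitly, so the
	// environment/default fallback always applies here.
	fmt.Println("resolve chunk size from environment or default")
	return nil
}

func main() {
	fmt.Println(resolveChunkFlags(true, true))   // error: flags conflict
	fmt.Println(resolveChunkFlags(false, false)) // nil: fallback path runs
}
```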

minChunkSizeEnv := viper.GetString(config.OptMinimumChunkSize)
chunkSizeEnv := viper.GetString(config.OptChunkSize)
if minChunkSizeEnv != chunkSizeDefault {
if chunkSizeEnv == chunkSizeDefault {
logger.Warn().Msg("Using PGET_MINIMUM_CHUNK_SIZE is deprecated, use PGET_CHUNK_SIZE instead")
viper.Set(config.OptChunkSize, minChunkSizeEnv)
} else {
logger.Warn().Msg("Both PGET_MINIMUM_CHUNK_SIZE and PGET_CHUNK_SIZE are set, using PGET_CHUNK_SIZE")
}
}
}
philandstuff marked this conversation as resolved.

return nil
}

@@ -124,7 +161,8 @@ func persistentFlags(cmd *cobra.Command) error {
cmd.PersistentFlags().IntVarP(&concurrency, config.OptConcurrency, "c", runtime.GOMAXPROCS(0)*4, "Maximum number of concurrent downloads/maximum number of chunks for a given file")
cmd.PersistentFlags().IntVar(&concurrency, config.OptMaxChunks, runtime.GOMAXPROCS(0)*4, "Maximum number of chunks for a given file")
cmd.PersistentFlags().Duration(config.OptConnTimeout, 5*time.Second, "Timeout for establishing a connection, format is <number><unit>, e.g. 10s")
cmd.PersistentFlags().StringP(config.OptMinimumChunkSize, "m", "16M", "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
cmd.PersistentFlags().StringVarP(&chunkSize, config.OptChunkSize, "m", chunkSizeDefault, "Chunk size (in bytes) to use when downloading a file (e.g. 10M)")
cmd.PersistentFlags().StringVar(&chunkSize, config.OptMinimumChunkSize, chunkSizeDefault, "Minimum chunk size (in bytes) to use when downloading a file (e.g. 10M)")
cmd.PersistentFlags().BoolP(config.OptForce, "f", false, "OptForce download, overwriting existing file")
cmd.PersistentFlags().StringSlice(config.OptResolve, []string{}, "OptResolve hostnames to specific IPs")
cmd.PersistentFlags().IntP(config.OptRetries, "r", 5, "Number of retries when attempting to retrieve a file")
@@ -135,10 +173,6 @@ func persistentFlags(cmd *cobra.Command) error {
cmd.PersistentFlags().StringP(config.OptOutputConsumer, "o", "file", "Output Consumer (file, tar, null)")
cmd.PersistentFlags().String(config.OptPIDFile, defaultPidFilePath(), "PID file path")

if err := config.AddFlagAlias(cmd, config.OptConcurrency, config.OptMaxChunks); err != nil {
return err
}

if err := hideAndDeprecateFlags(cmd); err != nil {
return err
}
@@ -154,7 +188,8 @@ func hideAndDeprecateFlags(cmd *cobra.Command) error {

// DeprecatedFlag flags
err := config.DeprecateFlags(cmd,
config.DeprecatedFlag{Flag: config.OptMaxChunks, Msg: "use --concurrency instead"},
config.DeprecatedFlag{Flag: config.OptMaxChunks, Msg: fmt.Sprintf("use --%s instead", config.OptConcurrency)},
config.DeprecatedFlag{Flag: config.OptMinimumChunkSize, Msg: fmt.Sprintf("use --%s instead", config.OptChunkSize)},
)
if err != nil {
return err
@@ -163,12 +198,6 @@ func hideAndDeprecateFlags(cmd *cobra.Command) error {

}

func rootCmdPreRun(cmd *cobra.Command, args []string) {
if viper.GetBool(config.OptExtract) {
viper.Set(config.OptOutputConsumer, config.ConsumerTarExtractor)
}
}

func runRootCMD(cmd *cobra.Command, args []string) error {
// After we run through the PreRun functions we want to silence usage from being printed
// on all errors
@@ -179,7 +208,7 @@ func runRootCMD(cmd *cobra.Command, args []string) error {

log.Info().Str("url", urlString).
Str("dest", dest).
Str("minimum_chunk_size", viper.GetString(config.OptMinimumChunkSize)).
Str("chunk_size", viper.GetString(config.OptChunkSize)).
Msg("Initiating")

// OMG BODGE FIX THIS
@@ -199,9 +228,9 @@ func runRootCMD(cmd *cobra.Command, args []string) error {
// rootExecute is the main function of the program and encapsulates the general logic
// returns any/all errors to the caller.
func rootExecute(ctx context.Context, urlString, dest string) error {
minChunkSize, err := humanize.ParseBytes(viper.GetString(config.OptMinimumChunkSize))
chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptChunkSize))
if err != nil {
return fmt.Errorf("error parsing minimum chunk size: %w", err)
return fmt.Errorf("error parsing chunk size: %w", err)
}

resolveOverrides, err := config.ResolveOverridesToMap(viper.GetStringSlice(config.OptResolve))
@@ -220,7 +249,7 @@ func rootExecute(ctx context.Context, urlString, dest string) error {

downloadOpts := download.Options{
MaxConcurrency: viper.GetInt(config.OptConcurrency),
MinChunkSize: int64(minChunkSize),
ChunkSize: int64(chunkSize),
Client: clientOpts,
}

10 changes: 0 additions & 10 deletions pkg/config/config.go
@@ -78,16 +78,6 @@ func DeprecateFlags(cmd *cobra.Command, deprecations ...DeprecatedFlag) error {
return nil
}

func AddFlagAlias(cmd *cobra.Command, alias, flag string) error {
f := cmd.Flag(flag)
if f == nil {
return fmt.Errorf("flag %s does not exist", flag)
}

viper.RegisterAlias(alias, flag)
return nil
}

func ViperInit() {
viper.SetEnvPrefix(viperEnvPrefix)
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
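For reference, a standalone sketch (not code from this repository) of how the prefix and key replacer set up in `ViperInit` let an environment variable such as `PGET_CHUNK_SIZE` resolve the `chunk-size` key; the `AutomaticEnv` call is an assumption of this sketch:

```go
package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// Mirrors the ViperInit setup: a "PGET" env prefix plus a "-" -> "_"
	// key replacer, so "chunk-size" maps to the PGET_CHUNK_SIZE variable.
	v.SetEnvPrefix("PGET")
	v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	v.AutomaticEnv()

	os.Setenv("PGET_CHUNK_SIZE", "32M")
	fmt.Println(v.GetString("chunk-size")) // prints "32M"
}
```

This mapping is what the deprecation handling in `rootPersistentPreRunEFunc` relies on when it compares the two environment-driven chunk size values.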
1 change: 1 addition & 0 deletions pkg/config/optnames.go
@@ -12,6 +12,7 @@ const (
// Normal options with CLI arguments
OptConcurrency = "concurrency"
OptConnTimeout = "connect-timeout"
OptChunkSize = "chunk-size"
OptExtract = "extract"
OptForce = "force"
OptForceHTTP2 = "force-http2"
48 changes: 15 additions & 33 deletions pkg/download/buffer.go
@@ -5,21 +5,14 @@ import (
"fmt"
"io"
"net/http"
"regexp"
"strconv"

"golang.org/x/sync/errgroup"

"github.com/dustin/go-humanize"

"github.com/replicate/pget/pkg/client"
"github.com/replicate/pget/pkg/logging"
)

const defaultMinChunkSize = 16 * humanize.MiByte

var contentRangeRegexp = regexp.MustCompile(`^bytes .*/([0-9]+)$`)

type BufferMode struct {
Client *client.HTTPClient
Options
@@ -43,10 +36,10 @@ func GetBufferMode(opts Options) *BufferMode {
}
}

func (m *BufferMode) minChunkSize() int64 {
minChunkSize := m.MinChunkSize
func (m *BufferMode) chunkSize() int64 {
minChunkSize := m.ChunkSize
if minChunkSize == 0 {
return defaultMinChunkSize
return defaultChunkSize
}
return minChunkSize
}
@@ -68,14 +61,14 @@ type firstReqResult struct {
func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, error) {
logger := logging.GetLogger()

br := newBufferedReader(m.minChunkSize())
br := newBufferedReader(m.chunkSize())

firstReqResultCh := make(chan firstReqResult)
m.queue.submit(func() {
m.sem.Go(func() error {
defer close(firstReqResultCh)
defer br.done()
firstChunkResp, err := m.DoRequest(ctx, 0, m.minChunkSize()-1, url)
firstChunkResp, err := m.DoRequest(ctx, 0, m.chunkSize()-1, url)
if err != nil {
br.err = err
firstReqResultCh <- firstReqResult{err: err}
@@ -112,48 +105,37 @@ func (m *BufferMode) Fetch(ctx context.Context, url string) (io.Reader, int64, e
fileSize := firstReqResult.fileSize
trueURL := firstReqResult.trueURL

if fileSize <= m.minChunkSize() {
if fileSize <= m.chunkSize() {
// we only need a single chunk: just download it and finish
return br, fileSize, nil
}

remainingBytes := fileSize - m.minChunkSize()
numChunks := int(remainingBytes / m.minChunkSize())
// Number of chunks can never be 0
if numChunks <= 0 {
numChunks = 1
}
if numChunks > m.maxConcurrency() {
numChunks = m.maxConcurrency()
}
remainingBytes := fileSize - m.chunkSize()
// integer divide rounding up
numChunks := int((remainingBytes-1)/m.chunkSize() + 1)

readersCh := make(chan io.Reader, m.maxConcurrency()+1)
readersCh := make(chan io.Reader, numChunks+1)
readersCh <- br

startOffset := m.minChunkSize()

chunkSize := remainingBytes / int64(numChunks)
if chunkSize < 0 {
return nil, -1, fmt.Errorf("error: chunksize incorrect - result is negative, %d", chunkSize)
}
startOffset := m.chunkSize()

m.queue.submit(func() {
defer close(readersCh)
logger.Debug().Str("url", url).
Int64("size", fileSize).
Int("connections", numChunks).
Int64("chunkSize", chunkSize).
Int64("chunkSize", m.chunkSize()).
Msg("Downloading")

for i := 0; i < numChunks; i++ {
start := startOffset + chunkSize*int64(i)
end := start + chunkSize - 1
start := startOffset + m.chunkSize()*int64(i)
end := start + m.chunkSize() - 1

if i == numChunks-1 {
end = fileSize - 1
}

br := newBufferedReader(end - start + 1)
br := newBufferedReader(m.chunkSize())
readersCh <- br

m.sem.Go(func() error {
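To make the new chunk arithmetic concrete, a standalone sketch with made-up file and chunk sizes (not code from this PR) that walks through the ceiling division and the per-chunk byte ranges:

```go
package main

import "fmt"

// Walks through the fixed-chunk-size math from BufferMode.Fetch with
// made-up numbers: a 300 MB file and the new 125M default chunk size.
func main() {
	const chunkSize = int64(125_000_000)
	const fileSize = int64(300_000_000)

	// The first chunk (bytes 0..chunkSize-1) is requested up front and also
	// establishes the total file size.
	fmt.Printf("chunk 0: bytes 0-%d\n", chunkSize-1)

	remainingBytes := fileSize - chunkSize
	// Integer division rounding up, as in the new numChunks computation.
	numChunks := int((remainingBytes-1)/chunkSize + 1)

	startOffset := chunkSize
	for i := 0; i < numChunks; i++ {
		start := startOffset + chunkSize*int64(i)
		end := start + chunkSize - 1
		if i == numChunks-1 {
			end = fileSize - 1 // the last chunk absorbs the remainder
		}
		fmt.Printf("chunk %d: bytes %d-%d\n", i+1, start, end)
	}
}
```

With these numbers the output is one 125M chunk up front, a second 125M chunk, and a final 50M chunk ending at byte 299,999,999.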