Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Stream Decompression for tar #183

Merged
merged 3 commits into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.5
github.com/jarcoal/httpmock v1.3.1
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/rs/zerolog v1.32.0
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/ulikunitz/xz v0.5.11
golang.org/x/sync v0.7.0
golang.org/x/tools v0.20.0
gotest.tools/gotestsum v1.11.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT9
github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeBAsoHo=
github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -578,6 +580,8 @@ github.com/tomarrell/wrapcheck/v2 v2.8.3 h1:5ov+Cbhlgi7s/a42BprYoxsr73CbdMUTzE3b
github.com/tomarrell/wrapcheck/v2 v2.8.3/go.mod h1:g9vNIyhb5/9TQgumxQyOEqDHsmGYcGsVMOx/xGkqdMo=
github.com/tommy-muehle/go-mnd/v2 v2.5.1 h1:NowYhSdyE/1zwK9QCLeRb6USWdoif80Ie+v+yU8u1Zw=
github.com/tommy-muehle/go-mnd/v2 v2.5.1/go.mod h1:WsUAkMJMYww6l/ufffCD3m+P7LEvr8TnZn9lwVDlgzw=
github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8=
github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/ultraware/funlen v0.1.0 h1:BuqclbkY6pO+cvxoq7OsktIXZpgBSkYTQtmwhAK81vI=
github.com/ultraware/funlen v0.1.0/go.mod h1:XJqmOQja6DpxarLj6Jj1U7JuoS8PvL4nEqDaQhy22p4=
github.com/ultraware/whitespace v0.1.0 h1:O1HKYoh0kIeqE8sFqZf1o0qbORXUCOQFrlaQyZsczZw=
Expand Down
139 changes: 139 additions & 0 deletions pkg/extract/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package extract

import (
"bytes"
"compress/bzip2"
"compress/gzip"
"compress/lzw"
"io"

"github.com/pierrec/lz4"
"github.com/ulikunitz/xz"

"github.com/replicate/pget/pkg/logging"
)

// peekSize is the number of leading bytes examined when sniffing a stream's
// compression format. TarFile peeks this many bytes and detectFormat pads
// shorter inputs up to this length.
const peekSize = 8

// Magic numbers found at the very start of each supported compression format,
// listed in the byte order in which they appear in the stream.
var (
	gzipMagic = []byte{0x1F, 0x8B}
	bzipMagic = []byte{0x42, 0x5A}
	xzMagic   = []byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}
	lzwMagic  = []byte{0x1F, 0x9D}
	// The LZ4 frame magic number is 0x184D2204 stored little-endian, so the
	// bytes on the wire are 04 22 4D 18 (not 18 4D 22 04).
	lz4Magic = []byte{0x04, 0x22, 0x4D, 0x18}
)

// decompressor is the common interface of all supported compression formats.
// decompress takes the raw input stream and returns a reader for the decoded
// content, or an error if the decoder could not be set up.
type decompressor interface {
	decompress(r io.Reader) (io.Reader, error)
}

// Compile-time checks that every format type satisfies decompressor.
var (
	_ decompressor = gzipDecompressor{}
	_ decompressor = bzip2Decompressor{}
	_ decompressor = xzDecompressor{}
	_ decompressor = lzwDecompressor{}
	_ decompressor = lz4Decompressor{}
	_ decompressor = noOpDecompressor{}
)

// detectFormat returns the appropriate extractor according to the magic number.
func detectFormat(input []byte) decompressor {
log := logging.GetLogger()
inputSize := len(input)

if inputSize < 2 {
return noOpDecompressor{}
}
// pad to 8 bytes
if inputSize < 8 {
input = append(input, make([]byte, peekSize-inputSize)...)
}

// magic16 := binary.BigEndian.Uint16(input)
// magic32 := binary.BigEndian.Uint32(input)
// // We need to pre-pend the padding since we're reading into something bigendian and exceeding the
// // 48bits size of the magic number bytes. The 16 and 32 bit magic numbers are complete bytes and
// // therefore do not need any padding.
// magic48 := binary.BigEndian.Uint64(append(make([]byte, 2), input[0:6]...))

switch true {
case bytes.HasPrefix(input, gzipMagic):
log.Debug().
Str("type", "gzip").
Msg("Compression Format")
return gzipDecompressor{}
case bytes.HasPrefix(input, bzipMagic):
log.Debug().
Str("type", "bzip2").
Msg("Compression Format")
return bzip2Decompressor{}
case bytes.HasPrefix(input, lzwMagic):
compressionByte := input[2]
// litWidth is guaranteed to be at least 9 per specification, the high order 3 bits of byte[2] are the litWidth
// the low order 5 bits are only used by non-unix implementations, we are going to ignore them.
litWidth := int(compressionByte>>5) + 9
log.Debug().
Str("type", "lzw").
Int("litWidth", litWidth).
Msg("Compression Format")
return lzwDecompressor{
order: lzw.MSB,
tempusfrangit marked this conversation as resolved.
Show resolved Hide resolved
litWidth: litWidth,
}
case bytes.HasPrefix(input, lz4Magic):
log.Debug().
Str("type", "lz4").
Msg("Compression Format")
return lz4Decompressor{}
case bytes.HasPrefix(input, xzMagic):
log.Debug().
Str("type", "xz").
Msg("Compression Format")
return xzDecompressor{}
default:
log.Debug().
Str("type", "none").
Msg("Compression Format")
return noOpDecompressor{}
}
}

// gzipDecompressor decodes gzip-compressed streams.
type gzipDecompressor struct{}

// decompress wraps r in a gzip reader. gzip.NewReader reads and validates the
// gzip header up front, so malformed input is reported here rather than on the
// first Read. On failure the returned reader is a true nil interface (never a
// non-nil io.Reader holding a nil *gzip.Reader).
func (d gzipDecompressor) decompress(r io.Reader) (io.Reader, error) {
	gz, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	return gz, nil
}

// bzip2Decompressor decodes bzip2-compressed streams.
type bzip2Decompressor struct{}

// decompress wraps r in a bzip2 reader. Constructing the reader cannot fail,
// so the returned error is always nil; invalid data surfaces on Read.
func (bzip2Decompressor) decompress(r io.Reader) (io.Reader, error) {
	decoded := bzip2.NewReader(r)
	return decoded, nil
}

// xzDecompressor decodes xz-compressed streams.
type xzDecompressor struct{}

// decompress wraps r in an xz reader. If xz.NewReader fails, the error is
// returned together with a true nil interface rather than a non-nil io.Reader
// holding a nil *xz.Reader.
func (d xzDecompressor) decompress(r io.Reader) (io.Reader, error) {
	xzReader, err := xz.NewReader(r)
	if err != nil {
		return nil, err
	}
	return xzReader, nil
}

// lzwDecompressor decodes LZW-compressed streams using the literal width and
// bit order chosen when the format was detected.
type lzwDecompressor struct {
	litWidth int
	order    lzw.Order
}

// decompress wraps r in an LZW reader configured with the receiver's bit order
// and literal width. The returned error is always nil.
func (d lzwDecompressor) decompress(r io.Reader) (io.Reader, error) {
	var decoded io.Reader = lzw.NewReader(r, d.order, d.litWidth)
	return decoded, nil
}

// lz4Decompressor decodes LZ4-compressed streams.
type lz4Decompressor struct{}

// decompress wraps r with lz4.NewReader. The returned error is always nil.
func (lz4Decompressor) decompress(r io.Reader) (io.Reader, error) {
	var decoded io.Reader = lz4.NewReader(r)
	return decoded, nil
}

// noOpDecompressor is used when no compression format is recognised.
type noOpDecompressor struct{}

// decompress hands r back untouched with a nil error.
func (noOpDecompressor) decompress(r io.Reader) (io.Reader, error) {
	passthrough := r
	return passthrough, nil
}
56 changes: 56 additions & 0 deletions pkg/extract/compression_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package extract

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestDetectFormat(t *testing.T) {
tests := []struct {
name string
input []byte
expectType string
}{
{
name: "GZIP",
input: []byte{0x1f, 0x8b},
expectType: "extract.gzipDecompressor",
},
{
name: "BZIP2",
input: []byte{0x42, 0x5a},
expectType: "extract.bzip2Decompressor",
},
{
name: "XZ",
input: []byte{0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00},
expectType: "extract.xzDecompressor",
},
{
name: "Less than 2 bytes",
input: []byte{0x1f},
expectType: "extract.noOpDecompressor",
},
{
name: "UNKNOWN",
input: []byte{0xde, 0xad},
expectType: "extract.noOpDecompressor",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := detectFormat(tt.input)
assert.Equal(t, tt.expectType, stringFromInterface(result))
})
}
}

// stringFromInterface returns the dynamic type of i formatted with %T, or the
// empty string when i is nil.
func stringFromInterface(i interface{}) string {
	var typeName string
	if i != nil {
		typeName = fmt.Sprintf("%T", i)
	}
	return typeName
}
18 changes: 17 additions & 1 deletion pkg/extract/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package extract

import (
"archive/tar"
"bufio"
"errors"
"fmt"
"io"
Expand All @@ -22,10 +23,20 @@ type link struct {
newName string
}

func TarFile(reader io.Reader, destDir string, overwrite bool) error {
func TarFile(r io.Reader, destDir string, overwrite bool) error {
var links []*link

startTime := time.Now()
peekableReader := bufio.NewReader(r)
peekData, err := peekableReader.Peek(peekSize)
if err != nil {
return fmt.Errorf("error reading peek data: %w", err)
}
decompressor := detectFormat(peekData)
reader, err := decompressor.decompress(peekableReader)
if err != nil {
return fmt.Errorf("error creating decompressed stream: %w", err)
}
tarReader := tar.NewReader(reader)
logger := logging.GetLogger()

Expand Down Expand Up @@ -53,6 +64,11 @@ func TarFile(reader io.Reader, destDir string, overwrite bool) error {
}

switch header.Typeflag {
case tar.TypeXGlobalHeader:
// This is a global pax header, which we can skip as it's mostly handled by the underlying implementation
// NOTE: the global header is not persisted across subsequent calls to Next() and therefore could indicate
// that we are processing a tar file in an unintended manner. This is a limitation of archive/tar.
continue
case tar.TypeDir:
logger.Debug().
Str("target", target).
Expand Down