Skip to content

Commit

Permalink
Add support for zstd layer upload
Browse files Browse the repository at this point in the history
It is now possible to run `crane append -f bla.tar.zstd` to upload a zstd layer, both from a file and from a pipe.

Before this commit, crane would erroneously upload such a layer with a gzip MIME type.

While this PR does not _fully_ solve google#1798, it is very close to that.

Signed-off-by: Marat Radchenko <[email protected]>
  • Loading branch information
slonopotamus committed Oct 28, 2023
1 parent dbcd01c commit 1248c29
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 71 deletions.
6 changes: 5 additions & 1 deletion cmd/crane/cmd/flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func flattenImage(old v1.Image, repo name.Repository, use string, o crane.Option
}

// TODO: Make compression configurable?
layer := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
layer, err := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
if err != nil {
return nil, fmt.Errorf("new layer: %w", err)
}

if err := remote.WriteLayer(repo, layer, o.Remote...); err != nil {
return nil, fmt.Errorf("uploading layer: %w", err)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/zstd/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
}, nil
}

// UnzipReader wraps r with a zstd decompressor and returns the stream of
// decompressed bytes.
//
// On error it returns a literal nil reader rather than the result of
// zstd.NewReader, which would otherwise box a nil *zstd.Decoder into a
// non-nil io.Reader interface (the classic typed-nil trap for callers that
// check the reader instead of the error).
func UnzipReader(r io.Reader) (io.Reader, error) {
	zr, err := zstd.NewReader(r)
	if err != nil {
		return nil, err
	}
	return zr, nil
}

// NewWriterLevel returns a zstd encoder that writes compressed output to w.
// The integer level is interpreted as a zstd-style compression level and
// translated to an encoder level via zstd.EncoderLevelFromZstd.
func NewWriterLevel(w io.Writer, level int) (*zstd.Encoder, error) {
	encLevel := zstd.EncoderLevelFromZstd(level)
	return zstd.NewWriter(w, zstd.WithEncoderLevel(encLevel))
}

// Is detects whether the input stream is compressed.
func Is(r io.Reader) (bool, error) {
magicHeader := make([]byte, 4)
Expand Down
32 changes: 32 additions & 0 deletions pkg/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
// Package compression abstracts over gzip and zstd.
package compression

import (
"fmt"

"github.com/google/go-containerregistry/pkg/v1/types"
)

// Compression is an enumeration of the supported compression algorithms
type Compression string

Expand All @@ -24,3 +30,29 @@ const (
GZip Compression = "gzip"
ZStd Compression = "zstd"
)

// ToMediaType returns the layer media type corresponding to this compression
// algorithm, in the OCI flavor when oci is true and the Docker flavor
// otherwise.
//
// For an unrecognized compression value it returns the format's default
// compressed-layer media type together with a non-nil error.
func (c Compression) ToMediaType(oci bool) (types.MediaType, error) {
	// Receiver renamed from "compression" to "c": the long name stuttered
	// with (and shadowed) the package name.
	if oci {
		switch c {
		case ZStd:
			return types.OCILayerZStd, nil
		case GZip:
			return types.OCILayer, nil
		case None:
			return types.OCIUncompressedLayer, nil
		default:
			return types.OCILayer, fmt.Errorf("unsupported compression: %s", c)
		}
	}
	switch c {
	case ZStd:
		return types.DockerLayerZstd, nil
	case GZip:
		return types.DockerLayer, nil
	case None:
		return types.DockerUncompressedLayer, nil
	default:
		return types.DockerLayer, fmt.Errorf("unsupported compression: %s", c)
	}
}
14 changes: 5 additions & 9 deletions pkg/crane/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
return nil, fmt.Errorf("getting base image media type: %w", err)
}

layerType := types.DockerLayer

if baseMediaType == types.OCIManifestSchema1 {
layerType = types.OCILayer
}
oci := baseMediaType == types.OCIManifestSchema1

layers := make([]v1.Layer, 0, len(paths))
for _, path := range paths {
layer, err := getLayer(path, layerType)
layer, err := getLayer(path, oci)
if err != nil {
return nil, fmt.Errorf("reading layer %q: %w", path, err)
}
Expand All @@ -81,16 +77,16 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
return mutate.AppendLayers(base, layers...)
}

func getLayer(path string, layerType types.MediaType) (v1.Layer, error) {
func getLayer(path string, oci bool) (v1.Layer, error) {
f, err := streamFile(path)
if err != nil {
return nil, err
}
if f != nil {
return stream.NewLayer(f, stream.WithMediaType(layerType)), nil
return stream.NewLayer(f, stream.WithOCIMediaType(oci))
}

return tarball.LayerFromFile(path, tarball.WithMediaType(layerType))
return tarball.LayerFromFile(path, tarball.WithOCIMediaType(oci))
}

// If we're dealing with a named pipe, trying to open it multiple times will
Expand Down
94 changes: 72 additions & 22 deletions pkg/v1/stream/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"os"
"sync"

internalcomp "github.com/google/go-containerregistry/internal/compression"
"github.com/google/go-containerregistry/internal/zstd"
"github.com/google/go-containerregistry/pkg/compression"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
)
Expand All @@ -42,14 +45,19 @@ var (

// Layer is a streaming implementation of v1.Layer.
type Layer struct {
blob io.ReadCloser
consumed bool
compression int
closer io.Closer
uncompressedReader io.Reader

consumed bool

compression compression.Compression
compressionLevel int

mu sync.Mutex
digest, diffID *v1.Hash
size int64
mediaType types.MediaType

oci bool
}

var _ v1.Layer = (*Layer)(nil)
Expand All @@ -60,32 +68,53 @@ type LayerOption func(*Layer)
// WithCompressionLevel sets the compression level. See `gzip.NewWriterLevel`
// for possible gzip values; for zstd layers the value is translated via
// `zstd.EncoderLevelFromZstd`. (The previous comment mentioned only gzip,
// which became stale once the level also drives the zstd encoder.)
func WithCompressionLevel(level int) LayerOption {
	return func(l *Layer) {
		l.compressionLevel = level
	}
}

// WithMediaType is a functional option for overriding the layer's media type.
func WithMediaType(mt types.MediaType) LayerOption {
// WithOCIMediaType is a functional option for overriding the layer's media type.
func WithOCIMediaType(oci bool) LayerOption {
return func(l *Layer) {
l.mediaType = mt
l.oci = oci
}
}

// NewLayer creates a Layer from an io.ReadCloser.
func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
func NewLayer(rc io.ReadCloser, opts ...LayerOption) (*Layer, error) {
comp, peekReader, err := internalcomp.PeekCompression(rc)
if err != nil {
return nil, err
}

layer := &Layer{
blob: rc,
compression: gzip.BestSpeed,
// We use DockerLayer for now as uncompressed layers
// are unimplemented
mediaType: types.DockerLayer,
closer: rc,
compression: comp,
}

switch comp {
case compression.ZStd:
layer.compression = comp
layer.uncompressedReader, err = zstd.UnzipReader(peekReader)
if err != nil {
return nil, err
}
case compression.GZip:
layer.compression = comp
layer.uncompressedReader, err = gzip.NewReader(peekReader)
if err != nil {
return nil, err
}
default:
// No support for uncompressed layers for now
layer.compression = compression.GZip
layer.uncompressedReader = peekReader
}

for _, opt := range opts {
opt(layer)
}

return layer
return layer, nil
}

// Digest implements v1.Layer.
Expand Down Expand Up @@ -120,7 +149,7 @@ func (l *Layer) Size() (int64, error) {

// MediaType implements v1.Layer by deriving the media type from the detected
// compression algorithm and the OCI/Docker flag (replacing the removed
// mediaType field).
func (l *Layer) MediaType() (types.MediaType, error) {
	return l.compression.ToMediaType(l.oci)
}

// Uncompressed implements v1.Layer.
Expand Down Expand Up @@ -183,9 +212,27 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
// 64K ought to be small enough for anybody.
bw := bufio.NewWriterSize(mw, 2<<16)
zw, err := gzip.NewWriterLevel(bw, l.compression)
if err != nil {
return nil, err

var compressedWriter io.Writer
var compressedCloser io.Closer

switch l.compression {
case compression.ZStd:
w, err := zstd.NewWriterLevel(bw, l.compressionLevel)
if err != nil {
return nil, err
}
compressedWriter = w
compressedCloser = w
case compression.GZip:
w, err := gzip.NewWriterLevel(bw, l.compressionLevel)
if err != nil {
return nil, err
}
compressedWriter = w
compressedCloser = w
case compression.None:
compressedWriter = bw
}

doneDigesting := make(chan struct{})
Expand All @@ -211,7 +258,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
//
// NOTE: net/http will call close on success, so if we've already
// closed the inner rc, it's not an error.
if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
if err := l.closer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
return err
}

Expand All @@ -223,13 +270,16 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
go func() {
// Copy blob into the gzip writer, which also hashes and counts the
// size of the compressed output, and hasher of the raw contents.
_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)
_, copyErr := io.Copy(io.MultiWriter(h, compressedWriter), l.uncompressedReader)

// Close the gzip writer once copying is done. If this is done in the
// Close method of compressedReader instead, then it can cause a panic
// when the compressedReader is closed before the blob is fully
// consumed and io.Copy in this goroutine is still blocking.
closeErr := zw.Close()
var closeErr error
if compressedCloser != nil {
closeErr = compressedCloser.Close()
}

// Check errors from writing and closing streams.
if copyErr != nil {
Expand Down
Loading

0 comments on commit 1248c29

Please sign in to comment.