From 4ccbd500a6b44d4d95800005cb11314dd5ab5394 Mon Sep 17 00:00:00 2001
From: Marat Radchenko
Date: Sat, 28 Oct 2023 19:21:33 +0300
Subject: [PATCH] Add support for zstd layer upload

It is now possible to run `crane append -f bla.tar.zstd` to upload a
zstd-compressed layer, both from a file and from a pipe.

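For example (the image reference and `rootfs.tar` below are only
illustrative; `-t` is crane append's existing flag for tagging the
resulting image):

    crane append -f bla.tar.zstd -t registry.example.com/app:latest
    crane append -f <(zstd -c rootfs.tar) -t registry.example.com/app:latest

The second form feeds the layer through a process substitution, i.e. a
named pipe, which is the "from a pipe" case above.
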
Before this commit, crane would erroneously upload such a layer with a
gzip layer media type.

While this PR does not _fully_ solve #1501, it comes very close.

Signed-off-by: Marat Radchenko
---
 cmd/crane/cmd/flatten.go       |  6 ++-
 internal/zstd/zstd.go          |  8 +++
 pkg/compression/compression.go | 32 ++++++++++++
 pkg/crane/append.go            | 14 ++---
 pkg/v1/stream/layer.go         | 94 ++++++++++++++++++++++++++--------
 pkg/v1/tarball/layer.go        | 70 +++++++++++--------------
 pkg/v1/types/types.go          |  1 +
 7 files changed, 154 insertions(+), 71 deletions(-)

diff --git a/cmd/crane/cmd/flatten.go b/cmd/crane/cmd/flatten.go
index ea642e791..a411a5f6f 100644
--- a/cmd/crane/cmd/flatten.go
+++ b/cmd/crane/cmd/flatten.go
@@ -228,7 +228,11 @@ func flattenImage(old v1.Image, repo name.Repository, use string, o crane.Option
 	}

 	// TODO: Make compression configurable?
-	layer := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
+	layer, err := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
+	if err != nil {
+		return nil, fmt.Errorf("new layer: %w", err)
+	}
+
 	if err := remote.WriteLayer(repo, layer, o.Remote...); err != nil {
 		return nil, fmt.Errorf("uploading layer: %w", err)
 	}
diff --git a/internal/zstd/zstd.go b/internal/zstd/zstd.go
index cccf54a30..fd0dbabe0 100644
--- a/internal/zstd/zstd.go
+++ b/internal/zstd/zstd.go
@@ -102,6 +102,14 @@ func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
 	}, nil
 }

+func UnzipReader(r io.Reader) (io.Reader, error) {
+	return zstd.NewReader(r)
+}
+
+func NewWriterLevel(w io.Writer, level int) (*zstd.Encoder, error) {
+	return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(level)))
+}
+
 // Is detects whether the input stream is compressed.
 func Is(r io.Reader) (bool, error) {
 	magicHeader := make([]byte, 4)
diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go
index 6686c2d8d..c89967da0 100644
--- a/pkg/compression/compression.go
+++ b/pkg/compression/compression.go
@@ -15,6 +15,12 @@
 // Package compression abstracts over gzip and zstd.
 package compression

+import (
+	"fmt"
+
+	"github.com/google/go-containerregistry/pkg/v1/types"
+)
+
 // Compression is an enumeration of the supported compression algorithms
 type Compression string

@@ -24,3 +30,29 @@ const (
 	GZip Compression = "gzip"
 	ZStd Compression = "zstd"
 )
+
+func (compression Compression) ToMediaType(oci bool) (types.MediaType, error) {
+	if oci {
+		switch compression {
+		case ZStd:
+			return types.OCILayerZStd, nil
+		case GZip:
+			return types.OCILayer, nil
+		case None:
+			return types.OCIUncompressedLayer, nil
+		default:
+			return types.OCILayer, fmt.Errorf("unsupported compression: %s", compression)
+		}
+	} else {
+		switch compression {
+		case ZStd:
+			return types.DockerLayerZstd, nil
+		case GZip:
+			return types.DockerLayer, nil
+		case None:
+			return types.DockerUncompressedLayer, nil
+		default:
+			return types.DockerLayer, fmt.Errorf("unsupported compression: %s", compression)
+		}
+	}
+}
diff --git a/pkg/crane/append.go b/pkg/crane/append.go
index f1c2ef69a..6a0a2e9ff 100644
--- a/pkg/crane/append.go
+++ b/pkg/crane/append.go
@@ -55,15 +55,11 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
 		return nil, fmt.Errorf("getting base image media type: %w", err)
 	}

-	layerType := types.DockerLayer
-
-	if baseMediaType == types.OCIManifestSchema1 {
-		layerType = types.OCILayer
-	}
+	oci := baseMediaType == types.OCIManifestSchema1

 	layers := make([]v1.Layer, 0, len(paths))
 	for _, path := range paths {
-		layer, err := getLayer(path, layerType)
+		layer, err := getLayer(path, oci)
 		if err != nil {
 			return nil, fmt.Errorf("reading layer %q: %w", path, err)
 		}
@@ -81,16 +77,16 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
 	return mutate.AppendLayers(base, layers...)
 }

-func getLayer(path string, layerType types.MediaType) (v1.Layer, error) {
+func getLayer(path string, oci bool) (v1.Layer, error) {
 	f, err := streamFile(path)
 	if err != nil {
 		return nil, err
 	}
 	if f != nil {
-		return stream.NewLayer(f, stream.WithMediaType(layerType)), nil
+		return stream.NewLayer(f, stream.WithOCIMediaType(oci))
 	}

-	return tarball.LayerFromFile(path, tarball.WithMediaType(layerType))
+	return tarball.LayerFromFile(path, tarball.WithOCIMediaType(oci))
 }

 // If we're dealing with a named pipe, trying to open it multiple times will
diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go
index 2b0354479..4ba0337a3 100644
--- a/pkg/v1/stream/layer.go
+++ b/pkg/v1/stream/layer.go
@@ -26,6 +26,9 @@ import (
 	"os"
 	"sync"

+	internalcomp "github.com/google/go-containerregistry/internal/compression"
+	"github.com/google/go-containerregistry/internal/zstd"
+	"github.com/google/go-containerregistry/pkg/compression"
 	v1 "github.com/google/go-containerregistry/pkg/v1"
 	"github.com/google/go-containerregistry/pkg/v1/types"
 )
@@ -42,14 +45,19 @@ var (

 // Layer is a streaming implementation of v1.Layer.
 type Layer struct {
-	blob        io.ReadCloser
-	consumed    bool
-	compression int
+	closer             io.Closer
+	uncompressedReader io.Reader
+
+	consumed bool
+
+	compression      compression.Compression
+	compressionLevel int

 	mu             sync.Mutex
 	digest, diffID *v1.Hash
 	size           int64
-	mediaType      types.MediaType
+
+	oci bool
 }

 var _ v1.Layer = (*Layer)(nil)
@@ -60,32 +68,53 @@ type LayerOption func(*Layer)
 // WithCompressionLevel sets the gzip compression. See `gzip.NewWriterLevel` for possible values.
 func WithCompressionLevel(level int) LayerOption {
 	return func(l *Layer) {
-		l.compression = level
+		l.compressionLevel = level
 	}
 }

-// WithMediaType is a functional option for overriding the layer's media type.
-func WithMediaType(mt types.MediaType) LayerOption {
+// WithOCIMediaType is a functional option for overriding the layer's media type.
+func WithOCIMediaType(oci bool) LayerOption {
 	return func(l *Layer) {
-		l.mediaType = mt
+		l.oci = oci
 	}
 }

 // NewLayer creates a Layer from an io.ReadCloser.
-func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
+func NewLayer(rc io.ReadCloser, opts ...LayerOption) (*Layer, error) {
+	comp, peekReader, err := internalcomp.PeekCompression(rc)
+	if err != nil {
+		return nil, err
+	}
+
 	layer := &Layer{
-		blob:        rc,
-		compression: gzip.BestSpeed,
-		// We use DockerLayer for now as uncompressed layers
-		// are unimplemented
-		mediaType: types.DockerLayer,
+		closer:      rc,
+		compression: comp,
+	}
+
+	switch comp {
+	case compression.ZStd:
+		layer.compression = comp
+		layer.uncompressedReader, err = zstd.UnzipReader(peekReader)
+		if err != nil {
+			return nil, err
+		}
+	case compression.GZip:
+		layer.compression = comp
+		layer.uncompressedReader, err = gzip.NewReader(peekReader)
+		if err != nil {
+			return nil, err
+		}
+	default:
+		// No support for uncompressed layers for now
+		layer.compression = compression.GZip
+		layer.uncompressedReader = peekReader
 	}

 	for _, opt := range opts {
 		opt(layer)
 	}

-	return layer
+	return layer, nil
 }

 // Digest implements v1.Layer.
@@ -120,7 +149,7 @@ func (l *Layer) Size() (int64, error) {

 // MediaType implements v1.Layer
 func (l *Layer) MediaType() (types.MediaType, error) {
-	return l.mediaType, nil
+	return l.compression.ToMediaType(l.oci)
 }

 // Uncompressed implements v1.Layer.
@@ -183,9 +212,27 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
 	// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
 	// 64K ought to be small enough for anybody.
 	bw := bufio.NewWriterSize(mw, 2<<16)
-	zw, err := gzip.NewWriterLevel(bw, l.compression)
-	if err != nil {
-		return nil, err
+
+	var compressedWriter io.Writer
+	var compressedCloser io.Closer
+
+	switch l.compression {
+	case compression.ZStd:
+		w, err := zstd.NewWriterLevel(bw, l.compressionLevel)
+		if err != nil {
+			return nil, err
+		}
+		compressedWriter = w
+		compressedCloser = w
+	case compression.GZip:
+		w, err := gzip.NewWriterLevel(bw, l.compressionLevel)
+		if err != nil {
+			return nil, err
+		}
+		compressedWriter = w
+		compressedCloser = w
+	case compression.None:
+		compressedWriter = bw
 	}

 	doneDigesting := make(chan struct{})
@@ -211,7 +258,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
 			//
 			// NOTE: net/http will call close on success, so if we've already
 			// closed the inner rc, it's not an error.
-			if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
+			if err := l.closer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
 				return err
 			}

@@ -223,13 +270,16 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
 	go func() {
 		// Copy blob into the gzip writer, which also hashes and counts the
 		// size of the compressed output, and hasher of the raw contents.
-		_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)
+		_, copyErr := io.Copy(io.MultiWriter(h, compressedWriter), l.uncompressedReader)

 		// Close the gzip writer once copying is done. If this is done in the
 		// Close method of compressedReader instead, then it can cause a panic
 		// when the compressedReader is closed before the blob is fully
 		// consumed and io.Copy in this goroutine is still blocking.
-		closeErr := zw.Close()
+		var closeErr error
+		if compressedCloser != nil {
+			closeErr = compressedCloser.Close()
+		}

 		// Check errors from writing and closing streams.
 		if copyErr != nil {
diff --git a/pkg/v1/tarball/layer.go b/pkg/v1/tarball/layer.go
index 8a2630961..9f4146f1a 100644
--- a/pkg/v1/tarball/layer.go
+++ b/pkg/v1/tarball/layer.go
@@ -16,7 +16,6 @@ package tarball

 import (
 	"bytes"
-	"compress/gzip"
 	"fmt"
 	"io"
 	"os"
@@ -44,7 +43,7 @@ type layer struct {
 	compressionLevel int
 	annotations      map[string]string
 	estgzopts        []estargz.Option
-	mediaType        types.MediaType
+	oci              bool
 }

 // Descriptor implements partial.withDescriptor.
@@ -53,11 +52,17 @@ func (l *layer) Descriptor() (*v1.Descriptor, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	mediaType, err := l.MediaType()
+	if err != nil {
+		return nil, err
+	}
+
 	return &v1.Descriptor{
 		Size:        l.size,
 		Digest:      digest,
 		Annotations: l.annotations,
-		MediaType:   l.mediaType,
+		MediaType:   mediaType,
 	}, nil
 }

@@ -88,7 +93,7 @@ func (l *layer) Size() (int64, error) {

 // MediaType implements v1.Layer
 func (l *layer) MediaType() (types.MediaType, error) {
-	return l.mediaType, nil
+	return l.compression.ToMediaType(l.oci)
 }

 // LayerOption applies options to layer
@@ -123,10 +128,10 @@ func WithCompressionLevel(level int) LayerOption {
 	}
 }

-// WithMediaType is a functional option for overriding the layer's media type.
-func WithMediaType(mt types.MediaType) LayerOption {
+// WithOCIMediaType is a functional option for overriding the layer's media type.
+func WithOCIMediaType(oci bool) LayerOption {
 	return func(l *layer) {
-		l.mediaType = mt
+		l.oci = oci
 	}
 }

@@ -235,10 +240,7 @@ func LayerFromOpener(opener Opener, opts ...LayerOption) (v1.Layer, error) {
 	}

 	layer := &layer{
-		compression:      compression.GZip,
-		compressionLevel: gzip.BestSpeed,
-		annotations:      make(map[string]string, 1),
-		mediaType:        types.DockerLayer,
+		annotations: make(map[string]string, 1),
 	}

 	if estgz := os.Getenv("GGCR_EXPERIMENT_ESTARGZ"); estgz == "1" {
@@ -248,7 +250,7 @@ func LayerFromOpener(opener Opener, opts ...LayerOption) (v1.Layer, error) {

 	switch comp {
 	case compression.GZip:
-		layer.compressedopener = opener
+		layer.compression = comp
 		layer.uncompressedopener = func() (io.ReadCloser, error) {
 			urc, err := opener()
 			if err != nil {
@@ -257,7 +259,7 @@ func LayerFromOpener(opener Opener, opts ...LayerOption) (v1.Layer, error) {
 			return ggzip.UnzipReadCloser(urc)
 		}
 	case compression.ZStd:
-		layer.compressedopener = opener
+		layer.compression = comp
 		layer.uncompressedopener = func() (io.ReadCloser, error) {
 			urc, err := opener()
 			if err != nil {
@@ -266,40 +268,30 @@ func LayerFromOpener(opener Opener, opts ...LayerOption) (v1.Layer, error) {
 			return zstd.UnzipReadCloser(urc)
 		}
 	default:
+		layer.compression = compression.GZip
 		layer.uncompressedopener = opener
-		layer.compressedopener = func() (io.ReadCloser, error) {
-			crc, err := opener()
-			if err != nil {
-				return nil, err
-			}
-
-			if layer.compression == compression.ZStd {
-				return zstd.ReadCloserLevel(crc, layer.compressionLevel), nil
-			}
+	}

-			return ggzip.ReadCloserLevel(crc, layer.compressionLevel), nil
+	layer.compressedopener = func() (io.ReadCloser, error) {
+		crc, err := opener()
+		if err != nil {
+			return nil, err
 		}
-	}

-	for _, opt := range opts {
-		opt(layer)
-	}
+		if comp == layer.compression {
+			// No need to recompress, underlying format already matches
+			return crc, nil
+		}

-	// Warn if media type does not match compression
-	var mediaTypeMismatch = false
-	switch layer.compression {
-	case compression.GZip:
-		mediaTypeMismatch =
-			layer.mediaType != types.OCILayer &&
-			layer.mediaType != types.OCIRestrictedLayer &&
-			layer.mediaType != types.DockerLayer
+		if layer.compression == compression.ZStd {
+			return zstd.ReadCloserLevel(crc, layer.compressionLevel), nil
+		}

-	case compression.ZStd:
-		mediaTypeMismatch = layer.mediaType != types.OCILayerZStd
+		return ggzip.ReadCloserLevel(crc, layer.compressionLevel), nil
 	}

-	if mediaTypeMismatch {
-		logs.Warn.Printf("Unexpected mediaType (%s) for selected compression in %s in LayerFromOpener().", layer.mediaType, layer.compression)
+	for _, opt := range opts {
+		opt(layer)
 	}

 	if layer.digest, layer.size, err = computeDigest(layer.compressedopener); err != nil {
diff --git a/pkg/v1/types/types.go b/pkg/v1/types/types.go
index c86657d7b..17f215af2 100644
--- a/pkg/v1/types/types.go
+++ b/pkg/v1/types/types.go
@@ -35,6 +35,7 @@ const (
 	DockerManifestSchema2     MediaType = "application/vnd.docker.distribution.manifest.v2+json"
 	DockerManifestList        MediaType = "application/vnd.docker.distribution.manifest.list.v2+json"
 	DockerLayer               MediaType = "application/vnd.docker.image.rootfs.diff.tar.gzip"
+	DockerLayerZstd           MediaType = "application/vnd.docker.image.rootfs.diff.tar.zstd"
 	DockerConfigJSON          MediaType = "application/vnd.docker.container.image.v1+json"
 	DockerPluginConfig        MediaType = "application/vnd.docker.plugin.v1+json"
 	DockerForeignLayer        MediaType = "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip"
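-- 

Not part of the patch: a minimal Go sketch of how the reworked API fits
together after this change. The file name is borrowed from the commit
message and must exist locally; the `WithOCIMediaType(true)` choice and
the media type mentioned in the comment are shown only for illustration.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/google/go-containerregistry/pkg/v1/stream"
	"github.com/google/go-containerregistry/pkg/v1/tarball"
)

func main() {
	// Streaming layer: NewLayer now peeks at the stream to detect gzip/zstd,
	// so it returns an error alongside the layer.
	f, err := os.Open("bla.tar.zstd")
	if err != nil {
		log.Fatal(err)
	}
	sl, err := stream.NewLayer(f, stream.WithOCIMediaType(true))
	if err != nil {
		log.Fatal(err)
	}
	mt, err := sl.MediaType()
	if err != nil {
		log.Fatal(err)
	}
	// For zstd input with the OCI option this should be the OCI zstd layer
	// type (application/vnd.oci.image.layer.v1.tar+zstd).
	fmt.Println(mt)

	// Tarball layer from a regular file: compression is detected from the
	// file contents and the media type follows it, instead of being passed
	// in via the removed WithMediaType option.
	tl, err := tarball.LayerFromFile("bla.tar.zstd", tarball.WithOCIMediaType(true))
	if err != nil {
		log.Fatal(err)
	}
	mt, err = tl.MediaType()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(mt)
}

The media type now follows the compression detected in the input plus an
OCI/Docker switch, which is why WithMediaType is replaced by
WithOCIMediaType in both the stream and tarball packages.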