Skip to content

Commit

Permalink
Merge pull request #265 from imeoer/converter-fix
Browse files Browse the repository at this point in the history
converter: enhance nydus tar unpack
  • Loading branch information
changweige authored Nov 30, 2022
2 parents 1a6f980 + 15b62b3 commit dba3aaf
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 189 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ cover:

smoke:
$(SUDO) NYDUS_BUILDER=${NYDUS_BUILDER} NYDUS_NYDUSD=${NYDUS_NYDUSD} ${GO_EXECUTABLE_PATH} test -race -v ./tests
$(SUDO) NYDUS_BUILDER=${NYDUS_BUILDER} NYDUS_NYDUSD=${NYDUS_NYDUSD} ${GO_EXECUTABLE_PATH} test -race -v ./tests -args -fs-version=6
$(SUDO) NYDUS_BUILDER=${NYDUS_BUILDER} NYDUS_NYDUSD=${NYDUS_NYDUSD} ${GO_EXECUTABLE_PATH} test -race -v ./tests

.PHONY: integration
integration:
Expand Down
162 changes: 54 additions & 108 deletions pkg/converter/convert_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ func unpackOciTar(ctx context.Context, dst string, reader io.Reader) error {
}

// Unpack a Nydus formatted tar stream into a directory.
func unpackNydusTar(bootDst, blobDst string, ra content.ReaderAt) error {
func unpackNydusBlob(bootDst, blobDst string, ra content.ReaderAt) error {
boot, err := os.OpenFile(bootDst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return errors.Wrapf(err, "write to bootstrap %s", bootDst)
}
defer boot.Close()

if err = unpackBootstrapFromNydusTar(ra, boot); err != nil {
if err = unpackFileFromNydusBlob(ra, bootstrapNameInTar, boot); err != nil {
return errors.Wrap(err, "unpack bootstrap from nydus")
}

Expand All @@ -130,134 +130,76 @@ func unpackNydusTar(bootDst, blobDst string, ra content.ReaderAt) error {
}
defer blob.Close()

if err = unpackBlobFromNydusTar(ra, blob); err != nil {
if err = unpackFileFromNydusBlob(ra, blobNameInTar, blob); err != nil {
return errors.Wrap(err, "unpack blob from nydus")
}

return nil
}

// Unpack the bootstrap from nydus formatted tar stream (blob + bootstrap).
// Unpack the file from nydus formatted tar stream.
// The nydus formatted tar stream is a tar-like structure that arranges the
// data as follows:
//
// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
func unpackBootstrapFromNydusTar(ra content.ReaderAt, target io.Writer) error {
cur := ra.Size()
reader := newSeekReader(ra)

// `data | tar_header | data | tar_header`
func unpackFileFromNydusBlob(ra content.ReaderAt, targetName string, target io.Writer) error {
const headerSize = 512

// Seek from tail to head of nydus formatted tar stream to find nydus
// bootstrap data.
for {
if headerSize > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}

// Try to seek to the part of tar header.
var err error
cur, err = reader.Seek(cur-headerSize, io.SeekCurrent)
if err != nil {
return errors.Wrapf(err, "seek to %d for tar header", cur-headerSize)
}

tr := tar.NewReader(reader)
// Parse tar header.
hdr, err := tr.Next()
if err != nil {
return errors.Wrap(err, "parse tar header")
}

if hdr.Name == bootstrapNameInTar {
// Try to seek to the part of tar data (bootstrap_data).
if hdr.Size > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}
bootstrapOffset := cur - hdr.Size
_, err = reader.Seek(bootstrapOffset, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek to bootstrap data offset")
}

// Copy tar data (bootstrap_data) to provided target writer.
if _, err := io.CopyN(target, reader, hdr.Size); err != nil {
return errors.Wrap(err, "copy bootstrap data to reader")
}

return nil
}

if cur == hdr.Size {
break
}
if headerSize > ra.Size() {
return fmt.Errorf("invalid nydus tar size %d", ra.Size())
}

return fmt.Errorf("can't find bootstrap in nydus tar")
}

// Unpack the blob from nydus formatted tar stream (blob + bootstrap).
// The nydus formatted tar stream is a tar-like structure that arranges the
// data as follows:
//
// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
func unpackBlobFromNydusTar(ra content.ReaderAt, target io.Writer) error {
cur := ra.Size()
cur := ra.Size() - headerSize
reader := newSeekReader(ra)

const headerSize = 512

// Seek from tail to head of nydus formatted tar stream to find nydus
// bootstrap data.
// Seek from tail to head of nydus formatted tar stream to find
// target data.
for {
if headerSize > cur {
break
}

// Try to seek to the part of tar header.
var err error
cur, err = reader.Seek(cur-headerSize, io.SeekStart)
// Try to seek the part of tar header.
_, err := reader.Seek(cur, io.SeekStart)
if err != nil {
return errors.Wrapf(err, "seek to %d for tar header", cur-headerSize)
return errors.Wrapf(err, "seek %d for nydus tar header", cur)
}

tr := tar.NewReader(reader)
// Parse tar header.
tr := tar.NewReader(reader)
hdr, err := tr.Next()
if err != nil {
return errors.Wrap(err, "parse tar header")
return errors.Wrap(err, "parse nydus tar header")
}

if hdr.Name == bootstrapNameInTar {
if hdr.Size > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}
cur, err = reader.Seek(cur-hdr.Size, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek to bootstrap data offset")
}
} else if hdr.Name == blobNameInTar {
if hdr.Size > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}
if cur < hdr.Size {
return errors.Wrapf(err, "invalid nydus tar data, name %s, size %d", hdr.Name, hdr.Size)
}

if hdr.Name == targetName {
// Try to seek the part of tar data.
_, err = reader.Seek(cur-hdr.Size, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek to blob data offset")
return errors.Wrap(err, "seek target data offset")
}

// Copy tar data to provided target writer.
if _, err := io.CopyN(target, reader, hdr.Size); err != nil {
return errors.Wrap(err, "copy blob data to reader")
return errors.Wrap(err, "copy target data to reader")
}

return nil
}

cur = cur - hdr.Size - headerSize
if cur < 0 {
break
}
}

return nil
return fmt.Errorf("can't find target %s in nydus tar", targetName)
}

// Pack converts an OCI tar stream to nydus formatted stream with a tar-like
// structure that arranges the data as follows:
//
// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
// `data | tar_header | data | tar_header`
//
// The caller should write OCI tar stream into the returned `io.WriteCloser`,
// then the Pack method will write the nydus formatted stream to `dest`
Expand Down Expand Up @@ -351,22 +293,25 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
}
defer os.RemoveAll(workDir)

getBootstrapPath := func(layerIdx int) string {
digestHex := layers[layerIdx].Digest.Hex()
return filepath.Join(workDir, digestHex)
}

eg, _ := errgroup.WithContext(ctx)
sourceBootstrapPaths := []string{}
for idx := range layers {
sourceBootstrapPaths = append(sourceBootstrapPaths, filepath.Join(workDir, layers[idx].Digest.Hex()))
sourceBootstrapPaths = append(sourceBootstrapPaths, getBootstrapPath(idx))
eg.Go(func(idx int) func() error {
return func() error {
layer := layers[idx]

// Use the hex hash string of whole tar blob as the bootstrap name.
bootstrap, err := os.Create(filepath.Join(workDir, layer.Digest.Hex()))
bootstrap, err := os.Create(getBootstrapPath(idx))
if err != nil {
return errors.Wrap(err, "create source bootstrap")
}
defer bootstrap.Close()

if err := unpackBootstrapFromNydusTar(layer.ReaderAt, bootstrap); err != nil {
if err := unpackFileFromNydusBlob(layers[idx].ReaderAt, bootstrapNameInTar, bootstrap); err != nil {
return errors.Wrap(err, "unpack nydus tar")
}

Expand Down Expand Up @@ -428,7 +373,7 @@ func Unpack(ctx context.Context, ra content.ReaderAt, dest io.Writer, opt Unpack
defer os.RemoveAll(workDir)

bootPath, blobPath := filepath.Join(workDir, bootstrapNameInTar), filepath.Join(workDir, blobNameInTar)
if err = unpackNydusTar(bootPath, blobPath, ra); err != nil {
if err = unpackNydusBlob(bootPath, blobPath, ra); err != nil {
return errors.Wrap(err, "unpack nydus tar")
}

Expand Down Expand Up @@ -726,33 +671,33 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
layers := []Layer{}

var chainID digest.Digest
for _, blobDesc := range descs {
ra, err := cs.ReaderAt(ctx, blobDesc)
for _, nydusBlobDesc := range descs {
ra, err := cs.ReaderAt(ctx, nydusBlobDesc)
if err != nil {
return nil, nil, errors.Wrapf(err, "get reader for blob %q", blobDesc.Digest)
return nil, nil, errors.Wrapf(err, "get reader for blob %q", nydusBlobDesc.Digest)
}
defer ra.Close()
layers = append(layers, Layer{
Digest: blobDesc.Digest,
Digest: nydusBlobDesc.Digest,
ReaderAt: ra,
})
if chainID == "" {
chainID = identity.ChainID([]digest.Digest{blobDesc.Digest})
chainID = identity.ChainID([]digest.Digest{nydusBlobDesc.Digest})
} else {
chainID = identity.ChainID([]digest.Digest{chainID, blobDesc.Digest})
chainID = identity.ChainID([]digest.Digest{chainID, nydusBlobDesc.Digest})
}
}

// Merge all nydus bootstraps into a final nydus bootstrap.
pr, pw := io.Pipe()
blobDigestChan := make(chan []digest.Digest, 1)
originalBlobDigestChan := make(chan []digest.Digest, 1)
go func() {
defer pw.Close()
blobDigests, err := Merge(ctx, layers, pw, opt)
originalBlobDigests, err := Merge(ctx, layers, pw, opt)
if err != nil {
pw.CloseWithError(errors.Wrapf(err, "merge nydus bootstrap"))
}
blobDigestChan <- blobDigests
originalBlobDigestChan <- originalBlobDigests
}()

// Compress final nydus bootstrap to tar.gz and write into content store.
Expand Down Expand Up @@ -791,8 +736,9 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
return nil, nil, errors.Wrap(err, "get info from content store")
}

blobDigests := <-blobDigestChan
blobDigests := <-originalBlobDigestChan
blobDescs := []ocispec.Descriptor{}

for _, blobDigest := range blobDigests {
blobInfo, err := cs.Info(ctx, blobDigest)
if err != nil {
Expand Down
Loading

0 comments on commit dba3aaf

Please sign in to comment.