Skip to content

Commit

Permalink
converter: enhance nydus tar unpack
Browse files Browse the repository at this point in the history
Unpacking file from nydus blob should seek until encountering the head
of blob stream, this patch ensures this to improve unpack compatibility
and refine the test codes.

Signed-off-by: Yan Song <[email protected]>
  • Loading branch information
imeoer committed Nov 29, 2022
1 parent f88a15a commit 15b62b3
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 189 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cover:

smoke:
$(SUDO) NYDUS_BUILDER=${NYDUS_BUILDER} NYDUS_NYDUSD=${NYDUS_NYDUSD} ${GO_EXECUTABLE_PATH} test -race -v ./tests
$(SUDO) NYDUS_BUILDER=${NYDUS_BUILDER} NYDUS_NYDUSD=${NYDUS_NYDUSD} ${GO_EXECUTABLE_PATH} test -race -v ./tests -args -fs-version=6
$(SUDO) NYDUS_BUILDER=${NYDUS_BUILDER} NYDUS_NYDUSD=${NYDUS_NYDUSD} ${GO_EXECUTABLE_PATH} test -race -v ./tests

.PHONY: integration
integration:
Expand Down
162 changes: 54 additions & 108 deletions pkg/converter/convert_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ func unpackOciTar(ctx context.Context, dst string, reader io.Reader) error {
}

// Unpack a Nydus formatted tar stream into a directory.
func unpackNydusTar(bootDst, blobDst string, ra content.ReaderAt) error {
func unpackNydusBlob(bootDst, blobDst string, ra content.ReaderAt) error {
boot, err := os.OpenFile(bootDst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return errors.Wrapf(err, "write to bootstrap %s", bootDst)
}
defer boot.Close()

if err = unpackBootstrapFromNydusTar(ra, boot); err != nil {
if err = unpackFileFromNydusBlob(ra, bootstrapNameInTar, boot); err != nil {
return errors.Wrap(err, "unpack bootstrap from nydus")
}

Expand All @@ -130,134 +130,76 @@ func unpackNydusTar(bootDst, blobDst string, ra content.ReaderAt) error {
}
defer blob.Close()

if err = unpackBlobFromNydusTar(ra, blob); err != nil {
if err = unpackFileFromNydusBlob(ra, blobNameInTar, blob); err != nil {
return errors.Wrap(err, "unpack blob from nydus")
}

return nil
}

// Unpack the bootstrap from nydus formatted tar stream (blob + bootstrap).
// Unpack the file from nydus formatted tar stream.
// The nydus formatted tar stream is a tar-like structure that arranges the
// data as follows:
//
// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
func unpackBootstrapFromNydusTar(ra content.ReaderAt, target io.Writer) error {
cur := ra.Size()
reader := newSeekReader(ra)

// `data | tar_header | data | tar_header`
func unpackFileFromNydusBlob(ra content.ReaderAt, targetName string, target io.Writer) error {
const headerSize = 512

// Seek from tail to head of nydus formatted tar stream to find nydus
// bootstrap data.
for {
if headerSize > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}

// Try to seek to the part of tar header.
var err error
cur, err = reader.Seek(cur-headerSize, io.SeekCurrent)
if err != nil {
return errors.Wrapf(err, "seek to %d for tar header", cur-headerSize)
}

tr := tar.NewReader(reader)
// Parse tar header.
hdr, err := tr.Next()
if err != nil {
return errors.Wrap(err, "parse tar header")
}

if hdr.Name == bootstrapNameInTar {
// Try to seek to the part of tar data (bootstrap_data).
if hdr.Size > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}
bootstrapOffset := cur - hdr.Size
_, err = reader.Seek(bootstrapOffset, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek to bootstrap data offset")
}

// Copy tar data (bootstrap_data) to provided target writer.
if _, err := io.CopyN(target, reader, hdr.Size); err != nil {
return errors.Wrap(err, "copy bootstrap data to reader")
}

return nil
}

if cur == hdr.Size {
break
}
if headerSize > ra.Size() {
return fmt.Errorf("invalid nydus tar size %d", ra.Size())
}

return fmt.Errorf("can't find bootstrap in nydus tar")
}

// Unpack the blob from nydus formatted tar stream (blob + bootstrap).
// The nydus formatted tar stream is a tar-like structure that arranges the
// data as follows:
//
// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
func unpackBlobFromNydusTar(ra content.ReaderAt, target io.Writer) error {
cur := ra.Size()
cur := ra.Size() - headerSize
reader := newSeekReader(ra)

const headerSize = 512

// Seek from tail to head of nydus formatted tar stream to find nydus
// bootstrap data.
// Seek from tail to head of nydus formatted tar stream to find
// target data.
for {
if headerSize > cur {
break
}

// Try to seek to the part of tar header.
var err error
cur, err = reader.Seek(cur-headerSize, io.SeekStart)
// Try to seek the part of tar header.
_, err := reader.Seek(cur, io.SeekStart)
if err != nil {
return errors.Wrapf(err, "seek to %d for tar header", cur-headerSize)
return errors.Wrapf(err, "seek %d for nydus tar header", cur)
}

tr := tar.NewReader(reader)
// Parse tar header.
tr := tar.NewReader(reader)
hdr, err := tr.Next()
if err != nil {
return errors.Wrap(err, "parse tar header")
return errors.Wrap(err, "parse nydus tar header")
}

if hdr.Name == bootstrapNameInTar {
if hdr.Size > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}
cur, err = reader.Seek(cur-hdr.Size, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek to bootstrap data offset")
}
} else if hdr.Name == blobNameInTar {
if hdr.Size > cur {
return fmt.Errorf("invalid tar format at pos %d", cur)
}
if cur < hdr.Size {
return errors.Wrapf(err, "invalid nydus tar data, name %s, size %d", hdr.Name, hdr.Size)
}

if hdr.Name == targetName {
// Try to seek the part of tar data.
_, err = reader.Seek(cur-hdr.Size, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek to blob data offset")
return errors.Wrap(err, "seek target data offset")
}

// Copy tar data to provided target writer.
if _, err := io.CopyN(target, reader, hdr.Size); err != nil {
return errors.Wrap(err, "copy blob data to reader")
return errors.Wrap(err, "copy target data to reader")
}

return nil
}

cur = cur - hdr.Size - headerSize
if cur < 0 {
break
}
}

return nil
return fmt.Errorf("can't find target %s in nydus tar", targetName)
}

// Pack converts an OCI tar stream to nydus formatted stream with a tar-like
// structure that arranges the data as follows:
//
// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
// `data | tar_header | data | tar_header`
//
// The caller should write OCI tar stream into the returned `io.WriteCloser`,
// then the Pack method will write the nydus formatted stream to `dest`
Expand Down Expand Up @@ -351,22 +293,25 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
}
defer os.RemoveAll(workDir)

getBootstrapPath := func(layerIdx int) string {
digestHex := layers[layerIdx].Digest.Hex()
return filepath.Join(workDir, digestHex)
}

eg, _ := errgroup.WithContext(ctx)
sourceBootstrapPaths := []string{}
for idx := range layers {
sourceBootstrapPaths = append(sourceBootstrapPaths, filepath.Join(workDir, layers[idx].Digest.Hex()))
sourceBootstrapPaths = append(sourceBootstrapPaths, getBootstrapPath(idx))
eg.Go(func(idx int) func() error {
return func() error {
layer := layers[idx]

// Use the hex hash string of whole tar blob as the bootstrap name.
bootstrap, err := os.Create(filepath.Join(workDir, layer.Digest.Hex()))
bootstrap, err := os.Create(getBootstrapPath(idx))
if err != nil {
return errors.Wrap(err, "create source bootstrap")
}
defer bootstrap.Close()

if err := unpackBootstrapFromNydusTar(layer.ReaderAt, bootstrap); err != nil {
if err := unpackFileFromNydusBlob(layers[idx].ReaderAt, bootstrapNameInTar, bootstrap); err != nil {
return errors.Wrap(err, "unpack nydus tar")
}

Expand Down Expand Up @@ -428,7 +373,7 @@ func Unpack(ctx context.Context, ra content.ReaderAt, dest io.Writer, opt Unpack
defer os.RemoveAll(workDir)

bootPath, blobPath := filepath.Join(workDir, bootstrapNameInTar), filepath.Join(workDir, blobNameInTar)
if err = unpackNydusTar(bootPath, blobPath, ra); err != nil {
if err = unpackNydusBlob(bootPath, blobPath, ra); err != nil {
return errors.Wrap(err, "unpack nydus tar")
}

Expand Down Expand Up @@ -726,33 +671,33 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
layers := []Layer{}

var chainID digest.Digest
for _, blobDesc := range descs {
ra, err := cs.ReaderAt(ctx, blobDesc)
for _, nydusBlobDesc := range descs {
ra, err := cs.ReaderAt(ctx, nydusBlobDesc)
if err != nil {
return nil, nil, errors.Wrapf(err, "get reader for blob %q", blobDesc.Digest)
return nil, nil, errors.Wrapf(err, "get reader for blob %q", nydusBlobDesc.Digest)
}
defer ra.Close()
layers = append(layers, Layer{
Digest: blobDesc.Digest,
Digest: nydusBlobDesc.Digest,
ReaderAt: ra,
})
if chainID == "" {
chainID = identity.ChainID([]digest.Digest{blobDesc.Digest})
chainID = identity.ChainID([]digest.Digest{nydusBlobDesc.Digest})
} else {
chainID = identity.ChainID([]digest.Digest{chainID, blobDesc.Digest})
chainID = identity.ChainID([]digest.Digest{chainID, nydusBlobDesc.Digest})
}
}

// Merge all nydus bootstraps into a final nydus bootstrap.
pr, pw := io.Pipe()
blobDigestChan := make(chan []digest.Digest, 1)
originalBlobDigestChan := make(chan []digest.Digest, 1)
go func() {
defer pw.Close()
blobDigests, err := Merge(ctx, layers, pw, opt)
originalBlobDigests, err := Merge(ctx, layers, pw, opt)
if err != nil {
pw.CloseWithError(errors.Wrapf(err, "merge nydus bootstrap"))
}
blobDigestChan <- blobDigests
originalBlobDigestChan <- originalBlobDigests
}()

// Compress final nydus bootstrap to tar.gz and write into content store.
Expand Down Expand Up @@ -791,8 +736,9 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
return nil, nil, errors.Wrap(err, "get info from content store")
}

blobDigests := <-blobDigestChan
blobDigests := <-originalBlobDigestChan
blobDescs := []ocispec.Descriptor{}

for _, blobDigest := range blobDigests {
blobInfo, err := cs.Info(ctx, blobDigest)
if err != nil {
Expand Down
Loading

0 comments on commit 15b62b3

Please sign in to comment.