Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Optimize retrieval to do minimal fetches from singularity
Browse files Browse the repository at this point in the history
Read HTTP range headers and do fetch for entire range instead of allowing the io.CopyN, used by http.ServeContent, to do multiple small fetches.

Fixes #143
  • Loading branch information
gammazero committed Oct 30, 2023
1 parent 14b3c4c commit f249e19
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 10 deletions.
57 changes: 57 additions & 0 deletions integration/singularity/range_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package singularity

import (
"errors"
"io"
)

// rangeReader streams the bytes of a single, individually retrievable file
// range and keeps track of how far into that range it has read.
type rangeReader struct {
	// offset is the absolute position within the file that the next read
	// will return data from. writeToN advances it by the bytes it copies.
	offset int64

	// reader supplies the data for the range. It is closed, and set to nil,
	// by close.
	reader io.ReadCloser

	// remaining is the number of bytes still expected from reader. writeToN
	// reports io.EOF once it reaches zero.
	remaining int64
}

// writeToN copies up to readLen bytes from the range's reader into w.
//
// It never copies more than readLen bytes, nor more than rr.remaining bytes.
// rr.offset is advanced and rr.remaining reduced by exactly the number of
// bytes written to w, including when the copy stops on an error, so the
// bookkeeping always matches what w received.
//
// It returns the number of bytes written. The error is nil when readLen bytes
// were written (or readLen is not positive), io.EOF when the range ran out of
// data first (rr.remaining is then zero), or the underlying read/write error.
func (rr *rangeReader) writeToN(w io.Writer, readLen int64) (int64, error) {
	var read int64
	for readLen > 0 {
		if rr.remaining <= 0 {
			return read, io.EOF
		}

		// Bound the copy by both the request and the bytes left in the
		// range, so a reader that delivers extra data cannot make this
		// write more than was asked for.
		want := readLen
		if want > rr.remaining {
			want = rr.remaining
		}

		n, err := io.CopyN(w, rr.reader, want)

		// Account for the bytes written before looking at err: CopyN may
		// have written part of the data before it failed.
		rr.offset += n
		rr.remaining -= n
		readLen -= n
		read += n

		if err != nil {
			if errors.Is(err, io.EOF) {
				// The reader ended before delivering the expected bytes;
				// nothing more can come from this range.
				rr.remaining = 0
				return read, io.EOF
			}
			return read, err
		}
	}
	return read, nil
}

// close shuts down the range's reader, if it still has one, and marks the
// range as having no bytes remaining. It returns the error reported by the
// reader's Close. Calling close again afterwards does nothing and returns nil.
func (rr *rangeReader) close() error {
	if rr.reader == nil {
		return nil
	}
	rr.remaining = 0
	closeErr := rr.reader.Close()
	rr.reader = nil
	return closeErr
}
112 changes: 102 additions & 10 deletions integration/singularity/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type SingularityReader struct {
fileID uint64
offset int64
size int64

// Reads remaining data from current range.
rangeReader *rangeReader
}

func (r *SingularityReader) Read(p []byte) (int, error) {
Expand Down Expand Up @@ -50,18 +53,83 @@ func (r *SingularityReader) WriteTo(w io.Writer) (int64, error) {
}

func (r *SingularityReader) writeToN(w io.Writer, readLen int64) (int64, error) {
_, _, err := r.client.File.RetrieveFile(&file.RetrieveFileParams{
Context: context.Background(),
ID: int64(r.fileID),
Range: ptr.String(fmt.Sprintf("bytes=%d-%d", r.offset, r.offset+readLen-1)),
}, w)
if err != nil {
return 0, fmt.Errorf("failed to retrieve file slice: %w", err)
var read int64
// If there is a rangeReader from the previous read that can be used to
// continue reading more data, then use it instead of doing another
// findFileRanges and Retrieve for more reads from this same range.
if r.rangeReader != nil {
// If continuing from the previous read, keep reading from this rangeReader.
if r.offset == r.rangeReader.offset {
// Reading data leftover from previous read into w.
n, err := r.rangeReader.writeToN(w, readLen)
if err != nil && !errors.Is(err, io.EOF) {
return 0, err
}
r.offset += n
readLen -= n
read += n
if r.rangeReader.remaining == 0 {
// No data left in range reader.
r.rangeReader.close()
r.rangeReader = nil
}
if readLen == 0 {
// Read all requested data from leftover in rangeReader.
return read, nil
}
// No more leftover data to read, but readLen additional bytes
// still needed. Will read more data from next range(s).
} else {
// Trying to read from outside of rangeReader's range. Must have
// seeked out of current range. Close rangeReader and read new
// range.
r.rangeReader.close()
r.rangeReader = nil
}
}

rangeReadLen := readLen
offsetInRange := r.offset - r.size
remainingRange := r.size - offsetInRange
if rangeReadLen > remainingRange {
rangeReadLen = remainingRange
}

byteRange := fmt.Sprintf("bytes=%d-%d", r.offset, r.offset+readLen-1)
rr := &rangeReader{
offset: r.offset,
reader: r.retrieveReader(context.Background(), int64(r.fileID), byteRange),
remaining: remainingRange,
}

// Reading readLen of the remaining bytes in this range.
n, err := rr.writeToN(w, readLen)
if err != nil && !errors.Is(err, io.EOF) {
rr.close()
return 0, err
}
r.offset += n
readLen -= n
read += n

// check for missing file ranges at the end
if readLen > 0 {
if rr != nil {

Check failure on line 117 in integration/singularity/reader.go

View workflow job for this annotation

GitHub Actions / go-check / All

this nil check is always true (SA4031)
rr.close()
}
return read, fmt.Errorf("not enough data to serve entire range %s", byteRange)
}

r.offset += readLen
// Some unread data left over in this range. Save it for next read.
if rr.remaining != 0 {
// Saving leftover rangeReader with rr.remaining bytes left.
r.rangeReader = rr
} else {
// Leftover rangeReader has 0 bytes remaining.
rr.close()
}

return readLen, nil
return read, nil
}

func (r *SingularityReader) Seek(offset int64, whence int) (int64, error) {
Expand All @@ -88,5 +156,29 @@ func (r *SingularityReader) Seek(offset int64, whence int) (int64, error) {
}

// Close releases the range reader left over from a previous read, if there is
// one, which closes its underlying data stream. It returns the error from
// closing that reader, or nil when there was nothing to close. Calling Close
// again afterwards returns nil.
func (r *SingularityReader) Close() error {
	if r.rangeReader == nil {
		return nil
	}
	err := r.rangeReader.close()
	r.rangeReader = nil
	return err
}

// retrieveReader starts retrieving byteRange of the file identified by fileID
// and returns a reader that streams the retrieved bytes.
//
// The retrieval runs in its own goroutine and writes into an in-memory pipe;
// the returned value is the pipe's read end. When the retrieval finishes, the
// write end is closed with the retrieval's error (wrapped), or with nil on
// success, so the error surfaces to whoever is reading.
func (r *SingularityReader) retrieveReader(ctx context.Context, fileID int64, byteRange string) io.ReadCloser {
	pipeReader, pipeWriter := io.Pipe()

	fetch := func() {
		var result error
		if _, _, err := r.client.File.RetrieveFile(&file.RetrieveFileParams{
			Context: ctx,
			ID:      fileID,
			Range:   ptr.String(byteRange),
		}, pipeWriter); err != nil {
			result = fmt.Errorf("failed to retrieve file slice: %w", err)
		}
		// Closing the write end, with or without an error, unblocks the reader.
		pipeWriter.CloseWithError(result)
	}
	go fetch()

	return pipeReader
}

0 comments on commit f249e19

Please sign in to comment.