Skip to content

Commit

Permalink
feat(filestore): add mmap reader option
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamacro committed Sep 4, 2024
1 parent 317eb7d commit 791dc5f
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes:
### Added

- `files`, `ipld/unixfs`, `mfs` and `tar` now support optional UnixFS 1.5 mode and modification time metadata
- add `WithMMapReader` option to `FileManager`

### Changed

Expand Down
61 changes: 61 additions & 0 deletions filestore/filereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package filestore

import (
"io"
"os"

"golang.org/x/exp/mmap"
)

type FileReader interface {
io.ReaderAt
io.Closer
}

var _ FileReader = (*stdReader)(nil)

type stdReader struct {
f *os.File
}

// ReadAt implements the FileReader interface.
func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.f.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *stdReader) Close() error {
return r.f.Close()
}

func newStdReader(path string) (FileReader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}

Check warning on line 35 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L34-L35

Added lines #L34 - L35 were not covered by tests
return &stdReader{f: f}, nil
}

var _ FileReader = (*mmapReader)(nil)

type mmapReader struct {
m *mmap.ReaderAt
}

// ReadAt implements the FileReader interface.
func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.m.ReadAt(p, off)

Check warning on line 47 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L46-L47

Added lines #L46 - L47 were not covered by tests
}

// Close implements the FileReader interface.
func (r *mmapReader) Close() error {
return r.m.Close()

Check warning on line 52 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func newMmapReader(path string) (FileReader, error) {
m, err := mmap.Open(path)
if err != nil {
return nil, err
}
return &mmapReader{m: m}, nil

Check warning on line 60 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L55-L60

Added lines #L55 - L60 were not covered by tests
}
37 changes: 28 additions & 9 deletions filestore/fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// FilestorePrefix identifies the key prefix for FileManager blocks.
var FilestorePrefix = ds.NewKey("filestore")

type Option func(*FileManager)

// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
Expand All @@ -34,6 +36,7 @@ type FileManager struct {
AllowUrls bool
ds ds.Batching
root string
readerFact func(path string) (FileReader, error)
}

// CorruptReferenceError implements the error interface.
Expand All @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}

// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O.
// On Windows, when reading and writing to a file simultaneously, the system would consume
// a significant amount of memory due to caching. This memory usage is not reflected in
// the application but in the system. Using memory-mapped files (implemented with
// CreateFileMapping on Windows) avoids this issue.
func WithMMapReader() Option {
return func(f *FileManager) {
f.readerFact = newMmapReader
}

Check warning on line 65 in filestore/fsrefstore.go

View check run for this annotation

Codecov / codecov/patch

filestore/fsrefstore.go#L62-L65

Added lines #L62 - L65 were not covered by tests
}

// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager {
f := &FileManager{
ds: dsns.Wrap(ds, FilestorePrefix),
root: root,
readerFact: newStdReader,
}

for _, option := range options {
option(f)
}

Check warning on line 80 in filestore/fsrefstore.go

View check run for this annotation

Codecov / codecov/patch

filestore/fsrefstore.go#L79-L80

Added lines #L79 - L80 were not covered by tests

return f
}

// AllKeysChan returns a channel from which to read the keys stored in
Expand Down Expand Up @@ -175,21 +199,16 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)

fi, err := os.Open(abspath)
fi, err := f.readerFact(abspath)
if os.IsNotExist(err) {
return nil, &CorruptReferenceError{StatusFileNotFound, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()

_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}

outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
_, err = fi.ReadAt(outbuf, int64(d.GetOffset()))
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
Expand Down Expand Up @@ -182,7 +183,6 @@ require (
go.uber.org/fx v1.22.1 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down

0 comments on commit 791dc5f

Please sign in to comment.