Skip to content

Commit

Permalink
feat(filestore): add mmap reader option
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamacro committed Sep 4, 2024
1 parent 317eb7d commit 6e22b09
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes:
### Added

- `files`, `ipld/unixfs`, `mfs` and `tar` now support optional UnixFS 1.5 mode and modification time metadata
- add `WithMMapReader` option to `FileManager`

### Changed

Expand Down
61 changes: 61 additions & 0 deletions filestore/filereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package filestore

import (
"io"
"os"

"golang.org/x/exp/mmap"
)

type FileReader interface {
io.ReaderAt
io.Closer
}

var _ FileReader = (*stdReader)(nil)

type stdReader struct {
f *os.File
}

// ReadAt implements the FileReader interface.
func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.f.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *stdReader) Close() error {
return r.f.Close()
}

func newStdReader(path string) (FileReader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}

Check warning on line 35 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L34-L35

Added lines #L34 - L35 were not covered by tests
return &stdReader{f: f}, nil
}

var _ FileReader = (*mmapReader)(nil)

type mmapReader struct {
m *mmap.ReaderAt
}

// ReadAt implements the FileReader interface.
func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.m.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *mmapReader) Close() error {
return r.m.Close()
}

func newMmapReader(path string) (FileReader, error) {
m, err := mmap.Open(path)
if err != nil {
return nil, err
}

Check warning on line 59 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L58-L59

Added lines #L58 - L59 were not covered by tests
return &mmapReader{m: m}, nil
}
128 changes: 70 additions & 58 deletions filestore/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (

var bg = context.Background()

func newTestFilestore(t *testing.T) (string, *Filestore) {
func newTestFilestore(t *testing.T, option ...Option) (string, *Filestore) {
mds := ds.NewMapDatastore()

testdir, err := os.MkdirTemp("", "filestore-test")
if err != nil {
t.Fatal(err)
}
fm := NewFileManager(mds, testdir)
fm := NewFileManager(mds, testdir, option...)
fm.AllowFiles = true

bs := blockstore.NewBlockstore(mds)
Expand All @@ -48,62 +48,74 @@ func makeFile(dir string, data []byte) (string, error) {
}

func TestBasicFilestore(t *testing.T) {
dir, fs := newTestFilestore(t)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
cases := []struct {
name string
options []Option
}{
{"default", nil},
{"mmap", []Option{WithMMapReader()}},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dir, fs := newTestFilestore(t, c.options...)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
}
})
}
}

Expand Down
37 changes: 28 additions & 9 deletions filestore/fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// FilestorePrefix identifies the key prefix for FileManager blocks.
var FilestorePrefix = ds.NewKey("filestore")

type Option func(*FileManager)

// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
Expand All @@ -34,6 +36,7 @@ type FileManager struct {
AllowUrls bool
ds ds.Batching
root string
readerFact func(path string) (FileReader, error)
}

// CorruptReferenceError implements the error interface.
Expand All @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}

// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O.
// On Windows, when reading and writing to a file simultaneously, the system would consume
// a significant amount of memory due to caching. This memory usage is not reflected in
// the application but in the system. Using memory-mapped files (implemented with
// CreateFileMapping on Windows) avoids this issue.
func WithMMapReader() Option {
return func(f *FileManager) {
f.readerFact = newMmapReader
}
}

// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager {
f := &FileManager{
ds: dsns.Wrap(ds, FilestorePrefix),
root: root,
readerFact: newStdReader,
}

for _, option := range options {
option(f)
}

return f
}

// AllKeysChan returns a channel from which to read the keys stored in
Expand Down Expand Up @@ -175,21 +199,16 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)

fi, err := os.Open(abspath)
fi, err := f.readerFact(abspath)
if os.IsNotExist(err) {
return nil, &CorruptReferenceError{StatusFileNotFound, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()

_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}

outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
_, err = fi.ReadAt(outbuf, int64(d.GetOffset()))
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
Expand Down Expand Up @@ -182,7 +183,6 @@ require (
go.uber.org/fx v1.22.1 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down

0 comments on commit 6e22b09

Please sign in to comment.