Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filestore): add mmap reader option #665

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes:
### Added

- `files`, `ipld/unixfs`, `mfs` and `tar` now support optional UnixFS 1.5 mode and modification time metadata
- add `WithMMapReader` option to `FileManager`

### Changed

Expand Down
61 changes: 61 additions & 0 deletions filestore/filereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package filestore

import (
"io"
"os"

"golang.org/x/exp/mmap"
)

type FileReader interface {
io.ReaderAt
io.Closer
}

var _ FileReader = (*stdReader)(nil)

type stdReader struct {
f *os.File
}

// ReadAt implements the FileReader interface.
func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.f.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *stdReader) Close() error {
return r.f.Close()
}

func newStdReader(path string) (FileReader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
return &stdReader{f: f}, nil
}

var _ FileReader = (*mmapReader)(nil)

type mmapReader struct {
m *mmap.ReaderAt
}

// ReadAt implements the FileReader interface.
func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.m.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *mmapReader) Close() error {
return r.m.Close()
}

func newMmapReader(path string) (FileReader, error) {
m, err := mmap.Open(path)
if err != nil {
return nil, err
}
return &mmapReader{m: m}, nil
}
128 changes: 70 additions & 58 deletions filestore/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (

var bg = context.Background()

func newTestFilestore(t *testing.T) (string, *Filestore) {
func newTestFilestore(t *testing.T, option ...Option) (string, *Filestore) {
mds := ds.NewMapDatastore()

testdir, err := os.MkdirTemp("", "filestore-test")
if err != nil {
t.Fatal(err)
}
fm := NewFileManager(mds, testdir)
fm := NewFileManager(mds, testdir, option...)
fm.AllowFiles = true

bs := blockstore.NewBlockstore(mds)
Expand All @@ -48,62 +48,74 @@ func makeFile(dir string, data []byte) (string, error) {
}

func TestBasicFilestore(t *testing.T) {
dir, fs := newTestFilestore(t)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
cases := []struct {
name string
options []Option
}{
{"default", nil},
{"mmap", []Option{WithMMapReader()}},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dir, fs := newTestFilestore(t, c.options...)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
}
})
}
}

Expand Down
37 changes: 28 additions & 9 deletions filestore/fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// FilestorePrefix identifies the key prefix for FileManager blocks.
var FilestorePrefix = ds.NewKey("filestore")

type Option func(*FileManager)

// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
Expand All @@ -34,6 +36,7 @@ type FileManager struct {
AllowUrls bool
ds ds.Batching
root string
makeReader func(path string) (FileReader, error)
}

// CorruptReferenceError implements the error interface.
Expand All @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}

// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O.
// On Windows, when reading and writing to a file simultaneously, the system would consume
// a significant amount of memory due to caching. This memory usage is not reflected in
// the application but in the system. Using memory-mapped files (implemented with
// CreateFileMapping on Windows) avoids this issue.
func WithMMapReader() Option {
return func(f *FileManager) {
f.makeReader = newMmapReader
}
}

// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager {
f := &FileManager{
ds: dsns.Wrap(ds, FilestorePrefix),
root: root,
makeReader: newStdReader,
}

for _, option := range options {
option(f)
}

return f
}

// AllKeysChan returns a channel from which to read the keys stored in
Expand Down Expand Up @@ -175,21 +199,16 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)

fi, err := os.Open(abspath)
fi, err := f.makeReader(abspath)
if os.IsNotExist(err) {
return nil, &CorruptReferenceError{StatusFileNotFound, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()

_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}

outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
_, err = fi.ReadAt(outbuf, int64(d.GetOffset()))
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
Expand Down Expand Up @@ -182,7 +183,6 @@ require (
go.uber.org/fx v1.22.1 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down