From 791dc5fb4f0807c87f04fe83e4c7305567506e8a Mon Sep 17 00:00:00 2001 From: Dreamacro <8615343+Dreamacro@users.noreply.github.com> Date: Wed, 4 Sep 2024 16:41:40 +0800 Subject: [PATCH] feat(filestore): add mmap reader option --- CHANGELOG.md | 1 + filestore/filereader.go | 61 +++++++++++++++++++++++++++++++++++++++++ filestore/fsrefstore.go | 37 +++++++++++++++++++------ go.mod | 2 +- 4 files changed, 91 insertions(+), 10 deletions(-) create mode 100644 filestore/filereader.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d0c5f4f67..1d0f42c1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes: ### Added - `files`, `ipld/unixfs`, `mfs` and `tar` now support optional UnixFS 1.5 mode and modification time metadata +- add `WithMMapReader` option to `FileManager` ### Changed diff --git a/filestore/filereader.go b/filestore/filereader.go new file mode 100644 index 000000000..fba3cc942 --- /dev/null +++ b/filestore/filereader.go @@ -0,0 +1,61 @@ +package filestore + +import ( + "io" + "os" + + "golang.org/x/exp/mmap" +) + +type FileReader interface { + io.ReaderAt + io.Closer +} + +var _ FileReader = (*stdReader)(nil) + +type stdReader struct { + f *os.File +} + +// ReadAt implements the FileReader interface. +func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) { + return r.f.ReadAt(p, off) +} + +// Close implements the FileReader interface. +func (r *stdReader) Close() error { + return r.f.Close() +} + +func newStdReader(path string) (FileReader, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + return &stdReader{f: f}, nil +} + +var _ FileReader = (*mmapReader)(nil) + +type mmapReader struct { + m *mmap.ReaderAt +} + +// ReadAt implements the FileReader interface. +func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) { + return r.m.ReadAt(p, off) +} + +// Close implements the FileReader interface. +func (r *mmapReader) Close() error { + return r.m.Close() +} + +func newMmapReader(path string) (FileReader, error) { + m, err := mmap.Open(path) + if err != nil { + return nil, err + } + return &mmapReader{m: m}, nil +} diff --git a/filestore/fsrefstore.go b/filestore/fsrefstore.go index 158eadf7a..61908cdf3 100644 --- a/filestore/fsrefstore.go +++ b/filestore/fsrefstore.go @@ -25,6 +25,8 @@ import ( // FilestorePrefix identifies the key prefix for FileManager blocks. var FilestorePrefix = ds.NewKey("filestore") +type Option func(*FileManager) + // FileManager is a blockstore implementation which stores special // blocks FilestoreNode type. These nodes only contain a reference // to the actual location of the block data in the filesystem @@ -34,6 +36,7 @@ type FileManager struct { AllowUrls bool ds ds.Batching root string + readerFact func(path string) (FileReader, error) } // CorruptReferenceError implements the error interface. @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string { return c.Err.Error() } +// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O. +// On Windows, when reading and writing to a file simultaneously, the system would consume +// a significant amount of memory due to caching. This memory usage is not reflected in +// the application but in the system. Using memory-mapped files (implemented with +// CreateFileMapping on Windows) avoids this issue. +func WithMMapReader() Option { + return func(f *FileManager) { + f.readerFact = newMmapReader + } +} + // NewFileManager initializes a new file manager with the given // datastore and root. All FilestoreNodes paths are relative to the // root path given here, which is prepended for any operations. -func NewFileManager(ds ds.Batching, root string) *FileManager { - return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root} +func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager { + f := &FileManager{ + ds: dsns.Wrap(ds, FilestorePrefix), + root: root, + readerFact: newStdReader, + } + + for _, option := range options { + option(f) + } + + return f } // AllKeysChan returns a channel from which to read the keys stored in @@ -175,7 +199,7 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er p := filepath.FromSlash(d.GetFilePath()) abspath := filepath.Join(f.root, p) - fi, err := os.Open(abspath) + fi, err := f.readerFact(abspath) if os.IsNotExist(err) { return nil, &CorruptReferenceError{StatusFileNotFound, err} } else if err != nil { @@ -183,13 +207,8 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er } defer fi.Close() - _, err = fi.Seek(int64(d.GetOffset()), io.SeekStart) - if err != nil { - return nil, &CorruptReferenceError{StatusFileError, err} - } - outbuf := make([]byte, d.GetSize_()) - _, err = io.ReadFull(fi, outbuf) + _, err = fi.ReadAt(outbuf, int64(d.GetOffset())) if err == io.EOF || err == io.ErrUnexpectedEOF { return nil, &CorruptReferenceError{StatusFileChanged, err} } else if err != nil { diff --git a/go.mod b/go.mod index a625a8b81..306460068 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/oauth2 v0.21.0 golang.org/x/sync v0.7.0 golang.org/x/sys v0.22.0 @@ -182,7 +183,6 @@ require ( go.uber.org/fx v1.22.1 // indirect go.uber.org/mock v0.4.0 // indirect golang.org/x/crypto v0.25.0 // indirect - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/text v0.16.0 // indirect