Skip to content

Commit

Permalink
objstorage: add readahead configuration
Browse files Browse the repository at this point in the history
This commit adds a way to configure the read-ahead method, for both
cases of "informed readahead" (currently just compactions, but in the
future possibly backup scans as well) and "speculative readahead".
  • Loading branch information
RaduBerinde committed Jun 11, 2024
1 parent 4a5d7e3 commit 26e7021
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 26 deletions.
48 changes: 48 additions & 0 deletions objstorage/objstorageprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ type Settings struct {
// out a large chunk of dirty filesystem buffers.
BytesPerSync int

// Local contains fields that are only relevant for files stored on the local
// filesystem.
Local struct {
// TODO(radu): move FSCleaner, NoSyncOnClose, BytesPerSync here.

// ReadaheadConfigFn is a function used to retrieve the current readahead
// mode. This function is run whenever a local object is open for reading.
// If it is nil, DefaultReadaheadConfig is used.
ReadaheadConfigFn func() ReadaheadConfig
}

// Fields here are set only if the provider is to support remote objects
// (experimental).
Remote struct {
Expand Down Expand Up @@ -129,6 +140,43 @@ type Settings struct {
}
}

// ReadaheadConfig controls the use of read-ahead.
type ReadaheadConfig struct {
// Informed is the type of read-ahead for operations that are known to read a
// large consecutive chunk of a file.
Informed ReadaheadMode

// Speculative is the type of read-ahead used automatically, when consecutive
// reads are detected.
Speculative ReadaheadMode
}

// DefaultReadaheadConfig is the readahead config used when ReadaheadConfigFn is
// not specified.
var DefaultReadaheadConfig = ReadaheadConfig{
Informed: FadviseSequential,
Speculative: FadviseSequential,
}

// ReadaheadMode indicates the type of read-ahead to use, either for informed
// read-ahead (e.g. compactions) or speculative read-ahead.
type ReadaheadMode uint8

const (
// NoReadahead disables readahead altogether.
NoReadahead ReadaheadMode = iota

// SysReadahead enables the use of SYS_READAHEAD call to prefetch data.
// The prefetch window grows dynamically as consecutive writes are detected.
SysReadahead

// FadviseSequential enables to use of FADV_SEQUENTIAL. For informed
// read-ahead, FADV_SEQUENTIAL is used from the beginning. For speculative
// read-ahead SYS_READAHEAD is first used until the window reaches the maximum
// size, then we siwtch to FADV_SEQUENTIAL.
FadviseSequential
)

// DefaultSettings initializes default settings (with no remote storage),
// suitable for tests and tools.
func DefaultSettings(fs vfs.FS, dirName string) Settings {
Expand Down
31 changes: 26 additions & 5 deletions objstorage/objstorageprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func TestProvider(t *testing.T) {
backings := make(map[string]objstorage.RemoteObjectBacking)
backingHandles := make(map[string]objstorage.RemoteObjectBackingHandle)
var curProvider objstorage.Provider
readaheadConfig := DefaultReadaheadConfig
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
readaheadConfig = DefaultReadaheadConfig
scanArgs := func(desc string, args ...interface{}) {
t.Helper()
if len(d.CmdArgs) != len(args) {
Expand Down Expand Up @@ -68,6 +70,9 @@ func TestProvider(t *testing.T) {
st.Remote.CreateOnShared = remote.CreateOnSharedAll
st.Remote.CreateOnSharedLocator = ""
}
st.Local.ReadaheadConfigFn = func() ReadaheadConfig {
return readaheadConfig
}
require.NoError(t, fs.MkdirAll(fsDir, 0755))
p, err := Open(st)
require.NoError(t, err)
Expand Down Expand Up @@ -170,13 +175,29 @@ func TestProvider(t *testing.T) {
return log.String()

case "read":
forCompaction := false
if len(d.CmdArgs) == 2 && d.CmdArgs[1].Key == "for-compaction" {
d.CmdArgs = d.CmdArgs[:1]
forCompaction = true
forCompaction := d.HasArg("for-compaction")
if arg, ok := d.Arg("readahead"); ok {
var mode ReadaheadMode
switch arg.Vals[0] {
case "off":
mode = NoReadahead
case "sys-readahead":
mode = SysReadahead
case "fadvise-sequential":
mode = FadviseSequential
default:
d.Fatalf(t, "unknown readahead mode %s", arg.Vals[0])
}
if forCompaction {
readaheadConfig.Informed = mode
} else {
readaheadConfig.Speculative = mode
}
}

d.CmdArgs = d.CmdArgs[:1]
var fileNum base.FileNum
scanArgs("<file-num> [for-compaction]", &fileNum)
scanArgs("<file-num> [for-compaction] [readahead|speculative-overhead=off|sys-readahead|fadvise-sequential]", &fileNum)
r, err := curProvider.OpenForReading(ctx, base.FileTypeTable, fileNum.DiskFileNum(), objstorage.OpenOptions{})
if err != nil {
return err.Error()
Expand Down
118 changes: 118 additions & 0 deletions objstorage/objstorageprovider/testdata/provider/local_readahead
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,121 @@ size: 2000000
1000 15000: ok (salt 1)
<local fs> close: p1/000001.sst
<local fs> close: p1/000001.sst

# Test non-default readahead modes.

read 1 readahead=off
0 1000
1000 15000
16000 30000
46000 10000
56000 50000
106000 30000
140000 80000
----
<local fs> open: p1/000001.sst (options: *vfs.randomReadsOption)
size: 2000000
<local fs> read-at(0, 1000): p1/000001.sst
0 1000: ok (salt 1)
<local fs> read-at(1000, 15000): p1/000001.sst
1000 15000: ok (salt 1)
<local fs> read-at(16000, 30000): p1/000001.sst
16000 30000: ok (salt 1)
<local fs> read-at(46000, 10000): p1/000001.sst
46000 10000: ok (salt 1)
<local fs> read-at(56000, 50000): p1/000001.sst
56000 50000: ok (salt 1)
<local fs> read-at(106000, 30000): p1/000001.sst
106000 30000: ok (salt 1)
<local fs> read-at(140000, 80000): p1/000001.sst
140000 80000: ok (salt 1)
<local fs> close: p1/000001.sst

read 1 for-compaction readahead=off
0 1000
1000 15000
16000 30000
46000 10000
56000 50000
106000 30000
140000 80000
----
<local fs> open: p1/000001.sst (options: *vfs.randomReadsOption)
size: 2000000
<local fs> read-at(0, 1000): p1/000001.sst
0 1000: ok (salt 1)
<local fs> read-at(1000, 15000): p1/000001.sst
1000 15000: ok (salt 1)
<local fs> read-at(16000, 30000): p1/000001.sst
16000 30000: ok (salt 1)
<local fs> read-at(46000, 10000): p1/000001.sst
46000 10000: ok (salt 1)
<local fs> read-at(56000, 50000): p1/000001.sst
56000 50000: ok (salt 1)
<local fs> read-at(106000, 30000): p1/000001.sst
106000 30000: ok (salt 1)
<local fs> read-at(140000, 80000): p1/000001.sst
140000 80000: ok (salt 1)
<local fs> close: p1/000001.sst

read 1 readahead=sys-readahead
0 1000
1000 15000
16000 30000
46000 10000
56000 50000
106000 30000
140000 80000
----
<local fs> open: p1/000001.sst (options: *vfs.randomReadsOption)
size: 2000000
<local fs> read-at(0, 1000): p1/000001.sst
0 1000: ok (salt 1)
<local fs> read-at(1000, 15000): p1/000001.sst
1000 15000: ok (salt 1)
<local fs> prefetch(16000, 65536): p1/000001.sst
<local fs> read-at(16000, 30000): p1/000001.sst
16000 30000: ok (salt 1)
<local fs> read-at(46000, 10000): p1/000001.sst
46000 10000: ok (salt 1)
<local fs> prefetch(56000, 131072): p1/000001.sst
<local fs> read-at(56000, 50000): p1/000001.sst
56000 50000: ok (salt 1)
<local fs> read-at(106000, 30000): p1/000001.sst
106000 30000: ok (salt 1)
<local fs> prefetch(140000, 262144): p1/000001.sst
<local fs> read-at(140000, 80000): p1/000001.sst
140000 80000: ok (salt 1)
<local fs> close: p1/000001.sst

# TODO(radu): for informed/sys-readahead, we should start with the maximum
# prefetch window.
read 1 for-compaction readahead=sys-readahead
0 1000
1000 15000
16000 30000
46000 10000
56000 50000
106000 30000
140000 80000
----
<local fs> open: p1/000001.sst (options: *vfs.randomReadsOption)
size: 2000000
<local fs> read-at(0, 1000): p1/000001.sst
0 1000: ok (salt 1)
<local fs> read-at(1000, 15000): p1/000001.sst
1000 15000: ok (salt 1)
<local fs> prefetch(16000, 65536): p1/000001.sst
<local fs> read-at(16000, 30000): p1/000001.sst
16000 30000: ok (salt 1)
<local fs> read-at(46000, 10000): p1/000001.sst
46000 10000: ok (salt 1)
<local fs> prefetch(56000, 131072): p1/000001.sst
<local fs> read-at(56000, 50000): p1/000001.sst
56000 50000: ok (salt 1)
<local fs> read-at(106000, 30000): p1/000001.sst
106000 30000: ok (salt 1)
<local fs> prefetch(140000, 262144): p1/000001.sst
<local fs> read-at(140000, 80000): p1/000001.sst
140000 80000: ok (salt 1)
<local fs> close: p1/000001.sst
6 changes: 5 additions & 1 deletion objstorage/objstorageprovider/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func (p *provider) vfsOpenForReading(
}
return nil, err
}
return newFileReadable(file, p.st.FS, filename)
readaheadConfig := DefaultReadaheadConfig
if f := p.st.Local.ReadaheadConfigFn; f != nil {
readaheadConfig = f()
}
return newFileReadable(file, p.st.FS, readaheadConfig, filename)
}

func (p *provider) vfsCreate(
Expand Down
55 changes: 35 additions & 20 deletions objstorage/objstorageprovider/vfs_readable.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,26 @@ type fileReadable struct {

// The following fields are used to possibly open the file again using the
// sequential reads option (see vfsReadHandle).
filename string
fs vfs.FS
filename string
fs vfs.FS
readaheadConfig ReadaheadConfig
}

var _ objstorage.Readable = (*fileReadable)(nil)

func newFileReadable(file vfs.File, fs vfs.FS, filename string) (*fileReadable, error) {
func newFileReadable(
file vfs.File, fs vfs.FS, readaheadConfig ReadaheadConfig, filename string,
) (*fileReadable, error) {
info, err := file.Stat()
if err != nil {
return nil, err
}
r := &fileReadable{
file: file,
size: info.Size(),
filename: filename,
fs: fs,
file: file,
size: info.Size(),
filename: filename,
fs: fs,
readaheadConfig: readaheadConfig,
}
invariants.SetFinalizer(r, func(obj interface{}) {
if obj.(*fileReadable).file != nil {
Expand Down Expand Up @@ -81,8 +85,9 @@ func (r *fileReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
}

type vfsReadHandle struct {
r *fileReadable
rs readaheadState
r *fileReadable
rs readaheadState
readaheadMode ReadaheadMode

// sequentialFile holds a file descriptor to the same underlying File,
// except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of
Expand All @@ -109,8 +114,9 @@ var readHandlePool = sync.Pool{

func (rh *vfsReadHandle) init(r *fileReadable) {
*rh = vfsReadHandle{
r: r,
rs: makeReadaheadState(fileMaxReadaheadSize),
r: r,
rs: makeReadaheadState(fileMaxReadaheadSize),
readaheadMode: r.readaheadConfig.Speculative,
}
}

Expand All @@ -127,23 +133,26 @@ func (rh *vfsReadHandle) Close() error {

// ReadAt is part of the objstorage.ReadHandle interface.
func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error {
var n int
var err error
if rh.sequentialFile != nil {
// Use OS-level read-ahead.
n, err = rh.sequentialFile.ReadAt(p, offset)
} else {
n, err := rh.sequentialFile.ReadAt(p, offset)
if invariants.Enabled && err == nil && n != len(p) {
panic("short read")
}
return err
}
if rh.readaheadMode != NoReadahead {
if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 {
if readaheadSize >= fileMaxReadaheadSize {
if rh.readaheadMode == FadviseSequential && readaheadSize >= fileMaxReadaheadSize {
// We've reached the maximum readahead size. Beyond this point, rely on
// OS-level readahead.
rh.switchToOSReadahead()
} else {
_ = rh.r.file.Prefetch(offset, readaheadSize)
}
}
n, err = rh.r.file.ReadAt(p, offset)
}
n, err := rh.r.file.ReadAt(p, offset)
if invariants.Enabled && err == nil && n != len(p) {
panic("short read")
}
Expand All @@ -152,10 +161,16 @@ func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error

// SetupForCompaction is part of the objstorage.ReadHandle interface.
func (rh *vfsReadHandle) SetupForCompaction() {
rh.switchToOSReadahead()
rh.readaheadMode = rh.r.readaheadConfig.Informed
if rh.readaheadMode == FadviseSequential {
rh.switchToOSReadahead()
}
}

func (rh *vfsReadHandle) switchToOSReadahead() {
if invariants.Enabled && rh.readaheadMode != FadviseSequential {
panic("readheadMode not respected")
}
if rh.sequentialFile != nil {
return
}
Expand All @@ -170,8 +185,8 @@ func (rh *vfsReadHandle) switchToOSReadahead() {

// RecordCacheHit is part of the objstorage.ReadHandle interface.
func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
if rh.sequentialFile != nil {
// Using OS-level readahead, so do nothing.
if rh.sequentialFile != nil || rh.readaheadMode == NoReadahead {
// Using OS-level or no readahead, so do nothing.
return
}
rh.rs.recordCacheHit(offset, size)
Expand Down
1 change: 1 addition & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
NoSyncOnClose: opts.NoSyncOnClose,
BytesPerSync: opts.BytesPerSync,
}
providerSettings.Local.ReadaheadConfigFn = opts.Local.ReadaheadConfigFn
providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
Expand Down
Loading

0 comments on commit 26e7021

Please sign in to comment.