Skip to content

Commit

Permalink
feat: consider failed paths in net client
Browse files Browse the repository at this point in the history
feat: sanitycheck.go for translating nonsense return values/panics to errors
  • Loading branch information
likeazir committed Oct 10, 2023
1 parent 942362a commit 978eb6a
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 2 deletions.
3 changes: 2 additions & 1 deletion cmd/readnetfs/readnetfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"readnetfs/internal/pkg/fsclient"
"readnetfs/internal/pkg/localclient"
"readnetfs/internal/pkg/netclient"
"readnetfs/internal/pkg/sanitycheck"
"strings"
"syscall"
)
Expand Down Expand Up @@ -156,7 +157,7 @@ func main() {
log.Fatal().Msg("Must specify either send or receive or both")
}
localClient := failcache.NewFailCache(localclient.NewLocalclient(*srcDir))
netClient := cacheclient.NewCacheClient(netclient.NewNetClient(*statsdAddrPort, PeerNodes))
netClient := sanitycheck.NewSanityCheck(cacheclient.NewCacheClient(netclient.NewNetClient(*statsdAddrPort, PeerNodes)))
client := fsclient.NewFileClient(localClient, netClient)
if *send {
fserver := fileserver.NewFileServer(*srcDir, *bindAddrPort, localClient, *rateLimit, *statsdAddrPort)
Expand Down
15 changes: 14 additions & 1 deletion internal/pkg/cacheclient/cacheclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var PATH_TTL = 15 * time.Minute
// CacheClient use mutexes to make sure only one request is sent at a time
type CacheClient struct {
infos *expirable.LRU[fsclient.RemotePath, fs.FileInfo]
failed *expirable.LRU[fsclient.RemotePath, error]
infoLock sync.Mutex
dirContent *expirable.LRU[fsclient.RemotePath, []string]
dirContentLock sync.Mutex
Expand All @@ -31,8 +32,9 @@ func NewCacheClient(client fsclient.Client) *CacheClient {
func(key fsclient.RemotePath, value []string) {}, PATH_TTL)
infos := expirable.NewLRU[fsclient.RemotePath, fs.FileInfo](PATH_CACHE_SIZE,
func(key fsclient.RemotePath, info fs.FileInfo) {}, PATH_TTL)
failedPaths := expirable.NewLRU[fsclient.RemotePath, error](PATH_CACHE_SIZE, func(key fsclient.RemotePath, value error) {}, PATH_TTL)
fCache, _ := lru.New[fsclient.RemotePath, *CachedFile](MEM_TOTAL_CACHE_B / MEM_PER_FILE_CACHE_B)
return &CacheClient{client: client, dirContent: dirContent, infos: infos, fCache: fCache}
return &CacheClient{client: client, dirContent: dirContent, infos: infos, fCache: fCache, failed: failedPaths}
}

func (c *CacheClient) Purge() {
Expand All @@ -53,6 +55,9 @@ func (c *CacheClient) PutOrGet(rpath fsclient.RemotePath, cf *CachedFile) *Cache
}

func (c *CacheClient) Read(path fsclient.RemotePath, off int64, dest []byte) ([]byte, error) {
if reason, ok := c.failed.Get(path); ok {
return nil, reason
}
cacheEntry, ok := c.fCache.Get(path)
if ok {
dest, err := cacheEntry.Read(off, dest)
Expand All @@ -76,6 +81,9 @@ func (c *CacheClient) Read(path fsclient.RemotePath, off int64, dest []byte) ([]
}

func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) {
if reason, ok := c.failed.Get(path); ok {
return nil, reason
}
if files, ok := c.dirContent.Get(path); ok {
infos := make([]fs.FileInfo, len(files))
for i, file := range files {
Expand All @@ -93,6 +101,7 @@ func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) {
defer c.dirContentLock.Unlock()
infos, err := c.client.ReadDir(path)
if err != nil {
c.failed.Add(path, err)
return nil, err
}
files := make([]string, len(infos))
Expand All @@ -105,6 +114,9 @@ func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) {
}

func (c *CacheClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) {
if reason, ok := c.failed.Get(path); ok {
return nil, reason
}
if info, ok := c.infos.Get(path); ok {
return info, nil
}
Expand All @@ -113,6 +125,7 @@ func (c *CacheClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) {
defer c.infoLock.Unlock()
info, err := c.client.FileInfo(path)
if err != nil {
c.failed.Add(path, err)
return nil, err
}
c.infos.Add(path, info)
Expand Down
73 changes: 73 additions & 0 deletions internal/pkg/sanitycheck/sanitycheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package sanitycheck

import (
"github.com/rs/zerolog/log"
"io/fs"
"readnetfs/internal/pkg/fsclient"
)

type SanityCheck struct {
client fsclient.Client
}

func NewSanityCheck(client fsclient.Client) *SanityCheck {
return &SanityCheck{client: client}
}

func (f *SanityCheck) Purge() {
f.client.Purge()
}

func (f *SanityCheck) Read(path fsclient.RemotePath, offset int64, dest []byte) (buf []byte, err error) {
defer func() {
if r := recover(); r != nil {
log.Warn().Err(err).Msgf("panic in Read for %s", path)
f.Purge()
buf = nil
err = fs.ErrInvalid
return
}
}()
buf, err = f.client.Read(path, offset, dest)
if err == nil {
info, err := f.client.FileInfo(path)
if err != nil {
return nil, err
}
if len(buf) == 0 && (info.Size() > offset) || info.Size() < offset+int64(len(buf)) {
return nil, fs.ErrInvalid
}
}
if buf == nil {
return nil, fs.ErrNotExist
}
return
}

func (f *SanityCheck) ReadDir(path fsclient.RemotePath) (infos []fs.FileInfo, err error) {
defer func() {
if r := recover(); r != nil {
log.Warn().Err(err).Msgf("panic in ReadDir for %s", path)
f.Purge()
infos = nil
err = fs.ErrInvalid
return
}
}()
infos, err = f.client.ReadDir(path)
return
}

func (f SanityCheck) FileInfo(path fsclient.RemotePath) (info fs.FileInfo, err error) {
defer func() {
if r := recover(); r != nil {
log.Warn().Err(err).Msgf("panic in FileInfo for %s", path)
f.Purge()
info = nil
err = fs.ErrInvalid
return
}
}()
info, err = f.client.FileInfo(path)
return
}

0 comments on commit 978eb6a

Please sign in to comment.