diff --git a/cmd/readnetfs/readnetfs.go b/cmd/readnetfs/readnetfs.go index 4516a92..2f8254e 100644 --- a/cmd/readnetfs/readnetfs.go +++ b/cmd/readnetfs/readnetfs.go @@ -16,6 +16,7 @@ import ( "readnetfs/internal/pkg/fsclient" "readnetfs/internal/pkg/localclient" "readnetfs/internal/pkg/netclient" + "readnetfs/internal/pkg/sanitycheck" "strings" "syscall" ) @@ -156,7 +157,7 @@ func main() { log.Fatal().Msg("Must specify either send or receive or both") } localClient := failcache.NewFailCache(localclient.NewLocalclient(*srcDir)) - netClient := cacheclient.NewCacheClient(netclient.NewNetClient(*statsdAddrPort, PeerNodes)) + netClient := sanitycheck.NewSanityCheck(cacheclient.NewCacheClient(netclient.NewNetClient(*statsdAddrPort, PeerNodes))) client := fsclient.NewFileClient(localClient, netClient) if *send { fserver := fileserver.NewFileServer(*srcDir, *bindAddrPort, localClient, *rateLimit, *statsdAddrPort) diff --git a/internal/pkg/cacheclient/cacheclient.go b/internal/pkg/cacheclient/cacheclient.go index c9b643c..754b2c1 100644 --- a/internal/pkg/cacheclient/cacheclient.go +++ b/internal/pkg/cacheclient/cacheclient.go @@ -18,6 +18,7 @@ var PATH_TTL = 15 * time.Minute // CacheClient use mutexes to make sure only one request is sent at a time type CacheClient struct { infos *expirable.LRU[fsclient.RemotePath, fs.FileInfo] + failed *expirable.LRU[fsclient.RemotePath, error] infoLock sync.Mutex dirContent *expirable.LRU[fsclient.RemotePath, []string] dirContentLock sync.Mutex @@ -31,8 +32,9 @@ func NewCacheClient(client fsclient.Client) *CacheClient { func(key fsclient.RemotePath, value []string) {}, PATH_TTL) infos := expirable.NewLRU[fsclient.RemotePath, fs.FileInfo](PATH_CACHE_SIZE, func(key fsclient.RemotePath, info fs.FileInfo) {}, PATH_TTL) + failedPaths := expirable.NewLRU[fsclient.RemotePath, error](PATH_CACHE_SIZE, func(key fsclient.RemotePath, value error) {}, PATH_TTL) fCache, _ := lru.New[fsclient.RemotePath, *CachedFile](MEM_TOTAL_CACHE_B / 
MEM_PER_FILE_CACHE_B) - return &CacheClient{client: client, dirContent: dirContent, infos: infos, fCache: fCache} + return &CacheClient{client: client, dirContent: dirContent, infos: infos, fCache: fCache, failed: failedPaths} } func (c *CacheClient) Purge() { @@ -53,6 +55,9 @@ func (c *CacheClient) PutOrGet(rpath fsclient.RemotePath, cf *CachedFile) *Cache } func (c *CacheClient) Read(path fsclient.RemotePath, off int64, dest []byte) ([]byte, error) { + if reason, ok := c.failed.Get(path); ok { + return nil, reason + } cacheEntry, ok := c.fCache.Get(path) if ok { dest, err := cacheEntry.Read(off, dest) @@ -76,6 +81,9 @@ func (c *CacheClient) Read(path fsclient.RemotePath, off int64, dest []byte) ([] } func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) { + if reason, ok := c.failed.Get(path); ok { + return nil, reason + } if files, ok := c.dirContent.Get(path); ok { infos := make([]fs.FileInfo, len(files)) for i, file := range files { @@ -93,6 +101,7 @@ func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) { defer c.dirContentLock.Unlock() infos, err := c.client.ReadDir(path) if err != nil { + c.failed.Add(path, err) return nil, err } files := make([]string, len(infos)) @@ -105,6 +114,9 @@ func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) { } func (c *CacheClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) { + if reason, ok := c.failed.Get(path); ok { + return nil, reason + } if info, ok := c.infos.Get(path); ok { return info, nil } @@ -113,6 +125,7 @@ func (c *CacheClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) { defer c.infoLock.Unlock() info, err := c.client.FileInfo(path) if err != nil { + c.failed.Add(path, err) return nil, err } c.infos.Add(path, info) diff --git a/internal/pkg/sanitycheck/sanitycheck.go b/internal/pkg/sanitycheck/sanitycheck.go new file mode 100644 index 0000000..e8c47a7 --- /dev/null +++ b/internal/pkg/sanitycheck/sanitycheck.go 
@@ -0,0 +1,98 @@
+// Package sanitycheck provides a fsclient.Client wrapper that recovers from
+// panics raised by the wrapped client and rejects reads whose length is
+// inconsistent with the file size reported by FileInfo.
+package sanitycheck
+
+import (
+	"io/fs"
+
+	"github.com/rs/zerolog/log"
+
+	"readnetfs/internal/pkg/fsclient"
+)
+
+// SanityCheck wraps another fsclient.Client. Every call is forwarded to the
+// wrapped client; a panic raised while forwarding is recovered, logged, and
+// reported to the caller as fs.ErrInvalid after purging the wrapped client.
+type SanityCheck struct {
+	client fsclient.Client
+}
+
+// NewSanityCheck returns a SanityCheck that forwards every call to client.
+func NewSanityCheck(client fsclient.Client) *SanityCheck {
+	return &SanityCheck{client: client}
+}
+
+// Purge forwards to the wrapped client's Purge.
+func (s *SanityCheck) Purge() {
+	s.client.Purge()
+}
+
+// Read forwards to the wrapped client's Read and, when that succeeds, checks
+// the result against the size reported by the wrapped client's FileInfo.
+//
+// It returns fs.ErrInvalid when the wrapped client panics, when the result is
+// empty although the reported size lies beyond offset, or when the result
+// would end past the reported size. An error from the FileInfo lookup is
+// returned as is. Whenever the resulting buffer is nil the error is
+// fs.ErrNotExist, which also replaces an error returned by the wrapped Read.
+func (s *SanityCheck) Read(path fsclient.RemotePath, offset int64, dest []byte) (buf []byte, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			// err is still nil whenever a panic unwinds to here, so log
+			// the recovered value itself.
+			log.Warn().Interface("panic", r).Msgf("panic in Read for %s", path)
+			s.Purge()
+			buf, err = nil, fs.ErrInvalid
+		}
+	}()
+	buf, err = s.client.Read(path, offset, dest)
+	if err == nil {
+		// statErr is kept separate so it does not shadow the named result.
+		info, statErr := s.client.FileInfo(path)
+		if statErr != nil {
+			return nil, statErr
+		}
+		size := info.Size()
+		emptyBeforeEnd := len(buf) == 0 && size > offset
+		pastEnd := size < offset+int64(len(buf))
+		if emptyBeforeEnd || pastEnd {
+			return nil, fs.ErrInvalid
+		}
+	}
+	if buf == nil {
+		// NOTE(review): this also fires for a nil buffer with a nil error
+		// (e.g. a read exactly at the reported size) and hides the wrapped
+		// client's own error; confirm that callers rely on fs.ErrNotExist.
+		return nil, fs.ErrNotExist
+	}
+	return buf, err
+}
+
+// ReadDir forwards to the wrapped client's ReadDir. A panic in the wrapped
+// client is recovered, logged, followed by a Purge, and reported as
+// fs.ErrInvalid.
+func (s *SanityCheck) ReadDir(path fsclient.RemotePath) (infos []fs.FileInfo, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Warn().Interface("panic", r).Msgf("panic in ReadDir for %s", path)
+			s.Purge()
+			infos, err = nil, fs.ErrInvalid
+		}
+	}()
+	return s.client.ReadDir(path)
+}
+
+// FileInfo forwards to the wrapped client's FileInfo. A panic in the wrapped
+// client is recovered, logged, followed by a Purge, and reported as
+// fs.ErrInvalid.
+func (s *SanityCheck) FileInfo(path fsclient.RemotePath) (info fs.FileInfo, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Warn().Interface("panic", r).Msgf("panic in FileInfo for %s", path)
+			s.Purge()
+			info, err = nil, fs.ErrInvalid
+		}
+	}()
+	return s.client.FileInfo(path)
+}