diff --git a/Dockerfile b/Dockerfile index 4975676..8feff3e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,5 @@ -FROM docker.io/golang:1-alpine AS builder +# do not use alpine here, as it is "[...] highly experimental, and not officially supported by the Go project (see golang/go#19938 for details)." +FROM docker.io/golang:latest AS builder WORKDIR /app @@ -6,9 +7,9 @@ COPY go.mod go.sum ./ RUN go mod download COPY . ./ -RUN go build +RUN go build ./cmd/readnetfs -FROM docker.io/alpine:3.18 +FROM docker.io/alpine:latest RUN apk add --no-cache fuse diff --git a/cache/filecache.go b/cache/filecache.go deleted file mode 100644 index df7b803..0000000 --- a/cache/filecache.go +++ /dev/null @@ -1,102 +0,0 @@ -package cache - -import ( - "errors" - "github.com/hashicorp/golang-lru/v2" - "github.com/rs/zerolog/log" - "sync" - "time" -) - -const MEM_PER_FILE_CACHE_B = 1024 * 1024 * 100 // 100MB -const MEM_TOTAL_CACHE_B = 1024 * 1024 * 1024 * 1 //1GB -const BLOCKSIZE = 1024 * 1024 * 1 //1MB and a few - -type CacheBlock struct { - data []byte - lock sync.Mutex -} - -// CachedFile supports contiguous reads via cache -type CachedFile struct { - lru *lru.Cache[int64, *CacheBlock] - dataRequestCallback func(offset, length int64) ([]byte, error) - fileSize int64 - mu sync.Mutex -} - -func NewCachedFile(fSize int64, dataRequestCallback func(offset int64, length int64) ([]byte, error)) *CachedFile { - blockLru, _ := lru.New[int64, *CacheBlock](MEM_PER_FILE_CACHE_B / BLOCKSIZE) - cf := &CachedFile{ - dataRequestCallback: dataRequestCallback, - fileSize: fSize, - lru: blockLru, - } - return cf -} - -func (cf *CachedFile) fillLruBlock(blockNumber int64, block *CacheBlock) error { - for i := 0; i < 5; i++ { - buf, err := cf.dataRequestCallback(blockNumber*BLOCKSIZE, BLOCKSIZE) - if err != nil { - log.Debug().Err(err).Msg("Failed to acquire new data for the cache") - continue - } - block.data = buf - return nil - } - log.Warn().Msg("Killing Block") - cf.lru.Remove(blockNumber) - return errors.New("Failed to fill block") -} - -func (cf *CachedFile) Read(offset, length int64) ([]byte, error) { - if offset > cf.fileSize { - return []byte{}, nil - } - lruBlock := offset / BLOCKSIZE - blockOffset := offset % BLOCKSIZE - cf.mu.Lock() - blck, ok := cf.lru.Get(lruBlock) - if !ok { - newBlock := CacheBlock{data: []byte{}} - newBlock.lock.Lock() - cf.lru.Add(lruBlock, &newBlock) - cf.mu.Unlock() - err := cf.fillLruBlock(lruBlock, &newBlock) - newBlock.lock.Unlock() - if err != nil { - return nil, err - } - blck = &newBlock - } else { - cf.mu.Unlock() - } - blck.lock.Lock() - defer blck.lock.Unlock() - for i := int64(0); i < 3; i++ { - go cf.ReadNewData(lruBlock + i) - time.Sleep(10 * time.Nanosecond) - } - if int64(len(blck.data)) < blockOffset { - return []byte{}, nil - } - if int64(len(blck.data)) < blockOffset+length { - length = int64(len(blck.data)) - blockOffset - } - return blck.data[blockOffset : blockOffset+length], nil -} - -func (cf *CachedFile) ReadNewData(lrublock int64) { - if cf.lru.Contains(lrublock) { - return - } - newBlock := CacheBlock{data: []byte{}} - newBlock.lock.Lock() - cf.lru.Add(lrublock, &newBlock) - err := cf.fillLruBlock(lrublock, &newBlock) - newBlock.lock.Unlock() - if err != nil { - return - } -} diff --git a/readnetfs.go b/cmd/readnetfs/readnetfs.go similarity index 51% rename from readnetfs.go rename to cmd/readnetfs/readnetfs.go index 5639b12..4516a92 100644 --- a/readnetfs.go +++ b/cmd/readnetfs/readnetfs.go @@ -7,10 +7,15 @@ import ( "github.com/hanwen/go-fuse/v2/fuse" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "github.com/rs/zerolog/pkgerrors" "golang.org/x/sync/semaphore" "os" - "readnetfs/cache" - "readnetfs/fileretriever" + "readnetfs/internal/pkg/cacheclient" + "readnetfs/internal/pkg/failcache" + "readnetfs/internal/pkg/fileserver" + "readnetfs/internal/pkg/fsclient" + "readnetfs/internal/pkg/localclient" + "readnetfs/internal/pkg/netclient" "strings" "syscall" ) @@ -19,95 +24,68 @@ var MAX_CONCURRENCY int64 = 10 type VirtNode struct { fusefs.Inode - path fileretriever.RemotePath + path fsclient.RemotePath sem *semaphore.Weighted - fc *fileretriever.FileClient + fc *fsclient.FileClient } func (n *VirtNode) Open(ctx context.Context, openFlags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { - n.sem.Acquire(ctx, 1) + _ = n.sem.Acquire(ctx, 1) defer n.sem.Release(1) return nil, 0, 0 } func (n *VirtNode) Read(ctx context.Context, fh fusefs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { - n.sem.Acquire(ctx, 1) + _ = n.sem.Acquire(ctx, 1) defer n.sem.Release(1) log.Trace().Msgf("Reading at %d from %s", off, n.path) - cacheEntry := n.fc.GetCachedFile(n.path) - if cacheEntry != nil { - buf, err := cacheEntry.Read(off, int64(len(dest))) - if err != nil { - log.Warn().Err(err).Msgf("Failed to read %s", n.path) - return nil, syscall.EIO - } - if len(buf) < len(dest) && len(buf) > 0 { - nb, err := cacheEntry.Read((off)+int64(len(buf)), int64(len(dest)-len(buf))) - if err != nil { - log.Warn().Err(err).Msgf("Failed to read %s", n.path) - return fuse.ReadResultData(buf), 0 - } - buf = append(buf, nb...) - } - return fuse.ReadResultData(buf), 0 - } - fInfo, err := n.fc.FileInfo(n.path) - if err != nil { - log.Debug().Err(err).Msgf("Failed to read file info for %s", n.path) + buf, err := n.fc.Read(n.path, off, dest) + if err != nil || buf == nil { + log.Debug().Err(err).Msgf("Failed to read %s", n.path) return nil, syscall.EIO } - cf := cache.NewCachedFile(int64(fInfo.Size), func(offset, length int64) ([]byte, error) { - return n.fc.Read(n.path, offset, length) - }) - cf = n.fc.PutOrGet(n.path, cf) - buf, err := cf.Read(int64(off), fuse.MAX_KERNEL_WRITE) - if err != nil { - log.Warn().Err(err).Msgf("Failed to read %s", n.path) - return nil, syscall.EIO - } - return fuse.ReadResultData(buf), 0 } func (n *VirtNode) Write(ctx context.Context, fh fusefs.FileHandle, buf []byte, off int64) (uint32, syscall.Errno) { - n.sem.Acquire(ctx, 1) + _ = n.sem.Acquire(ctx, 1) defer n.sem.Release(1) return 0, 0 } func (n *VirtNode) Getattr(ctx context.Context, fh fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno { - n.sem.Acquire(ctx, 1) + _ = n.sem.Acquire(ctx, 1) defer n.sem.Release(1) - fInfo, err := n.fc.FileInfo(n.path) + info, err := n.fc.FileInfo(n.path) if err != nil { return syscall.EIO } - out.Size = uint64(fInfo.Size) - out.Mtime = uint64(fInfo.ModTime) + out.Size = uint64(info.Size()) + out.Mtime = uint64(info.ModTime().Unix()) return 0 } func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fusefs.Inode, syscall.Errno) { - n.sem.Acquire(ctx, 1) + _ = n.sem.Acquire(ctx, 1) defer n.sem.Release(1) log.Debug().Msgf("Looking up %s in %s", name, n.path) - childpath := n.path.Append(name) - fInfo, err := n.fc.FileInfo(childpath) + childPath := n.path.Append(name) + fInfo, err := n.fc.FileInfo(childPath) if err != nil { - log.Debug().Err(err).Msgf("Failed to read file info for %s", childpath) + log.Debug().Err(err).Msgf("Failed to read file info for %s", childPath) return nil, syscall.EIO } stable := fusefs.StableAttr{ - Ino: n.fc.ThisFsToInode(childpath), + Ino: n.fc.PathToInode(childPath), } - if fInfo.IsDir { + if fInfo.IsDir() { stable.Mode = uint32(fuse.S_IFDIR) } else { stable.Mode = uint32(fuse.S_IFREG) } cNode := &VirtNode{ sem: semaphore.NewWeighted(MAX_CONCURRENCY), - path: childpath, + path: childPath, fc: n.fc, } child := n.NewInode(ctx, cNode, stable) @@ -115,14 +93,30 @@ func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) } func (n *VirtNode) Readdir(ctx context.Context) (fusefs.DirStream, syscall.Errno) { - n.sem.Acquire(ctx, 1) + _ = n.sem.Acquire(ctx, 1) defer n.sem.Release(1) log.Trace().Msgf("Reading dir %s", n.path) - entries, err := n.fc.ReadDir(n.path) + infos, err := n.fc.ReadDir(n.path) if err != nil { log.Debug().Err(err).Msgf("Failed to read dir %s", n.path) return nil, syscall.EIO } + entries := make([]fuse.DirEntry, 0) + for _, info := range infos { + stable := fusefs.StableAttr{ + Ino: n.fc.PathToInode(n.path.Append(info.Name())), + } + if info.IsDir() { + stable.Mode = uint32(fuse.S_IFDIR) + } else { + stable.Mode = uint32(fuse.S_IFREG) + } + entries = append(entries, fuse.DirEntry{ + Mode: stable.Mode, + Name: info.Name(), + Ino: stable.Ino, + }) + } return fusefs.NewListDirStream(entries), 0 } @@ -140,7 +134,9 @@ func (i *PeerAddress) Set(value string) error { } func main() { - zerolog.SetGlobalLevel(zerolog.TraceLevel) + zerolog.SetGlobalLevel(zerolog.InfoLevel) + zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) log.Debug().Msg(strings.Join(os.Args, ",")) bindAddrPort := flag.String("bind", "", "Bind address and port in x.x.x.x:port format") mntDir := flag.String("mnt", "", "Directory to mount the net filesystem on") @@ -148,45 +144,52 @@ func main() { flag.Var(&PeerNodes, "peer", "Peer addresses and ports in x.x.x.x:port format, has to specified for each peer like so: -peer x.x.x.x:port -peer x.x.x.x:port ...") send := flag.Bool("send", false, "Serve files from the src directory") receive := flag.Bool("receive", false, "Receive files and mount the net filesystem on the mnt directory") - rateLimit := flag.Int("rate", 1000, "rate limit in Mbit/s") - statsdAddrPort := flag.String("statsd", "", "Statsd server address and port in x.x.x.x:port format") - + rateLimit := flag.Int("rate", 1000, "Rate limit in Mbit/s") + allowOther := flag.Bool("allow-other", true, "Allow other users to access the mount") + statsdAddrPort := flag.String("statsd", "", "Statsd fileserver address and port in x.x.x.x:port format") flag.Parse() - log.Debug().Msg("peers: " + strings.Join(PeerNodes, ", ")) - log.Debug().Msg("bind: " + *bindAddrPort) - fclient := fileretriever.NewFileClient(*srcDir, PeerNodes, *statsdAddrPort) + log.Info().Msg("peers: " + strings.Join(PeerNodes, ", ")) + log.Info().Msg("bind: " + *bindAddrPort) if !*send && !*receive { log.Fatal().Msg("Must specify either send or receive or both") } - + localClient := failcache.NewFailCache(localclient.NewLocalclient(*srcDir)) + netClient := cacheclient.NewCacheClient(netclient.NewNetClient(*statsdAddrPort, PeerNodes)) + client := fsclient.NewFileClient(localClient, netClient) if *send { - fserver := fileretriever.NewFileServer(*srcDir, *bindAddrPort, fclient, *rateLimit) + fserver := fileserver.NewFileServer(*srcDir, *bindAddrPort, localClient, *rateLimit, *statsdAddrPort) go fserver.Serve() } - if *receive { - os.Mkdir(*mntDir, 0755) - root := &VirtNode{ - Inode: fusefs.Inode{}, - sem: semaphore.NewWeighted(MAX_CONCURRENCY), - path: "", - fc: fclient, - } - server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{ - MountOptions: fuse.MountOptions{ - Debug: false, - AllowOther: true, - FsName: "chrislfs", - }, - }) - if err != nil { - log.Debug().Err(err).Msg("Failed to mount") - } - log.Printf("Mounted on %s", *mntDir) - log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir) - // Wait until unmount before exiting - server.Wait() + go func() { + err := os.Mkdir(*mntDir, 0755) + if err != nil { + log.Warn().Err(err).Msg("Failed to create mount directory") + } + root := &VirtNode{ + Inode: fusefs.Inode{}, + sem: semaphore.NewWeighted(MAX_CONCURRENCY), + path: "", + fc: client, + } + server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{ + MountOptions: fuse.MountOptions{ + Debug: false, + AllowOther: *allowOther, + FsName: "chrislfs", + }, + }) + if err != nil { + log.Debug().Err(err).Msg("Failed to mount") + } + log.Printf("Mounted on %s", *mntDir) + log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir) + // Wait until unmount before exiting + server.Wait() + }() } + //block forever + select {} } diff --git a/fileretriever/fileclient.go b/fileretriever/fileclient.go deleted file mode 100644 index 4e09f83..0000000 --- a/fileretriever/fileclient.go +++ /dev/null @@ -1,615 +0,0 @@ -package fileretriever - -import ( - "context" - "errors" - "fmt" - "github.com/hanwen/go-fuse/v2/fuse" - "github.com/hashicorp/golang-lru/v2" - "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/lunixbochs/struc" - "github.com/rs/zerolog/log" - "golang.org/x/sync/semaphore" - "math" - "net" - "os" - "readnetfs/cache" - "readnetfs/common" - "strings" - "sync" - "time" -) - -var PATH_TTL = 15 * time.Minute -var DEADLINE = 10 * time.Second -var MAX_CONCURRENT_REQUESTS = 2 -var PATH_CACHE_SIZE = 5000 - -type FInfo struct { - NameLength int64 `struc:"int16,sizeof=Name"` - Name string - Size int64 - IsDir bool - ModTime int64 -} - -type DirFInfo struct { - FInfoLength int64 `struc:"int16,sizeof=FInfos"` - FInfos []FInfo -} - -type LocalPath string - -func (l LocalPath) Append(name string) LocalPath { - return LocalPath(string(l) + "/" + name) -} - -func (l LocalPath) String() string { - return string(l) -} - -type RemotePath string - -func (r RemotePath) Append(name string) RemotePath { - return RemotePath(string(r) + "/" + name) -} - -type PeerInfo struct { - Load int64 - Rate int64 - CurrentRequests *semaphore.Weighted -} - -type FileClient struct { - srcDir string - peerNodes map[string]*PeerInfo - plock sync.Mutex - iCounter uint64 - iMap map[RemotePath]uint64 - iLock sync.Mutex - fcache *lru.Cache[RemotePath, *cache.CachedFile] - fclock sync.Mutex - fPathRemoteCache *expirable.LRU[RemotePath, []string] - fplock sync.Mutex - fInfoCache *expirable.LRU[RemotePath, *FInfo] - fDirCache *expirable.LRU[RemotePath, *[]fuse.DirEntry] - statsdSocket net.Conn -} - -func NewFileClient(srcDir string, peerNodes []string, statsdAddrPort string) *FileClient { - fcache, _ := lru.New[RemotePath, *cache.CachedFile](cache.MEM_TOTAL_CACHE_B / cache.MEM_PER_FILE_CACHE_B) - fPathRemoteCache := expirable.NewLRU[RemotePath, []string](cache.MEM_TOTAL_CACHE_B/cache.MEM_PER_FILE_CACHE_B, - func(key RemotePath, value []string) {}, PATH_TTL) - fInfoCache := expirable.NewLRU[RemotePath, *FInfo](PATH_CACHE_SIZE, func(key RemotePath, value *FInfo) {}, PATH_TTL) - fDirCache := expirable.NewLRU[RemotePath, *[]fuse.DirEntry](PATH_CACHE_SIZE, func(key RemotePath, value *[]fuse.DirEntry) {}, PATH_TTL) - pMap := make(map[string]*PeerInfo) - for _, peer := range peerNodes { - pMap[peer] = &PeerInfo{CurrentRequests: semaphore.NewWeighted(int64(MAX_CONCURRENT_REQUESTS))} - } - - statsdSocket := func() net.Conn { - if statsdAddrPort != "" { - socket, err := net.Dial("udp", statsdAddrPort) - if err != nil { - log.Warn().Err(err).Msg("Failed to establish statsd connection") - return common.DummyConn{} - } - return socket - } else { - return common.DummyConn{} - } - }() - - return &FileClient{srcDir: srcDir, peerNodes: pMap, iMap: make(map[RemotePath]uint64), fcache: fcache, - fPathRemoteCache: fPathRemoteCache, fInfoCache: fInfoCache, fDirCache: fDirCache, statsdSocket: statsdSocket} -} - -func (f *FileClient) GetCachedFile(path RemotePath) *cache.CachedFile { - f.fclock.Lock() - defer f.fclock.Unlock() - cf, ok := f.fcache.Get(path) - if !ok { - return nil - } - return cf -} - -// PutOrGet tries to put a CachedFile, returns existing if already exists -func (f *FileClient) PutOrGet(rpath RemotePath, cf *cache.CachedFile) *cache.CachedFile { - f.fclock.Lock() - defer f.fclock.Unlock() - if existing, ok := f.fcache.Get(rpath); ok { - return existing - } - f.fcache.Add(rpath, cf) - return cf -} - -func (f *FileClient) Re2Lo(remote RemotePath) LocalPath { - return LocalPath(f.srcDir + "/" + string(remote)) -} - -func (f *FileClient) getPeer(path RemotePath) (string, error) { - candidates, ok := f.fPathRemoteCache.Get(path) - if !ok { - candidates = make([]string, 0) - var thisFInfo *FInfo - peers := f.peers() - for _, peer := range peers { - fInfo, err := f.netFileInfo(path, peer) - if err == nil { - if thisFInfo == nil { - thisFInfo = fInfo - } else { - if fInfo.Size != thisFInfo.Size { - return "", errors.New("file has different sizes on different peers" + string(path)) - } - } - candidates = append(candidates, peer) - } - f.fPathRemoteCache.Add(path, candidates) - } - } - if len(candidates) == 0 { - return "", errors.New("no peer candidates for file" + string(path)) - } - //find candidate with lowest load - lowest := int64(math.MaxInt64) - lowestPeer := "" - for _, peer := range candidates { - f.plock.Lock() - if f.peerNodes[peer].Load < lowest { - lowest = f.peerNodes[peer].Load - lowestPeer = peer - } - f.plock.Unlock() - } - if lowest > 3000 { - time.Sleep(3 * time.Second) - } - conn, err := net.Dial("tcp", lowestPeer) - if err != nil { - log.Warn().Err(err).Msg("Failed to get peer conn") - return "", err - } - err = conn.SetDeadline(time.Now().Add(DEADLINE)) - if err != nil { - log.Warn().Msg("Failed to set deadline") - return "", err - } - return lowestPeer, nil -} - -func (f *FileClient) peers() []string { - f.plock.Lock() - peers := make([]string, 0) - for peer, _ := range f.peerNodes { - peers = append(peers, peer) - } - defer f.plock.Unlock() - return peers -} - -func (f *FileClient) FileInfo(path RemotePath) (*FInfo, error) { - if fInfo, ok := f.fInfoCache.Get(path); ok && fInfo != nil { - return fInfo, nil - } - fInfo, err := f.fileInfo(path) - if err != nil && fInfo != nil { - return nil, err - } - f.fInfoCache.Add(path, fInfo) - return fInfo, err -} - -func (f *FileClient) netFileInfoDir(path RemotePath, peer string) (*DirFInfo, error) { - conn, err := net.Dial("tcp", peer) - if err != nil { - log.Warn().Err(err).Msg("Failed to get peer conn") - return nil, err - } - conn = common.WrapStatsdConn(conn, f.statsdSocket) - err = conn.SetDeadline(time.Now().Add(DEADLINE)) - if err != nil { - log.Warn().Err(err).Msg("Failed to set deadline") - return nil, err - } - defer conn.Close() - write, err := conn.Write([]byte{READ_DIR_FINFO}) - if err != nil || write != 1 { - log.Debug().Err(err).Msg("Failed to write message type") - return nil, err - } - request := &FileRequest{ - Offset: 0, - Length: 0, - Path: string(path), - } - _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_dir_finfo:1|c\n") - err = struc.Pack(conn, request) - if err != nil { - log.Debug().Err(err).Msg("Failed to pack request") - return nil, err - } - nFinfo := make([]byte, 1) - read, err := conn.Read(nFinfo) - if err != nil || read != 1 { - log.Debug().Err(err).Msgf("Failed to read num of file infos for dir %s", request.Path) - } - var dirFInfo DirFInfo - dirFInfo.FInfos = make([]FInfo, 0) - for i := 0; i < int(nFinfo[0]); i++ { - fInfo := new(FInfo) - err = struc.Unpack(conn, fInfo) - if err != nil { - log.Debug().Err(err).Msgf("Failed to write file info for dir %s", request.Path) - continue - } - dirFInfo.FInfos = append(dirFInfo.FInfos, *fInfo) - } - return &dirFInfo, nil -} - -func (f *FileClient) fileInfo(path RemotePath) (*FInfo, error) { - fInfo, err := f.localFileInfo(path) - if err == nil { - return fInfo, nil - } - for _, peer := range f.peers() { - fInfo, err := f.netFileInfo(path, peer) - if err == nil { - return fInfo, nil - } - } - return nil, errors.New("Failed to find finfo" + string(path) + "on any peer") -} - -func (f *FileClient) netFileInfo(path RemotePath, peer string) (*FInfo, error) { - conn, err := net.Dial("tcp", peer) - if err != nil { - log.Warn().Err(err).Msg("Failed to get peer conn") - return nil, err - } - conn = common.WrapStatsdConn(conn, f.statsdSocket) - err = conn.SetDeadline(time.Now().Add(DEADLINE)) - if err != nil { - log.Warn().Err(err).Msg("Failed to set deadline") - return nil, err - } - defer conn.Close() - write, err := conn.Write([]byte{FILE_INFO}) - if err != nil || write != 1 { - log.Debug().Err(err).Msg("Failed to write message type") - return nil, err - } - request := &FileRequest{ - Offset: 0, - Length: 0, - Path: string(path), - } - _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.file_info:1|c\n") - err = struc.Pack(conn, request) - if err != nil { - log.Debug().Err(err).Msg("Failed to pack request") - return nil, err - } - var fInfo FInfo - err = struc.Unpack(conn, &fInfo) - if err != nil { - return nil, err - } - return &fInfo, nil -} - -func (f *FileClient) localFileInfo(path RemotePath) (*FInfo, error) { - file, err := os.Open(f.Re2Lo(path).String()) - if err != nil { - return nil, err - } - defer file.Close() - fInfo, err := file.Stat() - if err != nil { - return nil, err - } - return &FInfo{ - Name: fInfo.Name(), - Size: fInfo.Size(), - IsDir: fInfo.IsDir(), - ModTime: fInfo.ModTime().Unix(), - }, nil -} - -func (f *FileClient) netRead(path RemotePath, offset int64, length int64) ([]byte, error) { - nextLoad := new(int64) - *nextLoad = 3000 - log.Trace().Msgf("doing net read at %d for len %d", offset, length) - peer, err := f.getPeer(path) - if err != nil { - log.Debug().Err(err).Msg("Failed to get peer") - return nil, err - } - f.plock.Lock() - peerInfo := f.peerNodes[peer] - f.plock.Unlock() - start := time.Now() - err = peerInfo.CurrentRequests.Acquire(context.Background(), 1) - defer peerInfo.CurrentRequests.Release(1) - stop := time.Now() - log.Debug().Msgf("Waited %d millis to dial conn", stop.Sub(start).Milliseconds()) - conn, err := net.Dial("tcp", peer) - if err != nil { - log.Debug().Err(err).Msg("Failed to get peer conn") - return nil, err - } - conn = common.WrapStatsdConn(conn, f.statsdSocket) - if err != nil { - log.Warn().Err(err).Msg("Failed to acquire semaphore") - return nil, err - } - defer func() { - f.plock.Lock() - info, ok := f.peerNodes[peer] - f.plock.Unlock() - if !ok { - log.Debug().Msgf("Peer %s not found in peerNodes", peer) - } else { - info.Load = (*nextLoad + info.Load*5) / 6 - } - log.Trace().Msgf("Peer %s load is now %d", peer, info.Load) - }() - if err != nil { - return nil, err - } - defer conn.Close() - conn.Write([]byte{READ_CONTENT}) - if err != nil { - log.Debug().Err(err).Msg("Failed to write message type") - return nil, err - } - request := &FileRequest{ - Offset: offset, - Length: length, - Path: string(path), - } - _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_content:1|c\n") - err = struc.Pack(conn, request) - if err != nil { - log.Debug().Err(err).Msg("Failed to pack request") - return nil, err - } - var response FileResponse - //time read - start = time.Now() - err = struc.Unpack(conn, &response) - if err != nil { - return nil, err - } - elapsed := time.Since(start) - log.Debug().Msgf("Read %d bytes from %s in %s", len(response.Content), peer, elapsed) - *nextLoad = elapsed.Milliseconds() - return response.Content, nil -} - -func (f *FileClient) localRead(remotePath RemotePath, off, length int64) ([]byte, error) { - localPath := f.Re2Lo(remotePath) - file, err := os.Open(localPath.String()) - if err != nil { - log.Debug().Err(err).Msgf("Failed to open file %s", localPath) - return nil, err - } - finfo, err := file.Stat() - if err != nil { - log.Warn().Err(err).Msgf("Failed to stat file %s", localPath) - return nil, err - } - if off >= int64(finfo.Size()) { - return []byte{}, nil - } - seek, err := file.Seek(int64(off), 0) - if err != nil || seek != int64(off) { - log.Warn().Err(err).Msgf("Failed to seek to %d in file %s", off, localPath) - return nil, err - } - buf := make([]byte, length) - read, err := file.Read(buf) - if err != nil { - return nil, err - } - return buf[:read], nil -} - -func (f *FileClient) Read(path RemotePath, off, length int64) ([]byte, error) { - log.Trace().Msgf("doing read at %d for len %d", off, length) - buf, err := f.localRead(path, off, length) - if err != nil { - log.Debug().Msgf("Reading from net %s", path) - for { - buf, err = f.netRead(path, off, length) - if err != nil { - log.Debug().Err(err).Msg("Failed to read from net") - continue - } - break - } - return buf, nil - } - return buf, nil -} - -func (f *FileClient) ReadDir(path RemotePath) ([]fuse.DirEntry, error) { - if fInfo, ok := f.fDirCache.Get(path); ok { - return *fInfo, nil - } - fDirs, err := f.readDir(path) - if err != nil { - return nil, err - } - f.fDirCache.Add(path, &fDirs) - return fDirs, err -} - -func (f *FileClient) readDir(path RemotePath) ([]fuse.DirEntry, error) { - localEntries, err := f.localReadDir(path) - if err != nil { - log.Debug().Err(err).Msgf("Failed to read local dir %s", path) - } - netEntries, err := f.netReadDirAllPeers(path) - if err != nil { - log.Debug().Err(err).Msgf("Failed to read remote dir %s", path) - } - //get vals from netEntries - netList := make([][]fuse.DirEntry, 0) - for _, v := range netEntries { - netList = append(netList, v) - } - entries := deduplicate(append(netList, localEntries)) - return entries, nil -} - -func deduplicate(ls [][]fuse.DirEntry) []fuse.DirEntry { - m := make(map[string]fuse.DirEntry) - for _, l := range ls { - for _, e := range l { - m[e.Name] = e - } - } - r := make([]fuse.DirEntry, 0) - for _, v := range m { - r = append(r, v) - } - return r -} - -func (f *FileClient) netReadDirAllPeers(path RemotePath) (map[string][]fuse.DirEntry, error) { - netEntries := make(map[string][]fuse.DirEntry) - peers := f.peers() - for _, peer := range peers { - netEntryList, err := f.netReadDir(path, peer) - if err != nil { - log.Debug().Err(err).Msgf("Failed to read remote dir %s from %s", path, peer) - continue - } - netEntries[peer] = netEntryList - //try to cache finfo - dirFinfo, err := f.netFileInfoDir(path, peer) - if err != nil { - log.Debug().Err(err).Msgf("Failed to acquire associated file infos of path %s from %s", path, peer) - continue - } - for _, fInfo := range dirFinfo.FInfos { - fc := new(FInfo) - *fc = fInfo - f.fInfoCache.Add(path.Append(fInfo.Name), fc) - log.Trace().Msg(fInfo.Name + " added to file cache") - } - } - return netEntries, nil -} - -func (f *FileClient) localReadDir(path RemotePath) ([]fuse.DirEntry, error) { - localPath := f.Re2Lo(path) - log.Trace().Msgf("doing read dir at %s", path) - dir, err := os.ReadDir(localPath.String()) - if err != nil { - log.Debug().Err(err).Msgf("Failed to read dir %s", path) - return nil, err - } - r := make([]fuse.DirEntry, len(dir)) - for i, file := range dir { - r[i] = f.thisFsFuseDirEntry(path, file.Name()) - } - return r, nil -} - -func (f *FileClient) localToMode(localpath string) uint32 { - file, err := os.Open(localpath) - if err != nil { - panic(err) - } - defer file.Close() - fInfo, _ := file.Stat() - if fInfo.IsDir() { - return fuse.S_IFDIR - } - return fuse.S_IFREG -} - -func (f *FileClient) thisFsFuseDirEntry(path RemotePath, name string) fuse.DirEntry { - return fuse.DirEntry{ - Mode: f.localToMode(f.Re2Lo(path).Append(name).String()), - Ino: f.ThisFsToInode(path.Append(name)), - Name: name, - } -} - -func (f *FileClient) ThisFsToInode(path RemotePath) uint64 { - f.iLock.Lock() - defer f.iLock.Unlock() - if val, ok := f.iMap[path]; ok { - return val - } - f.iCounter++ - f.iMap[path] = f.iCounter - return f.iCounter -} - -func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry, error) { - conn, err := net.Dial("tcp", peer) - if err != nil { - log.Warn().Err(err).Msg("Failed to get peer conn") - return nil, err - } - conn = common.WrapStatsdConn(conn, f.statsdSocket) - defer conn.Close() - err = conn.SetDeadline(time.Now().Add(DEADLINE)) - if err != nil { - log.Warn().Err(err).Msg("Failed to set deadline") - return nil, err - } - write, err := conn.Write([]byte{READDIR_CONTENT}) - if err != nil || write != 1 { - log.Warn().Err(err).Msg("Failed to write message type") - return nil, err - } - request := &FileRequest{ - Offset: 0, - Length: 0, - Path: string(path), - } - _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.readdir_content:1|c\n") - err = struc.Pack(conn, request) - if err != nil { - log.Warn().Err(err).Msg("Failed to write request") - return nil, err - } - var dirResponse DirResponse - err = struc.Unpack(conn, &dirResponse) - if err != nil { - log.Warn().Err(err).Msg("Failed to unpack response") - return nil, err - } - dirs := strings.Split(string(dirResponse.Dirs), "\x00") - if len(dirs) > 0 && dirs[0] == "" { - dirs = []string{} - } - files := strings.Split(string(dirResponse.Files), "\x00") - if len(files) > 0 && files[0] == "" { - files = []string{} - } - r := make([]fuse.DirEntry, len(dirs)+len(files)) - for i, file := range files { - r[i] = fuse.DirEntry{ - Mode: fuse.S_IFREG, - Ino: f.ThisFsToInode(path.Append(file)), - Name: file, - } - } - for i, dir := range dirs { - r[i+len(files)] = fuse.DirEntry{ - Mode: fuse.S_IFDIR, - Ino: f.ThisFsToInode(path.Append(dir)), - Name: dir, - } - } - return r, nil -} diff --git a/fileretriever/fileserver.go b/fileretriever/fileserver.go deleted file mode 100644 index f57fd0e..0000000 --- a/fileretriever/fileserver.go +++ /dev/null @@ -1,226 +0,0 @@ -package fileretriever - -import ( - "context" - "fmt" - "github.com/lunixbochs/struc" - "github.com/rs/zerolog/log" - "golang.org/x/time/rate" - "io/fs" - "math" - "net" - "os" - "readnetfs/cache" - "readnetfs/common" - "strings" - "time" -) - -const ( - FILE_INFO byte = iota - READ_CONTENT - READDIR_CONTENT - READ_DIR_FINFO -) - -// TODO use remote path type and custom packer -type FileRequest struct { - Offset int64 - Length int64 - PathLength int64 `struc:"int16,sizeof=Path"` - Path string -} - -type FileResponse struct { - Length int64 `struc:"int64,sizeof=Content"` - FileSize int64 - Content []byte -} - -type DirResponse struct { - DirLength int64 `struc:"int32,sizeof=Dirs"` - Dirs []byte - FileLength int64 `struc:"int32,sizeof=Files"` - Files []byte -} - -type FileServer struct { - srcDir string - bind string - limiter *rate.Limiter - fclient *FileClient -} - -func NewFileServer(srcDir string, bind string, fclient *FileClient, rateLimit int) *FileServer { - maxPacketsPerSecond := (float64(rateLimit) * math.Pow(float64(10), float64(6))) / float64(cache.BLOCKSIZE*8) - log.Trace().Msgf("Setting rate limit to %d data packets per second", maxPacketsPerSecond) - return &FileServer{srcDir: srcDir, bind: bind, fclient: fclient, limiter: rate.NewLimiter(rate.Limit(maxPacketsPerSecond), 2)} -} - -func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) { - _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.readdir_content:1|c\n") - path := f.srcDir + "/" + request.Path - root := os.DirFS(path) - entries, err := fs.ReadDir(root, ".") - if err != nil { - return - } - files := make([]string, 0) - dirs := make([]string, 0) - for _, e := range entries { - if e.IsDir() { - dirs = append(dirs, e.Name()) - } else { - files = append(files, e.Name()) - } - } - dirResponse := DirResponse{ - Dirs: []byte(strings.Join(dirs, "\x00")), - Files: []byte(strings.Join(files, "\x00")), - } - err = struc.Pack(conn, &dirResponse) - if err != nil { - log.Warn().Err(err).Msg("Failed to write response") - return - } -} - -func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) { - _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.read_content:1|c\n") - log.Printf("Trying to read %d bytes at %d from file %s", request.Length, request.Offset, request.Path) - start := time.Now() - err := f.limiter.Wait(context.Background()) - stop := time.Now() - log.Trace().Msgf("Waited %d millis for rate limiter", stop.Sub(start).Milliseconds()) - if err != nil { - return - } - buf, err := f.fclient.localRead(RemotePath(request.Path), request.Offset, request.Length) - if err != nil { - return - } - fileResponse := &FileResponse{ - Content: buf, - } - err = struc.Pack(conn, fileResponse) - if err != nil { - log.Warn().Err(err).Msg("Failed to write response") - return - } -} - -func (f *FileServer) handleGetFileInfo(conn net.Conn, request *FileRequest) { - _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.file_info:1|c\n") - fInfo, err := f.fclient.localFileInfo(RemotePath(request.Path)) - if err != nil { - log.Debug().Err(err).Msgf("Failed to read local file info for %s", request.Path) - return - } - err = struc.Pack(conn, fInfo) - if err != nil { - log.Debug().Err(err).Msgf("Failed to write file info for %s", request.Path) - return - } -} - -func (f *FileServer) handleDirFInfo(conn net.Conn, request *FileRequest) { - _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.read_dir_finfo:1|c\n") - path := f.fclient.Re2Lo(RemotePath(request.Path)) - root := os.DirFS(path.String()) - entries, err := fs.ReadDir(root, ".") - if err != nil { - log.Debug().Err(err).Msgf("Failed to read dir for %s", request.Path) - return - } - fInfos := DirFInfo{FInfos: make([]FInfo, 0)} - for _, e := range entries { - fInfo, err := e.Info() - if err != nil { - log.Debug().Err(err).Msgf("Failed to read file info for %s", e.Name()) - continue - } - fInfos.FInfos = append(fInfos.FInfos, FInfo{ - Name: fInfo.Name(), - Size: fInfo.Size(), - IsDir: fInfo.IsDir(), - ModTime: fInfo.ModTime().Unix(), - }) - if err != nil { - log.Debug().Err(err).Msgf("Failed to write file info for %s", e.Name()) - return - } - } - //TODO use custom packer - write, err := conn.Write([]byte{byte(len(fInfos.FInfos))}) - if err != nil || write != 1 { - log.Debug().Err(err).Msgf("Failed to write num of file infos for dir %s", request.Path) - } - for _, fInfo := range fInfos.FInfos { - err = struc.Pack(conn, &fInfo) - if err != nil { - log.Debug().Err(err).Msgf("Failed to write file info for dir %s", request.Path) - } - } -} - -func (f *FileServer) handleConn(conn net.Conn) { - conn = common.WrapStatsdConn(conn, f.fclient.statsdSocket) - defer conn.Close() - err := conn.SetDeadline(time.Now().Add(10 * time.Second)) - if err != nil { - log.Warn().Msg("Failed to set deadline") - return - } - request := &FileRequest{} - messageType := make([]byte, 1) - n, err := conn.Read(messageType) - if err != nil || n != 1 { - log.Warn().Err(err).Msg("Failed to read message type") - return - } - log.Debug().Msgf("Got message type %d", messageType[0]) - switch messageType[0] { - case FILE_INFO: - err = struc.Unpack(conn, request) - if err != nil { - log.Warn().Err(err).Msg("Failed to unpack request") - return - } - f.handleGetFileInfo(conn, request) - case READ_CONTENT: - err = struc.Unpack(conn, request) - if err != nil { - log.Warn().Err(err).Msg("Failed to unpack request") - return - } - f.handleFileRequest(conn, request) - case READDIR_CONTENT: - err = struc.Unpack(conn, request) - if err != nil { - log.Warn().Err(err).Msg("Failed to unpack request") - return - } - f.handleDirRequest(conn, request) - case READ_DIR_FINFO: - err = struc.Unpack(conn, request) - if err != nil { - log.Warn().Err(err).Msg("Failed to unpack request") - } - f.handleDirFInfo(conn, request) - } -} - -func (f *FileServer) Serve() { - ln, err := net.Listen("tcp", f.bind) - if err != nil { - // handle error - } - for { - conn, err := ln.Accept() - if err != nil { - log.Info().Err(err).Msg("Failed to accept") - continue - } - go f.handleConn(conn) - } -} diff --git a/go.mod b/go.mod index b9b7d64..6165d91 100644 --- a/go.mod +++ b/go.mod @@ -14,5 +14,6 @@ require ( require ( github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect + github.com/pkg/errors v0.9.1 // indirect golang.org/x/sys v0.4.0 // indirect ) diff --git a/go.sum b/go.sum index 38ab827..18400e9 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,7 @@ github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c= diff --git a/internal/pkg/cacheclient/cacheclient.go b/internal/pkg/cacheclient/cacheclient.go new file mode 100644 index 0000000..9e9cba8 --- /dev/null +++ b/internal/pkg/cacheclient/cacheclient.go @@ -0,0 +1,119 @@ +package cacheclient + +import ( + lru "github.com/hashicorp/golang-lru/v2" + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/rs/zerolog/log" + "io/fs" + "readnetfs/internal/pkg/fsclient" + "sync" + "syscall" + "time" +) + +var MAX_CONCURRENT_REQUESTS = 2 +var PATH_CACHE_SIZE = 5000 +var PATH_TTL = 15 * time.Minute + +// CacheClient use mutexes to make sure only one request is sent at a time +type CacheClient struct { + infos *expirable.LRU[fsclient.RemotePath, fs.FileInfo] + infoLock sync.Mutex + dirContent *expirable.LRU[fsclient.RemotePath, []string] + dirContentLock sync.Mutex + fCache *lru.Cache[fsclient.RemotePath, *CachedFile] + fCacheLock sync.Mutex + client fsclient.Client +} + +func NewCacheClient(client fsclient.Client) *CacheClient { + dirContent := expirable.NewLRU[fsclient.RemotePath, []string](PATH_CACHE_SIZE, + func(key fsclient.RemotePath, value []string) {}, PATH_TTL) + infos := expirable.NewLRU[fsclient.RemotePath, fs.FileInfo](PATH_CACHE_SIZE, + func(key fsclient.RemotePath, info fs.FileInfo) {}, PATH_TTL) + fCache, _ := lru.New[fsclient.RemotePath, *CachedFile](MEM_TOTAL_CACHE_B / MEM_PER_FILE_CACHE_B) + return &CacheClient{client: client, dirContent: dirContent, infos: infos, fCache: fCache} +} + +func (c *CacheClient) Purge() { + c.dirContent.Purge() + c.infos.Purge() + c.fCache.Purge() + c.client.Purge() +} + +func (c *CacheClient) PutOrGet(rpath fsclient.RemotePath, cf *CachedFile) *CachedFile { + c.fCacheLock.Lock() + defer c.fCacheLock.Unlock() + if existing, ok := c.fCache.Get(rpath); ok { + return existing + } + c.fCache.Add(rpath, cf) + return cf +} + +func (c *CacheClient) Read(path fsclient.RemotePath, off int64, dest []byte) ([]byte, error) { + cacheEntry, ok := c.fCache.Get(path) + if ok { + dest, err := cacheEntry.Read(off, dest) + if err != nil { + log.Warn().Err(err).Msgf("Failed to read %s", path) + return nil, syscall.EIO + } + return dest, nil + } + cf, err := NewCachedFile(path, c.client) + if err != nil { + return nil, err + } + cf = c.PutOrGet(path, cf) + buf, err := cf.Read(off, dest) + if err != nil { + log.Warn().Err(err).Msgf("Failed to read %s", path) + return nil, syscall.EIO + } + return buf, nil +} + +func (c *CacheClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) { + if files, ok := c.dirContent.Get(path); ok { + infos := make([]fs.FileInfo, len(files)) + for i, file := range files { + info, ok := c.infos.Get(path.Append(file)) + if !ok { + break + } + infos[i] = info + } + if len(infos) == len(files) { + return infos, nil + } + } + c.dirContentLock.Lock() + defer c.dirContentLock.Unlock() + infos, err := c.client.ReadDir(path) + if err != nil { + return nil, err + } + files := make([]string, len(infos)) + for i, info := range infos { + files[i] = info.Name() + c.infos.Add(path.Append(info.Name()), info) + } + c.dirContent.Add(path, files) + return infos, nil +} + +func (c *CacheClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) { + if info, ok := c.infos.Get(path); ok { + return info, nil + } + c.infoLock.Lock() + defer c.infoLock.Unlock() + info, err := c.client.FileInfo(path) + if err != nil { + return nil, err + } + c.infos.Add(path, info) + return info, nil +} diff --git a/internal/pkg/cacheclient/filecache.go b/internal/pkg/cacheclient/filecache.go new file mode 100644 index 0000000..b2b3e62 --- /dev/null +++ b/internal/pkg/cacheclient/filecache.go @@ -0,0 +1,115 @@ +package cacheclient + +import ( + "errors" + "github.com/hashicorp/golang-lru/v2" + "github.com/rs/zerolog/log" + "readnetfs/internal/pkg/fsclient" + "sync" +) + +const MEM_PER_FILE_CACHE_B = 1024 * 1024 * 100 // 300MB +const MEM_TOTAL_CACHE_B = 1024 * 1024 * 1024 * 3 //3GB +const BLOCKSIZE = 1024 * 1024 * 1 //1MB and a few + +type cacheBlock struct { + data []byte + lock sync.Mutex +} + +// CachedFile is optimal for contiguous reads +type CachedFile struct { + lru *lru.Cache[int64, *cacheBlock] + fileSize int64 + mu sync.Mutex + client fsclient.Client + path fsclient.RemotePath +} + +func NewCachedFile(path fsclient.RemotePath, client fsclient.Client) (*CachedFile, error) { + blockLru, _ := lru.New[int64, *cacheBlock](MEM_PER_FILE_CACHE_B / BLOCKSIZE) + info, err := client.FileInfo(path) + if err != nil { + log.Debug().Err(err).Msgf("Failed to read file info for %s", path) + return nil, err + } + cf := &CachedFile{ + client: client, + path: path, + fileSize: info.Size(), + lru: blockLru, + } + return cf, nil +} + +func (cf *CachedFile) Flush() { + cf.lru.Purge() + cf.client.Purge() +} + +func (cf *CachedFile) fillLruBlock(blockNumber int64, block *cacheBlock) error { + buf, err := cf.client.Read(cf.path, blockNumber*BLOCKSIZE, make([]byte, BLOCKSIZE)) + if err != nil { + log.Warn().Msgf("killing block %d for file %s", blockNumber, cf.path) + cf.lru.Remove(blockNumber) + return errors.New("failed to fill block") + } + block.data = buf + return nil + +} + +func (cf *CachedFile) Read(offset int64, dest []byte) ([]byte, error) { + if offset > cf.fileSize { + return dest[:0], nil + } + lruBlock := offset / BLOCKSIZE + blockOffset := offset % BLOCKSIZE + cf.mu.Lock() + blck, ok := cf.lru.Get(lruBlock) + if !ok { + newBlock := cacheBlock{data: []byte{}} + newBlock.lock.Lock() + cf.lru.Add(lruBlock, &newBlock) + cf.mu.Unlock() + err := cf.fillLruBlock(lruBlock, &newBlock) + newBlock.lock.Unlock() + if err != nil { + return dest[:0], err + } + blck = &newBlock + } else { + cf.mu.Unlock() + } + blck.lock.Lock() + defer blck.lock.Unlock() + for i := int64(0); i < 3; i++ { + go cf.readNewData(lruBlock + i) + } + end := blockOffset + int64(len(dest)) + if end > int64(len(blck.data)) { + end = int64(len(blck.data)) + } + ret := blck.data[blockOffset:end] + if len(ret) < len(dest) && offset+int64(len(ret)) < cf.fileSize { + nb, err := cf.Read((lruBlock+1)*(BLOCKSIZE), dest[len(ret):]) + if err == nil { + ret = append(ret, nb...) + } + } + return ret, nil +} + +func (cf *CachedFile) readNewData(lrublock int64) { + if cf.lru.Contains(lrublock) { + return + } + newBlock := cacheBlock{data: []byte{}} + newBlock.lock.Lock() + cf.lru.Add(lrublock, &newBlock) + err := cf.fillLruBlock(lrublock, &newBlock) + newBlock.lock.Unlock() + if err != nil { + return + } +} diff --git a/common/dummycon.go b/internal/pkg/common/dummycon.go similarity index 100% rename from common/dummycon.go rename to internal/pkg/common/dummycon.go diff --git a/common/statsd.go b/internal/pkg/common/statsd.go similarity index 80% rename from common/statsd.go rename to internal/pkg/common/statsd.go index 288016b..0fc67db 100644 --- a/common/statsd.go +++ b/internal/pkg/common/statsd.go @@ -2,6 +2,7 @@ package common import ( "fmt" + "github.com/rs/zerolog/log" "net" "strings" "time" @@ -12,6 +13,22 @@ type StatsdConn struct { statsdSocket net.Conn } +func NewStatsdConn(statsdAddrPort string) net.Conn { + statsdSocket := func() net.Conn { + if statsdAddrPort != "" { + socket, err := net.Dial("udp", statsdAddrPort) + if err != nil { + log.Warn().Err(err).Msg("Failed to establish statsd connection") + return DummyConn{} + } + return socket + } else { + return DummyConn{} + } + }() + return statsdSocket +} + func WrapStatsdConn(conn net.Conn, statsdSocket net.Conn) *StatsdConn { return &StatsdConn{Conn: conn, statsdSocket: statsdSocket} } diff --git a/internal/pkg/common/types.go b/internal/pkg/common/types.go new file mode 100644 index 0000000..2f83ca0 --- /dev/null +++ b/internal/pkg/common/types.go @@ -0,0 +1,127 @@ +package common + +import ( + "encoding/binary" + "github.com/lunixbochs/struc" + "io" + "io/fs" + "time" +) + +type MessageType byte + +const ( + FILE_INFO MessageType = iota + READ_CONTENT + READDIR_INFO +) + +type NetInfo struct { + NameLength int64 `struc:"int16,sizeof=NName"` + NName string + NSize int64 + NIsDir bool + NModTime int64 +} + +func (n NetInfo) Name() string { + return n.NName +} + +func (n NetInfo) Size() int64 { + return n.NSize +} + +func (n NetInfo) Mode() fs.FileMode { + return 0 +} + +func (n NetInfo) ModTime() time.Time { + return time.Unix(n.NModTime, 0) +} + +func (n NetInfo) IsDir() bool { + return n.NIsDir +} + +func (n NetInfo) Sys() any { + return nil +} + +func NewNetInfo(info fs.FileInfo) *NetInfo { + return &NetInfo{ + NName: info.Name(), + NSize: info.Size(), + NIsDir: info.IsDir(), + NModTime: info.ModTime().Unix(), + } +} + +type FsRequest struct { + Type byte + Offset int64 + Length int64 + PathLength int64 `struc:"int16,sizeof=Path"` + Path string +} + +type FileResponse struct { + Length int64 `struc:"int64,sizeof=Content"` + FileSize int64 + Content []byte +} + +func NewDirInfo(infos []fs.FileInfo) *DirInfo { + resp := &DirInfo{Infos: make([]fs.FileInfo, len(infos))} + for i, info := range infos { + resp.Infos[i] = NewNetInfo(info) + } + return resp +} + +type DirInfo struct { + Infos []fs.FileInfo +} + +func (d *DirInfo) Marshal(writer io.Writer) error { + err := binary.Write(writer, binary.LittleEndian, int32(len(d.Infos))) + if err != nil { + return err + } + for _, info := range d.Infos { + netInfo := NewNetInfo(info) + err := struc.Pack(writer, &netInfo) + if err != nil { + return err + } + } + return nil +} + +func (d *DirInfo) Unmarshal(reader io.Reader) error { + var infoLen int32 + err := binary.Read(reader, binary.LittleEndian, &infoLen) + if err != nil { + return err + } + d.Infos = make([]fs.FileInfo, infoLen) + for i := int32(0); i < infoLen; i++ { + var info NetInfo + err := struc.Unpack(reader, &info) + if err != nil { + return err + } + d.Infos[i] = info + } + return nil +} + +type LocalPath string + +func (l LocalPath) Append(name string) LocalPath { + return LocalPath(string(l) + "/" + name) +} + +func (l LocalPath) String() string { + return string(l) +} diff --git a/internal/pkg/failcache/failcache.go b/internal/pkg/failcache/failcache.go new file mode 100644 index 0000000..ffebf7b --- /dev/null +++ b/internal/pkg/failcache/failcache.go @@ -0,0 +1,61 @@ +package failcache + +//TODO base this and other path caches on inotify and not on polling + +import ( + "github.com/hashicorp/golang-lru/v2/expirable" + "io/fs" + "readnetfs/internal/pkg/fsclient" + "time" +) + +var FAILED_TTL = 30 * time.Minute +var FAILED_NUM = 100 + +type FailCache struct { + failedPaths *expirable.LRU[fsclient.RemotePath, interface{}] + client fsclient.Client +} + +func NewFailCache(client fsclient.Client) *FailCache { + fPaths := expirable.NewLRU[fsclient.RemotePath, interface{}](FAILED_NUM, func(key fsclient.RemotePath, empty interface{}) {}, FAILED_TTL) + return &FailCache{failedPaths: fPaths, client: client} +} + +func (f *FailCache) Purge() { + f.failedPaths.Purge() + f.client.Purge() +} + +func (f *FailCache) Read(path fsclient.RemotePath, offset int64, dest []byte) ([]byte, error) { + if _, ok := f.failedPaths.Get(path); ok { + return nil, fs.ErrNotExist + } + buf, err := f.client.Read(path, offset, dest) + if err != nil { + f.failedPaths.Add(path, nil) + } + return buf, err +} + +func (f *FailCache) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) { + if _, ok := f.failedPaths.Get(path); ok { + return nil, fs.ErrNotExist + } + finfos, err := f.client.ReadDir(path) + if err != nil { + f.failedPaths.Add(path, nil) + } + return finfos, err +} + +func (f FailCache) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) { + if _, ok := f.failedPaths.Get(path); ok { + return nil, fs.ErrNotExist + } + finfo, err := f.client.FileInfo(path) + if err != nil { + f.failedPaths.Add(path, nil) + } + return finfo, err +} diff --git a/internal/pkg/fileserver/server.go b/internal/pkg/fileserver/server.go new file mode 100644 index 0000000..b5a70b6 --- /dev/null +++ b/internal/pkg/fileserver/server.go @@ -0,0 +1,128 @@ +package fileserver + +import ( + "context" + "fmt" + "github.com/lunixbochs/struc" + "github.com/rs/zerolog/log" + "golang.org/x/time/rate" + "math" + "net" + "readnetfs/internal/pkg/cacheclient" + "readnetfs/internal/pkg/common" + "readnetfs/internal/pkg/fsclient" + "readnetfs/internal/pkg/netclient" + "time" +) + +type Server struct { + srcDir string + bind string + limiter *rate.Limiter + client fsclient.Client + statsdSocket net.Conn +} + +func NewFileServer(srcDir string, bind string, client fsclient.Client, rateLimit int, statsdAddrPort string) *Server { + maxPacketsPerSecond := (float64(rateLimit) * math.Pow(float64(10), float64(6))) / float64(cacheclient.BLOCKSIZE*8) + log.Trace().Msgf("setting rate limit to %d data packets per second", maxPacketsPerSecond) + statsdSocket := common.NewStatsdConn(statsdAddrPort) + return &Server{srcDir: srcDir, bind: bind, client: client, limiter: rate.NewLimiter(rate.Limit(maxPacketsPerSecond), 2), statsdSocket: statsdSocket} +} + +func (f *Server) handleDir(conn net.Conn, request *common.FsRequest) { + _, _ = fmt.Fprintf(f.statsdSocket, "requests.incoming.readdir_content:1|c\n") + infos, err := f.client.ReadDir(fsclient.RemotePath(request.Path)) + if err != nil { + return + } + dirResp := common.NewDirInfo(infos) + err = dirResp.Marshal(conn) + if err != nil { + return + } + if err != nil { + log.Warn().Err(err).Msg("failed to write response") + return + } +} + +func (f *Server) handleRead(conn net.Conn, request *common.FsRequest) { + _, _ = fmt.Fprintf(f.statsdSocket, "requests.incoming.read_content:1|c\n") + log.Printf("trying to read %d bytes at %d from file %s", request.Length, request.Offset, request.Path) + start := time.Now() + err := f.limiter.Wait(context.Background()) + stop := time.Now() + log.Trace().Msgf("Waited %d millis for rate limiter", stop.Sub(start).Milliseconds()) + if err != nil { + return + } + buf, err := f.client.Read(fsclient.RemotePath(request.Path), request.Offset, make([]byte, request.Length)) + if err != nil { + return + } + fileResponse := common.FileResponse{ + Content: buf, + } + log.Debug().Msgf("read %d bytes from file %s", len(buf), request.Path) + err = struc.Pack(conn, &fileResponse) + if err != nil { + log.Warn().Err(err).Msg("failed to write response") + return + } +} + +func (f *Server) handleInfo(conn net.Conn, request *common.FsRequest) { + _, _ = fmt.Fprintf(f.statsdSocket, "requests.incoming.file_info:1|c\n") + info, err := f.client.FileInfo(fsclient.RemotePath(request.Path)) + if err != nil { + log.Debug().Err(err).Msgf("failed to Read local file info for %s", request.Path) + return + } + err = struc.Pack(conn, common.NewNetInfo(info)) + if err != nil { + log.Debug().Err(err).Msgf("failed to write file info for %s", request.Path) + return + } +} + +func (f *Server) handleConn(conn net.Conn) { + conn = common.WrapStatsdConn(conn, f.statsdSocket) + defer func(conn net.Conn) { + err := conn.Close() + if err != nil { + log.Warn().Err(err).Msgf("Failed to close statsd conn") + } + }(conn) + err := conn.SetDeadline(time.Now().Add(netclient.DEADLINE)) + if err != nil { + log.Warn().Msg("failed to set deadline") + return + } + request := &common.FsRequest{} + err = struc.Unpack(conn, request) + log.Debug().Msgf("got message type %d", request.Type) + switch common.MessageType(request.Type) { + case common.FILE_INFO: + f.handleInfo(conn, request) + case common.READ_CONTENT: + f.handleRead(conn, request) + case common.READDIR_INFO: + f.handleDir(conn, request) + } +} + +func (f *Server) Serve() { + ln, err := net.Listen("tcp", f.bind) + if err != nil { + // handle error + } + for { + conn, err := ln.Accept() + if err != nil { + log.Info().Err(err).Msg("failed to accept") + continue + } + go f.handleConn(conn) + } +} diff --git a/internal/pkg/fsclient/client.go b/internal/pkg/fsclient/client.go new file mode 100644 index 0000000..f4c3349 --- /dev/null +++ b/internal/pkg/fsclient/client.go @@ -0,0 +1,114 @@ +package fsclient + +import ( + "errors" + "fmt" + "github.com/rs/zerolog/log" + "io/fs" + "sync" +) + +var MAX_RETRIES = 3 + +type Client interface { + Read(path RemotePath, offset int64, dest []byte) ([]byte, error) + ReadDir(path RemotePath) ([]fs.FileInfo, error) + FileInfo(path RemotePath) (fs.FileInfo, error) + Purge() +} + +type FileClient struct { + clients []Client + iCounter uint64 + iMap map[RemotePath]uint64 + iLock sync.Mutex +} + +// NewFileClient with argument clients: order of clients is priority +func NewFileClient(clients ...Client) *FileClient { + return &FileClient{clients: clients, iMap: make(map[RemotePath]uint64)} +} +func (f *FileClient) Purge() { + for _, client := range f.clients { + client.Purge() + } +} + +func (f *FileClient) FileInfo(path RemotePath) (fs.FileInfo, error) { + for i := 0; i < MAX_RETRIES; i++ { + for _, client := range f.clients { + log.Trace().Msgf("reading file info %s from %T", path, client) + info, err := client.FileInfo(path) + if err != nil || info == nil { + log.Debug().Err(err).Msgf("failed to get fInfo from %s", path) + continue + } + if info == nil { + return nil, errors.New(fmt.Sprintf("nil fileinfo returned from client %T", client)) + } + return info, nil + } + } + return nil, errors.New("failed to get fInfo from any client") +} + +func (f *FileClient) Read(path RemotePath, off int64, dest []byte) ([]byte, error) { + for i := 0; i < MAX_RETRIES; i++ { + for _, client := range f.clients { + log.Trace().Msgf("reading %s from %T", path, client) + buf, err := client.Read(path, off, dest) + if err != nil { + log.Debug().Err(err).Msgf("failed to read from %s", path) + continue + } + return buf, nil + } + } + return nil, errors.New("failed to read from any client") +} + +func (f *FileClient) ReadDir(path RemotePath) ([]fs.FileInfo, error) { + entries := make([]fs.FileInfo, 0) + for _, client := range f.clients { + var newEntries []fs.FileInfo + var err error + for i := 0; i < MAX_RETRIES; i++ { + log.Trace().Msgf("reading dir %s from %T", path, client) + newEntries, err = client.ReadDir(path) + if err != nil || newEntries == nil { + log.Debug().Err(err).Msg("failed to read dir") + continue + } + break + } + entries = append(entries, newEntries...) + } + notNil := make([]fs.FileInfo, 0) + for _, entry := range entries { + if entry != nil { + notNil = append(notNil, entry) + } + } + return notNil, nil +} + +func (f *FileClient) PathToInode(path RemotePath) uint64 { + f.iLock.Lock() + defer f.iLock.Unlock() + if inode, ok := f.iMap[path]; ok { + return inode + } + f.iCounter++ + f.iMap[path] = f.iCounter + return f.iCounter +} + +type RemotePath string + +func (r RemotePath) Append(name string) RemotePath { + return RemotePath(string(r) + "/" + name) +} + +func (r RemotePath) String() string { + return string(r) +} diff --git a/internal/pkg/localclient/localclient.go b/internal/pkg/localclient/localclient.go new file mode 100644 index 0000000..d02f296 --- /dev/null +++ b/internal/pkg/localclient/localclient.go @@ -0,0 +1,102 @@ +package localclient + +import ( + "errors" + "fmt" + "github.com/rs/zerolog/log" + "io" + "io/fs" + "os" + "readnetfs/internal/pkg/common" + "readnetfs/internal/pkg/fsclient" +) + +type LocalClient struct { + srcDir string +} + +func (l *LocalClient) Purge() {} + +func NewLocalclient(srcDir string) *LocalClient { + return &LocalClient{srcDir: srcDir} +} + +func (l *LocalClient) re2lo(remote fsclient.RemotePath) common.LocalPath { + if len(remote) == 0 { + return common.LocalPath(l.srcDir) + } + if remote[0] == '/' { + remote = remote[1:] + } + return common.LocalPath(fmt.Sprintf("%s/%s", l.srcDir, string(remote))) +} + +func (l *LocalClient) Read(remotePath fsclient.RemotePath, off int64, dest []byte) ([]byte, error) { + localPath := l.re2lo(remotePath) + file, err := os.Open(localPath.String()) + if err != nil { + log.Debug().Err(err).Msgf("failed to open file %s", localPath) + return nil, err + } + info, err := file.Stat() + if err != nil { + log.Warn().Err(err).Msgf("failed to stat file %s", localPath) + return nil, err + } + if off >= info.Size() { + return []byte{}, nil + } + if off+int64(len(dest)) > info.Size() { + dest = dest[:info.Size()-off] + } + seek, err := file.Seek(off, io.SeekStart) + if err != nil || seek != off { + return nil, errors.Join(err, errors.New("failed to seek to correct offset")) + } + read, err := io.ReadAtLeast(file, dest, len(dest)) + if err != nil { + return nil, err + } + if read != len(dest) { + log.Debug().Err(err).Msgf("failed to read enough bytes from %s", localPath) + } + return dest, nil +} + +func (l *LocalClient) ReadDir(path fsclient.RemotePath) ([]os.FileInfo, error) { + localPath := l.re2lo(path) + log.Trace().Msgf("read dir at %s", path) + dir, err := os.ReadDir(localPath.String()) + if err != nil { + log.Debug().Err(err).Msgf("failed to Read dir %s", path) + return nil, err + } + r := make([]os.FileInfo, len(dir)) + for i, file := range dir { + info, err := file.Info() + if err != nil { + log.Debug().Err(err).Msgf("failed to acquire file info for %s", path.Append(file.Name())) + continue + } + r[i] = info + } + return r, nil +} + +func (l *LocalClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) { + file, err := os.Open(l.re2lo(path).String()) + if err != nil { + return nil, err + } + defer func(file *os.File) { + err := file.Close() + if err != nil { + log.Warn().Err(err).Msgf("failed to close file %s", path) + } + }(file) + info, err := file.Stat() + if err != nil { + return nil, err + } + return info, nil +} diff --git a/internal/pkg/netclient/netclient.go b/internal/pkg/netclient/netclient.go new file mode 100644 index 0000000..fc73d30 --- /dev/null +++ b/internal/pkg/netclient/netclient.go @@ -0,0 +1,295 @@ +package netclient + +import ( + "context" + "errors" + "fmt" + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/lunixbochs/struc" + "github.com/rs/zerolog/log" + "golang.org/x/sync/semaphore" + "io/fs" + "math" + "net" + "readnetfs/internal/pkg/cacheclient" + "readnetfs/internal/pkg/common" + "readnetfs/internal/pkg/fsclient" + "sync" + "time" +) + +var DEADLINE = 10 * time.Second + +type PeerInfo struct { + Load int64 + Rate int64 + CurrentRequests *semaphore.Weighted +} + +type NetClient struct { + statsdSocket net.Conn + peerNodes map[string]*PeerInfo + plock sync.Mutex + fPathRemoteCache *expirable.LRU[fsclient.RemotePath, []string] +} + +func (f *NetClient) Purge() { + f.fPathRemoteCache.Purge() + pMap := make(map[string]*PeerInfo) + for peer, _ := range f.peerNodes { + pMap[peer] = &PeerInfo{CurrentRequests: semaphore.NewWeighted(int64(cacheclient.MAX_CONCURRENT_REQUESTS))} + } + f.peerNodes = pMap +} + +func NewNetClient(statsdAddrPort string, peerNodes []string) *NetClient { + fPathRemoteCache := expirable.NewLRU[fsclient.RemotePath, []string](cacheclient.MEM_TOTAL_CACHE_B/cacheclient.MEM_PER_FILE_CACHE_B, + func(key fsclient.RemotePath, value []string) {}, cacheclient.PATH_TTL) + pMap := make(map[string]*PeerInfo) + for _, peer := range peerNodes { + pMap[peer] = &PeerInfo{CurrentRequests: semaphore.NewWeighted(int64(cacheclient.MAX_CONCURRENT_REQUESTS))} + } + statsdSocket := common.NewStatsdConn(statsdAddrPort) + return &NetClient{statsdSocket: statsdSocket, peerNodes: pMap, fPathRemoteCache: fPathRemoteCache} +} + +type netReply interface { + common.FileResponse | common.DirInfo | common.NetInfo +} + +func (f *NetClient) getPeer(path fsclient.RemotePath) (string, error) { + candidates, ok := f.fPathRemoteCache.Get(path) + if !ok { + candidates = make([]string, 0) + var thisInfo fs.FileInfo + peers := f.peers() + for _, peer := range peers { + info, err := f.fileInfo(path, peer) + if err == nil { + if thisInfo == nil { + thisInfo = info + } else { + if info.Size() != thisInfo.Size() { + return "", errors.New("file has different sizes on different peers" + string(path)) + } + } + candidates = append(candidates, peer) + } + } + if len(candidates) > 0 { + f.fPathRemoteCache.Add(path, candidates) + } + } + if len(candidates) == 0 { + return "", errors.New("no peer candidates for file" + string(path)) + } + //find candidate with the lowest load + lowest := int64(math.MaxInt64) + lowestPeer := "" + for _, peer := range candidates { + f.plock.Lock() + if f.peerNodes[peer].Load < lowest { + lowest = f.peerNodes[peer].Load + lowestPeer = peer + } + f.plock.Unlock() + } + if lowest > 3000 { + time.Sleep(3 * time.Second) + } + return lowestPeer, nil +} + +func (f *NetClient) peers() []string { + f.plock.Lock() + peers := make([]string, 0) + for peer := range f.peerNodes { + peers = append(peers, peer) + } + defer f.plock.Unlock() + return peers +} + +func (f *NetClient) openConn(peer string) (net.Conn, error) { + conn, err := net.Dial("tcp", peer) + if err != nil { + log.Warn().Err(err).Msgf("Failed to connect to %s", peer) + return nil, err + } + conn = common.WrapStatsdConn(conn, f.statsdSocket) + return conn, nil +} + +func getReply[T netReply](f *NetClient, req *common.FsRequest, peer string) (*T, error) { + nextLoad := new(int64) + *nextLoad = 3000 + defer f.calcDelay(*nextLoad, peer) + f.plock.Lock() + peerInfo := f.peerNodes[peer] + f.plock.Unlock() + start := time.Now() + err := peerInfo.CurrentRequests.Acquire(context.Background(), 1) + defer peerInfo.CurrentRequests.Release(1) + stop := time.Now() + log.Debug().Msgf("Waited %d millis to dial conn", stop.Sub(start).Milliseconds()) + conn, err := net.Dial("tcp", peer) + if err != nil { + log.Warn().Err(err).Msg("Failed to get peer conn") + return nil, err + } + conn = common.WrapStatsdConn(conn, f.statsdSocket) + defer func(conn net.Conn) { + err := conn.Close() + if err != nil { + log.Warn().Err(err).Msg("Failed to close conn") + } + }(conn) + err = conn.SetDeadline(time.Now().Add(DEADLINE)) + if err != nil { + log.Warn().Err(err).Msg("Failed to set deadline") + return nil, err + } + err = struc.Pack(conn, req) + if err != nil { + log.Debug().Err(err).Msg("Failed to pack request") + return nil, err + } + var reply T + switch any(reply).(type) { + case common.DirInfo: + err = any(&reply).(*common.DirInfo).Unmarshal(conn) + default: + err = struc.Unpack(conn, &reply) + } + if err != nil { + return nil, err + } + *nextLoad = time.Since(start).Milliseconds() + return &reply, nil +} + +func (f *NetClient) FileInfo(path fsclient.RemotePath) (fs.FileInfo, error) { + for _, peer := range f.peers() { + info, err := f.fileInfo(path, peer) + if err == nil { + return info, nil + } + } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.file_info:1|c\n") + return nil, errors.New("no peer has file" + string(path)) +} + +func (f *NetClient) fileInfo(path fsclient.RemotePath, peer string) (fs.FileInfo, error) { + var info *common.NetInfo + info, err := getReply[common.NetInfo](f, &common.FsRequest{ + Type: byte(common.FILE_INFO), + Offset: 0, + Length: 0, + PathLength: 0, + Path: string(path), + }, peer) + if err != nil { + return nil, err + } + return info, nil +} + +func (f *NetClient) calcDelay(nextLoad int64, peer string) { + f.plock.Lock() + info, ok := f.peerNodes[peer] + f.plock.Unlock() + if !ok { + log.Debug().Msgf("Peer %s not found in peerNodes", peer) + } else { + info.Load = (nextLoad + info.Load*5) / 6 + } + log.Trace().Msgf("Peer %s load is now %d", peer, info.Load) +} + +func (f *NetClient) Read(path fsclient.RemotePath, offset int64, dest []byte) ([]byte, error) { + var err error + var buf []byte + for i := 0; i < 3; i++ { + buf, err = f.read(path, offset, dest) + if err == nil { + return buf, nil + } else { + log.Debug().Err(err).Msgf("failed to netread %s", path) + } + } + return nil, err +} + +func (f *NetClient) read(path fsclient.RemotePath, offset int64, dest []byte) ([]byte, error) { + peer, err := f.getPeer(path) + if err != nil { + return nil, err + } + log.Trace().Msgf("doing netread at %d for len %d", offset, len(dest)) + if err != nil { + log.Debug().Err(err).Msg("failed to get peer") + return nil, err + } + response, err := getReply[common.FileResponse](f, &common.FsRequest{ + Type: byte(common.READ_CONTENT), + Offset: offset, + Length: int64(len(dest)), + Path: string(path), + }, peer) + if err != nil { + return nil, err + } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_content:1|c\n") + log.Debug().Msgf("read %d bytes", len(response.Content)) + return response.Content, nil +} + +func deduplicate(ls map[string][]fs.FileInfo) []fs.FileInfo { + m := make(map[string]fs.FileInfo) + for _, vs := range ls { + for _, e := range vs { + m[e.Name()] = e + } + } + r := make([]fs.FileInfo, 0) + for _, v := range m { + r = append(r, v) + } + return r +} + +func (f *NetClient) ReadDir(path fsclient.RemotePath) ([]fs.FileInfo, error) { + netEntries := make(map[string][]fs.FileInfo) + peers := f.peers() + for _, peer := range peers { + netEntryList, err := f.readDir1(path, peer) + if err != nil { + log.Debug().Err(err).Msgf("failed to Read remote dir %s from %s", path, peer) + continue + } + netEntries[peer] = netEntryList + } + for k, v := range netEntries { + for _, e := range v { + f.fPathRemoteCache.Add(path.Append(e.Name()), []string{k}) + } + } + return deduplicate(netEntries), nil +} + +func (f *NetClient) readDir1(path fsclient.RemotePath, peer string) ([]fs.FileInfo, error) { + response, err := getReply[common.DirInfo](f, &common.FsRequest{ + Type: byte(common.READDIR_INFO), + Path: string(path), + }, peer) + if err != nil { + log.Warn().Err(err).Msg("failed to get peer conn") + return nil, err + } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.readdir_content:1|c\n") + infos := make([]fs.FileInfo, len(response.Infos)) + for i, info := range response.Infos { + infos[i] = info + } + return infos, nil +}