From 3349ee6511a411c9423a883ef589aef933e28776 Mon Sep 17 00:00:00 2001 From: langj Date: Mon, 28 Aug 2023 23:17:57 +0200 Subject: [PATCH] rate limits, flags for receive/send --- cache/filecache.go | 16 +++++++---- fileretriever/fileclient.go | 21 ++++---------- fileretriever/fileserver.go | 17 +++++++++-- go.mod | 1 + go.sum | 2 ++ readnetfs.go | 57 ++++++++++++++++++++++--------------- 6 files changed, 67 insertions(+), 47 deletions(-) diff --git a/cache/filecache.go b/cache/filecache.go index 69ce0b3..3c68931 100644 --- a/cache/filecache.go +++ b/cache/filecache.go @@ -4,11 +4,12 @@ import ( "github.com/hashicorp/golang-lru/v2" "github.com/rs/zerolog/log" "sync" + "time" ) const MEM_PER_FILE_CACHE_B = 1024 * 1024 * 100 // 100MB const MEM_TOTAL_CACHE_B = 1024 * 1024 * 1024 * 1 //1GB -const BLOCKSIZE = 1024 * 1024 * 1 //10MB +const BLOCKSIZE = 1024 * 1024 * 1 //1MB type CacheBlock struct { data []byte @@ -38,11 +39,16 @@ func (CF *CachedFile) Kill() { } func (CF *CachedFile) fillLruBlock(blockNumber int, block *CacheBlock) { - buf, err := CF.dataRequestCallback(blockNumber*BLOCKSIZE, BLOCKSIZE) - if err != nil { - log.Debug().Err(err).Msg("Failed to acquire new data for the cache") + for i := 0; i < 5; i++ { + buf, err := CF.dataRequestCallback(blockNumber*BLOCKSIZE, BLOCKSIZE) + if err != nil { + log.Debug().Err(err).Msg("Failed to acquire new data for the cache") + time.Sleep(100 * time.Millisecond) + continue + } + block.data = buf + return } - block.data = buf } func (CF *CachedFile) Read(offset, length int) ([]byte, error) { diff --git a/fileretriever/fileclient.go b/fileretriever/fileclient.go index 9e827ca..2c63c2e 100644 --- a/fileretriever/fileclient.go +++ b/fileretriever/fileclient.go @@ -18,6 +18,7 @@ import ( ) var PATH_TTL = 60 * time.Second +var DEADLINE = 1 * time.Second type LocalPath string @@ -85,22 +86,10 @@ func (f *FileClient) PutOrGet(rpath RemotePath, cf *cache.CachedFile) *cache.Cac return cf } -func (*FileClient) Lo2Re(local LocalPath) RemotePath { - remote := local - if remote != "" && remote[0] == '/' { - remote = remote[1:] - } - return RemotePath(remote) -} - func (f *FileClient) Re2Lo(remote RemotePath) LocalPath { return LocalPath(f.srcDir + "/" + string(remote)) } -func (f *FileClient) SrcDir() string { - return f.srcDir -} - func (f *FileClient) getPeerConn(path RemotePath) (net.Conn, string, error) { candidates, ok := f.fPathRemoteCache.Get(path) if !ok { @@ -141,7 +130,7 @@ func (f *FileClient) getPeerConn(path RemotePath) (net.Conn, string, error) { log.Warn().Err(err).Msg("Failed to get peer conn") return nil, "", err } - err = conn.SetDeadline(time.Now().Add(1 * time.Second)) + err = conn.SetDeadline(time.Now().Add(DEADLINE)) if err != nil { log.Warn().Msg("Failed to set deadline") return nil, "", err @@ -169,7 +158,7 @@ func (f *FileClient) netFileInfo(path RemotePath, peer string) (*common.Finfo, e log.Warn().Err(err).Msg("Failed to get peer conn") return nil, err } - err = conn.SetDeadline(time.Now().Add(1 * time.Second)) + err = conn.SetDeadline(time.Now().Add(DEADLINE)) if err != nil { log.Warn().Err(err).Msg("Failed to set deadline") return nil, err @@ -403,12 +392,12 @@ func (f *FileClient) ThisFsToInode(path RemotePath) uint64 { func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry, error) { conn, err := net.Dial("tcp", peer) - defer conn.Close() if err != nil { log.Warn().Err(err).Msg("Failed to get peer conn") return nil, err } - err = conn.SetDeadline(time.Now().Add(1 * time.Second)) + defer conn.Close() + err = conn.SetDeadline(time.Now().Add(DEADLINE * time.Second)) if err != nil { log.Warn().Err(err).Msg("Failed to set deadline") return nil, err diff --git a/fileretriever/fileserver.go b/fileretriever/fileserver.go index 6721319..c21b4a9 100644 --- a/fileretriever/fileserver.go +++ b/fileretriever/fileserver.go @@ -1,11 +1,15 @@ package fileretriever import ( + "context" "github.com/lunixbochs/struc" "github.com/rs/zerolog/log" + "golang.org/x/time/rate" "io/fs" + "math" "net" "os" + "readnetfs/cache" "strings" "time" ) @@ -40,11 +44,14 @@ type DirResponse struct { type FileServer struct { srcDir string bind string + limiter *rate.Limiter fclient *FileClient } -func NewFileServer(srcDir string, bind string, fclient *FileClient) *FileServer { - return &FileServer{srcDir: srcDir, bind: bind, fclient: fclient} +func NewFileServer(srcDir string, bind string, fclient *FileClient, rateLimit int) *FileServer { + maxPacketsPerSecond := (float64(rateLimit) * math.Pow(float64(10), float64(8))) / float64(cache.BLOCKSIZE*8) + log.Trace().Msgf("Setting rate limit to %d data packets per second", maxPacketsPerSecond) + return &FileServer{srcDir: srcDir, bind: bind, fclient: fclient, limiter: rate.NewLimiter(rate.Limit(maxPacketsPerSecond), 2)} } func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) { @@ -76,6 +83,10 @@ func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) { func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) { log.Printf("Trying to read %d bytes at %d from file %s", request.Length, request.Offset, request.Path) + err := f.limiter.Wait(context.TODO()) + if err != nil { + return + } buf, err := f.fclient.localRead(RemotePath(request.Path), request.Offset, request.Length) if err != nil { return @@ -143,7 +154,7 @@ func (f *FileServer) Serve() { conn, err := ln.Accept() if err != nil { log.Info().Err(err).Msg("Failed to accept") - conn.Close() + continue } go f.handleConn(conn) } diff --git a/go.mod b/go.mod index 254f83a..1b30000 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.6 github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 github.com/rs/zerolog v1.30.0 + golang.org/x/time v0.3.0 ) require ( diff --git a/go.sum b/go.sum index 04c0beb..828feee 100644 --- a/go.sum +++ b/go.sum @@ -24,3 +24,5 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/readnetfs.go b/readnetfs.go index d8c152b..175cd84 100644 --- a/readnetfs.go +++ b/readnetfs.go @@ -63,7 +63,7 @@ func (n *VirtNode) Write(ctx context.Context, fh fusefs.FileHandle, buf []byte, func (n *VirtNode) Getattr(ctx context.Context, fh fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno { fInfo, err := n.fc.FileInfo(n.path) if err != nil { - return 0 + return syscall.EIO } out.Size = uint64(fInfo.Size) out.Mtime = uint64(fInfo.ModTime) @@ -93,6 +93,7 @@ func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) child := n.NewInode(ctx, cNode, stable) return child, 0 } + func (n *VirtNode) Readdir(ctx context.Context) (fusefs.DirStream, syscall.Errno) { log.Trace().Msgf("Reading dir %s", n.path) entries, err := n.fc.ReadDir(n.path) @@ -123,35 +124,45 @@ func main() { mntDir := flag.String("mnt", "", "mnt") srcDir := flag.String("src", "", "src") flag.Var(&PeerNodes, "peer", "peer address and port") + send := flag.Bool("send", false, "send") + receive := flag.Bool("receive", false, "receive") + rateLimit := flag.Int("rate", 1000, "rate limit in Mbit/s") + flag.Parse() log.Debug().Msg("peers: " + strings.Join(PeerNodes, ", ")) log.Debug().Msg("bind: " + *bindAddrPort) fclient := fileretriever.NewFileClient(*srcDir, PeerNodes) - fserver := fileretriever.NewFileServer(*srcDir, *bindAddrPort, fclient) - go fserver.Serve() - - os.Mkdir(*mntDir, 0755) - root := &VirtNode{ - Inode: fusefs.Inode{}, - path: "", - fc: fclient, - } - server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{ - MountOptions: fuse.MountOptions{ - Debug: false, - AllowOther: true, - FsName: "chrislfs", - }, - }) - if err != nil { - log.Debug().Err(err).Msg("Failed to mount") + if !*send && !*receive { + log.Fatal().Msg("Must specify either send or receive or both") } - log.Printf("Mounted on %s", *mntDir) - log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir) + if *send { + fserver := fileretriever.NewFileServer(*srcDir, *bindAddrPort, fclient, *rateLimit) + go fserver.Serve() + } - // Wait until unmount before exiting - server.Wait() + if *receive { + os.Mkdir(*mntDir, 0755) + root := &VirtNode{ + Inode: fusefs.Inode{}, + path: "", + fc: fclient, + } + server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{ + MountOptions: fuse.MountOptions{ + Debug: false, + AllowOther: true, + FsName: "chrislfs", + }, + }) + if err != nil { + log.Debug().Err(err).Msg("Failed to mount") + } + log.Printf("Mounted on %s", *mntDir) + log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir) + // Wait until unmount before exiting + server.Wait() + } }