Skip to content

Commit

Permalink
rate limits, flags for receive/send
Browse files Browse the repository at this point in the history
  • Loading branch information
likeazir committed Aug 28, 2023
1 parent 0c4ac39 commit 3349ee6
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 47 deletions.
16 changes: 11 additions & 5 deletions cache/filecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"github.com/hashicorp/golang-lru/v2"
"github.com/rs/zerolog/log"
"sync"
"time"
)

const MEM_PER_FILE_CACHE_B = 1024 * 1024 * 100 // 100MB
const MEM_TOTAL_CACHE_B = 1024 * 1024 * 1024 * 1 //1GB
const BLOCKSIZE = 1024 * 1024 * 1 //10MB
const BLOCKSIZE = 1024 * 1024 * 1 //1MB

type CacheBlock struct {
data []byte
Expand Down Expand Up @@ -38,11 +39,16 @@ func (CF *CachedFile) Kill() {
}

func (CF *CachedFile) fillLruBlock(blockNumber int, block *CacheBlock) {
buf, err := CF.dataRequestCallback(blockNumber*BLOCKSIZE, BLOCKSIZE)
if err != nil {
log.Debug().Err(err).Msg("Failed to acquire new data for the cache")
for i := 0; i < 5; i++ {
buf, err := CF.dataRequestCallback(blockNumber*BLOCKSIZE, BLOCKSIZE)
if err != nil {
log.Debug().Err(err).Msg("Failed to acquire new data for the cache")
time.Sleep(100 * time.Millisecond)
continue
}
block.data = buf
return
}
block.data = buf
}

func (CF *CachedFile) Read(offset, length int) ([]byte, error) {
Expand Down
21 changes: 5 additions & 16 deletions fileretriever/fileclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

var PATH_TTL = 60 * time.Second
var DEADLINE = 1 * time.Second

type LocalPath string

Expand Down Expand Up @@ -85,22 +86,10 @@ func (f *FileClient) PutOrGet(rpath RemotePath, cf *cache.CachedFile) *cache.Cac
return cf
}

func (*FileClient) Lo2Re(local LocalPath) RemotePath {
remote := local
if remote != "" && remote[0] == '/' {
remote = remote[1:]
}
return RemotePath(remote)
}

func (f *FileClient) Re2Lo(remote RemotePath) LocalPath {
return LocalPath(f.srcDir + "/" + string(remote))
}

func (f *FileClient) SrcDir() string {
return f.srcDir
}

func (f *FileClient) getPeerConn(path RemotePath) (net.Conn, string, error) {
candidates, ok := f.fPathRemoteCache.Get(path)
if !ok {
Expand Down Expand Up @@ -141,7 +130,7 @@ func (f *FileClient) getPeerConn(path RemotePath) (net.Conn, string, error) {
log.Warn().Err(err).Msg("Failed to get peer conn")
return nil, "", err
}
err = conn.SetDeadline(time.Now().Add(1 * time.Second))
err = conn.SetDeadline(time.Now().Add(DEADLINE))
if err != nil {
log.Warn().Msg("Failed to set deadline")
return nil, "", err
Expand Down Expand Up @@ -169,7 +158,7 @@ func (f *FileClient) netFileInfo(path RemotePath, peer string) (*common.Finfo, e
log.Warn().Err(err).Msg("Failed to get peer conn")
return nil, err
}
err = conn.SetDeadline(time.Now().Add(1 * time.Second))
err = conn.SetDeadline(time.Now().Add(DEADLINE))
if err != nil {
log.Warn().Err(err).Msg("Failed to set deadline")
return nil, err
Expand Down Expand Up @@ -403,12 +392,12 @@ func (f *FileClient) ThisFsToInode(path RemotePath) uint64 {

func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry, error) {
conn, err := net.Dial("tcp", peer)
defer conn.Close()
if err != nil {
log.Warn().Err(err).Msg("Failed to get peer conn")
return nil, err
}
err = conn.SetDeadline(time.Now().Add(1 * time.Second))
defer conn.Close()
err = conn.SetDeadline(time.Now().Add(DEADLINE * time.Second))
if err != nil {
log.Warn().Err(err).Msg("Failed to set deadline")
return nil, err
Expand Down
17 changes: 14 additions & 3 deletions fileretriever/fileserver.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package fileretriever

import (
"context"
"github.com/lunixbochs/struc"
"github.com/rs/zerolog/log"
"golang.org/x/time/rate"
"io/fs"
"math"
"net"
"os"
"readnetfs/cache"
"strings"
"time"
)
Expand Down Expand Up @@ -40,11 +44,14 @@ type DirResponse struct {
type FileServer struct {
srcDir string
bind string
limiter *rate.Limiter
fclient *FileClient
}

func NewFileServer(srcDir string, bind string, fclient *FileClient) *FileServer {
return &FileServer{srcDir: srcDir, bind: bind, fclient: fclient}
func NewFileServer(srcDir string, bind string, fclient *FileClient, rateLimit int) *FileServer {
maxPacketsPerSecond := (float64(rateLimit) * math.Pow(float64(10), float64(8))) / float64(cache.BLOCKSIZE*8)
log.Trace().Msgf("Setting rate limit to %d data packets per second", maxPacketsPerSecond)
return &FileServer{srcDir: srcDir, bind: bind, fclient: fclient, limiter: rate.NewLimiter(rate.Limit(maxPacketsPerSecond), 2)}
}

func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) {
Expand Down Expand Up @@ -76,6 +83,10 @@ func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) {

func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) {
log.Printf("Trying to read %d bytes at %d from file %s", request.Length, request.Offset, request.Path)
err := f.limiter.Wait(context.TODO())
if err != nil {
return
}
buf, err := f.fclient.localRead(RemotePath(request.Path), request.Offset, request.Length)
if err != nil {
return
Expand Down Expand Up @@ -143,7 +154,7 @@ func (f *FileServer) Serve() {
conn, err := ln.Accept()
if err != nil {
log.Info().Err(err).Msg("Failed to accept")
conn.Close()
continue
}
go f.handleConn(conn)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.6
github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40
github.com/rs/zerolog v1.30.0
golang.org/x/time v0.3.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
57 changes: 34 additions & 23 deletions readnetfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (n *VirtNode) Write(ctx context.Context, fh fusefs.FileHandle, buf []byte,
func (n *VirtNode) Getattr(ctx context.Context, fh fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
fInfo, err := n.fc.FileInfo(n.path)
if err != nil {
return 0
return syscall.EIO
}
out.Size = uint64(fInfo.Size)
out.Mtime = uint64(fInfo.ModTime)
Expand Down Expand Up @@ -93,6 +93,7 @@ func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut)
child := n.NewInode(ctx, cNode, stable)
return child, 0
}

func (n *VirtNode) Readdir(ctx context.Context) (fusefs.DirStream, syscall.Errno) {
log.Trace().Msgf("Reading dir %s", n.path)
entries, err := n.fc.ReadDir(n.path)
Expand Down Expand Up @@ -123,35 +124,45 @@ func main() {
mntDir := flag.String("mnt", "", "mnt")
srcDir := flag.String("src", "", "src")
flag.Var(&PeerNodes, "peer", "peer address and port")
send := flag.Bool("send", false, "send")
receive := flag.Bool("receive", false, "receive")
rateLimit := flag.Int("rate", 1000, "rate limit in Mbit/s")

flag.Parse()
log.Debug().Msg("peers: " + strings.Join(PeerNodes, ", "))
log.Debug().Msg("bind: " + *bindAddrPort)

fclient := fileretriever.NewFileClient(*srcDir, PeerNodes)

fserver := fileretriever.NewFileServer(*srcDir, *bindAddrPort, fclient)
go fserver.Serve()

os.Mkdir(*mntDir, 0755)
root := &VirtNode{
Inode: fusefs.Inode{},
path: "",
fc: fclient,
}
server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{
MountOptions: fuse.MountOptions{
Debug: false,
AllowOther: true,
FsName: "chrislfs",
},
})
if err != nil {
log.Debug().Err(err).Msg("Failed to mount")
if !*send && !*receive {
log.Fatal().Msg("Must specify either send or receive or both")
}

log.Printf("Mounted on %s", *mntDir)
log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir)
if *send {
fserver := fileretriever.NewFileServer(*srcDir, *bindAddrPort, fclient, *rateLimit)
go fserver.Serve()
}

// Wait until unmount before exiting
server.Wait()
if *receive {
os.Mkdir(*mntDir, 0755)
root := &VirtNode{
Inode: fusefs.Inode{},
path: "",
fc: fclient,
}
server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{
MountOptions: fuse.MountOptions{
Debug: false,
AllowOther: true,
FsName: "chrislfs",
},
})
if err != nil {
log.Debug().Err(err).Msg("Failed to mount")
}
log.Printf("Mounted on %s", *mntDir)
log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir)
// Wait until unmount before exiting
server.Wait()
}
}

0 comments on commit 3349ee6

Please sign in to comment.