Skip to content

Commit

Permalink
Merge pull request #2 from owo-uwu-nyaa/wip-refactor-1
Browse files Browse the repository at this point in the history
wip refactor v2
  • Loading branch information
likeazir authored Oct 5, 2023
2 parents 2f7a799 + 9b3966a commit b19e082
Show file tree
Hide file tree
Showing 17 changed files with 1,168 additions and 1,027 deletions.
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
FROM docker.io/golang:1-alpine AS builder
# do not use alpine here, as it is "[...] highly experimental, and not officially supported by the Go project (see golang/go#19938 for details)."
FROM docker.io/golang:latest AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . ./
RUN go build
RUN go build ./cmd/readnetfs

FROM docker.io/alpine:3.18
FROM docker.io/alpine:latest

RUN apk add --no-cache fuse

Expand Down
102 changes: 0 additions & 102 deletions cache/filecache.go

This file was deleted.

165 changes: 84 additions & 81 deletions readnetfs.go → cmd/readnetfs/readnetfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog/pkgerrors"
"golang.org/x/sync/semaphore"
"os"
"readnetfs/cache"
"readnetfs/fileretriever"
"readnetfs/internal/pkg/cacheclient"
"readnetfs/internal/pkg/failcache"
"readnetfs/internal/pkg/fileserver"
"readnetfs/internal/pkg/fsclient"
"readnetfs/internal/pkg/localclient"
"readnetfs/internal/pkg/netclient"
"strings"
"syscall"
)
Expand All @@ -19,110 +24,99 @@ var MAX_CONCURRENCY int64 = 10

type VirtNode struct {
fusefs.Inode
path fileretriever.RemotePath
path fsclient.RemotePath
sem *semaphore.Weighted
fc *fileretriever.FileClient
fc *fsclient.FileClient
}

func (n *VirtNode) Open(ctx context.Context, openFlags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
n.sem.Acquire(ctx, 1)
_ = n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
return nil, 0, 0
}

func (n *VirtNode) Read(ctx context.Context, fh fusefs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
n.sem.Acquire(ctx, 1)
_ = n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
log.Trace().Msgf("Reading at %d from %s", off, n.path)
cacheEntry := n.fc.GetCachedFile(n.path)
if cacheEntry != nil {
buf, err := cacheEntry.Read(off, int64(len(dest)))
if err != nil {
log.Warn().Err(err).Msgf("Failed to read %s", n.path)
return nil, syscall.EIO
}
if len(buf) < len(dest) && len(buf) > 0 {
nb, err := cacheEntry.Read((off)+int64(len(buf)), int64(len(dest)-len(buf)))
if err != nil {
log.Warn().Err(err).Msgf("Failed to read %s", n.path)
return fuse.ReadResultData(buf), 0
}
buf = append(buf, nb...)
}
return fuse.ReadResultData(buf), 0
}
fInfo, err := n.fc.FileInfo(n.path)
if err != nil {
log.Debug().Err(err).Msgf("Failed to read file info for %s", n.path)
buf, err := n.fc.Read(n.path, off, dest)
if err != nil || buf == nil {
log.Debug().Err(err).Msgf("Failed to read %s", n.path)
return nil, syscall.EIO
}
cf := cache.NewCachedFile(int64(fInfo.Size), func(offset, length int64) ([]byte, error) {
return n.fc.Read(n.path, offset, length)
})
cf = n.fc.PutOrGet(n.path, cf)
buf, err := cf.Read(int64(off), fuse.MAX_KERNEL_WRITE)
if err != nil {
log.Warn().Err(err).Msgf("Failed to read %s", n.path)
return nil, syscall.EIO
}

return fuse.ReadResultData(buf), 0
}

func (n *VirtNode) Write(ctx context.Context, fh fusefs.FileHandle, buf []byte, off int64) (uint32, syscall.Errno) {
n.sem.Acquire(ctx, 1)
_ = n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
return 0, 0
}

func (n *VirtNode) Getattr(ctx context.Context, fh fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
n.sem.Acquire(ctx, 1)
_ = n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
fInfo, err := n.fc.FileInfo(n.path)
info, err := n.fc.FileInfo(n.path)
if err != nil {
return syscall.EIO
}
out.Size = uint64(fInfo.Size)
out.Mtime = uint64(fInfo.ModTime)
out.Size = uint64(info.Size())
out.Mtime = uint64(info.ModTime().Unix())
return 0
}

func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fusefs.Inode, syscall.Errno) {
n.sem.Acquire(ctx, 1)
_ = n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
log.Debug().Msgf("Looking up %s in %s", name, n.path)
childpath := n.path.Append(name)
fInfo, err := n.fc.FileInfo(childpath)
childPath := n.path.Append(name)
fInfo, err := n.fc.FileInfo(childPath)
if err != nil {
log.Debug().Err(err).Msgf("Failed to read file info for %s", childpath)
log.Debug().Err(err).Msgf("Failed to read file info for %s", childPath)
return nil, syscall.EIO
}
stable := fusefs.StableAttr{
Ino: n.fc.ThisFsToInode(childpath),
Ino: n.fc.PathToInode(childPath),
}
if fInfo.IsDir {
if fInfo.IsDir() {
stable.Mode = uint32(fuse.S_IFDIR)
} else {
stable.Mode = uint32(fuse.S_IFREG)
}
cNode := &VirtNode{
sem: semaphore.NewWeighted(MAX_CONCURRENCY),
path: childpath,
path: childPath,
fc: n.fc,
}
child := n.NewInode(ctx, cNode, stable)
return child, 0
}

func (n *VirtNode) Readdir(ctx context.Context) (fusefs.DirStream, syscall.Errno) {
n.sem.Acquire(ctx, 1)
_ = n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
log.Trace().Msgf("Reading dir %s", n.path)
entries, err := n.fc.ReadDir(n.path)
infos, err := n.fc.ReadDir(n.path)
if err != nil {
log.Debug().Err(err).Msgf("Failed to read dir %s", n.path)
return nil, syscall.EIO
}
entries := make([]fuse.DirEntry, 0)
for _, info := range infos {
stable := fusefs.StableAttr{
Ino: n.fc.PathToInode(n.path.Append(info.Name())),
}
if info.IsDir() {
stable.Mode = uint32(fuse.S_IFDIR)
} else {
stable.Mode = uint32(fuse.S_IFREG)
}
entries = append(entries, fuse.DirEntry{
Mode: stable.Mode,
Name: info.Name(),
Ino: stable.Ino,
})
}
return fusefs.NewListDirStream(entries), 0
}

Expand All @@ -140,53 +134,62 @@ func (i *PeerAddress) Set(value string) error {
}

func main() {
zerolog.SetGlobalLevel(zerolog.TraceLevel)
zerolog.SetGlobalLevel(zerolog.InfoLevel)
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
log.Debug().Msg(strings.Join(os.Args, ","))
bindAddrPort := flag.String("bind", "", "Bind address and port in x.x.x.x:port format")
mntDir := flag.String("mnt", "", "Directory to mount the net filesystem on")
srcDir := flag.String("src", "", "Directory to serve files from")
flag.Var(&PeerNodes, "peer", "Peer addresses and ports in x.x.x.x:port format, has to specified for each peer like so: -peer x.x.x.x:port -peer x.x.x.x:port ...")
send := flag.Bool("send", false, "Serve files from the src directory")
receive := flag.Bool("receive", false, "Receive files and mount the net filesystem on the mnt directory")
rateLimit := flag.Int("rate", 1000, "rate limit in Mbit/s")
statsdAddrPort := flag.String("statsd", "", "Statsd server address and port in x.x.x.x:port format")

rateLimit := flag.Int("rate", 1000, "Rate limit in Mbit/s")
allowOther := flag.Bool("allow-other", true, "Allow other users to access the mount")
statsdAddrPort := flag.String("statsd", "", "Statsd fileserver address and port in x.x.x.x:port format")
flag.Parse()
log.Debug().Msg("peers: " + strings.Join(PeerNodes, ", "))
log.Debug().Msg("bind: " + *bindAddrPort)

fclient := fileretriever.NewFileClient(*srcDir, PeerNodes, *statsdAddrPort)
log.Info().Msg("peers: " + strings.Join(PeerNodes, ", "))
log.Info().Msg("bind: " + *bindAddrPort)

if !*send && !*receive {
log.Fatal().Msg("Must specify either send or receive or both")
}

localClient := failcache.NewFailCache(localclient.NewLocalclient(*srcDir))
netClient := cacheclient.NewCacheClient(netclient.NewNetClient(*statsdAddrPort, PeerNodes))
client := fsclient.NewFileClient(localClient, netClient)
if *send {
fserver := fileretriever.NewFileServer(*srcDir, *bindAddrPort, fclient, *rateLimit)
fserver := fileserver.NewFileServer(*srcDir, *bindAddrPort, localClient, *rateLimit, *statsdAddrPort)
go fserver.Serve()
}

if *receive {
os.Mkdir(*mntDir, 0755)
root := &VirtNode{
Inode: fusefs.Inode{},
sem: semaphore.NewWeighted(MAX_CONCURRENCY),
path: "",
fc: fclient,
}
server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{
MountOptions: fuse.MountOptions{
Debug: false,
AllowOther: true,
FsName: "chrislfs",
},
})
if err != nil {
log.Debug().Err(err).Msg("Failed to mount")
}
log.Printf("Mounted on %s", *mntDir)
log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir)
// Wait until unmount before exiting
server.Wait()
go func() {
err := os.Mkdir(*mntDir, 0755)
if err != nil {
log.Warn().Err(err).Msg("Failed to create mount directory")
}
root := &VirtNode{
Inode: fusefs.Inode{},
sem: semaphore.NewWeighted(MAX_CONCURRENCY),
path: "",
fc: client,
}
server, err := fusefs.Mount(*mntDir, root, &fusefs.Options{
MountOptions: fuse.MountOptions{
Debug: false,
AllowOther: *allowOther,
FsName: "chrislfs",
},
})
if err != nil {
log.Debug().Err(err).Msg("Failed to mount")
}
log.Printf("Mounted on %s", *mntDir)
log.Printf("Unmount by calling 'fusermount -u %s'", *mntDir)
// Wait until unmount before exiting
server.Wait()
}()
}
//block forever
select {}
}
Loading

0 comments on commit b19e082

Please sign in to comment.