Skip to content

Commit

Permalink
unawareness prefetch implementation on snapshotter side
Browse files Browse the repository at this point in the history
1. send post request to http server
2. store prefetchlist
3. add prefetchlist in nydusd
  • Loading branch information
billie60 committed Jul 6, 2024
1 parent 0d93ad0 commit 8779993
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 63 deletions.
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ type SystemControllerConfig struct {
DebugConfig DebugConfig `toml:"debug"`
}

type PrefetchControllerConfig struct {
Enable bool `toml:"enable"`
PrefetchConfig string `toml:"distribution_pull_endpoint"`
}

type SnapshotterConfig struct {
// Configuration format version
Version int `toml:"version"`
Expand All @@ -229,6 +234,8 @@ type SnapshotterConfig struct {
LoggingConfig LoggingConfig `toml:"log"`
CgroupConfig CgroupConfig `toml:"cgroup"`
Experimental Experimental `toml:"experimental"`
// Get prefetch list from http server
PrefetchControllerConfig PrefetchControllerConfig `toml:"prefetch"`
}

func LoadSnapshotterConfig(path string) (*SnapshotterConfig, error) {
Expand Down
14 changes: 14 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type GlobalConfig struct {
DaemonThreadsNum int
CacheGCPeriod time.Duration
MirrorsConfig MirrorsConfig
PrefetchRoot string
}

func IsFusedevSharedModeEnabled() bool {
Expand All @@ -64,6 +65,18 @@ func GetConfigRoot() string {
return globalConfig.ConfigRoot
}

func GetPrefetchRoot() string {
return globalConfig.PrefetchRoot
}

func GetPrefetchEndpoint() string {
return globalConfig.origin.PrefetchControllerConfig.PrefetchConfig
}

func IsPrefetchEnabled() bool {
return globalConfig.origin.PrefetchControllerConfig.Enable
}

func GetMirrorsConfigDir() string {
return globalConfig.MirrorsConfig.Dir
}
Expand Down Expand Up @@ -181,6 +194,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")
globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch")

globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig

Expand Down
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ require (
)

require (
<<<<<<< HEAD
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.1 // indirect
=======
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20221215162035-5330a85ea652 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/Microsoft/hcsshim v0.10.0-rc.7 // indirect
>>>>>>> 5977f28 (unawareness prefetch implementation on snapshotter side)
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.24 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.30 // indirect
Expand Down Expand Up @@ -111,6 +117,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -132,6 +139,8 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vbatts/tar-split v0.11.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions misc/snapshotter/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ daemon_mode = "dedicated"
# Whether snapshotter should try to clean up resources when it is closed
cleanup_on_close = false

[prefetch]
enable = false
get_prefetch_endpoint = "http://localhost:1323/api/v1/prefetch/download"

[system]
# Snapshotter's debug and trace HTTP server interface
enable = true
Expand Down
15 changes: 15 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/internal/constant"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

// Build runtime nydusd daemon object, which might be persisted later
Expand All @@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt {
}
}

func WithPrefetchDir(dir, imageID string) NewDaemonOpt {
return func(d *Daemon) error {
s := filepath.Join(dir, d.ID())
prefetchDir, err := prefetch.GetPrefetchList(s, imageID)
if err != nil {
return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s)
}
if prefetchDir != "" {
d.States.PrefetchDir = prefetchDir
}
return nil
}
}

func WithRef(ref int32) NewDaemonOpt {
return func(d *Daemon) error {
d.ref = ref
Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type ConfigState struct {
SupervisorPath string
ThreadNum int
// Where the configuration file resides, all rafs instances share the same configuration template
ConfigDir string
ConfigDir string
PrefetchDir string
}

// TODO: Record queried nydusd state
Expand Down
10 changes: 7 additions & 3 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
if err != nil {
return err
}
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0)
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID)
// if daemon already exists for snapshotID, just return
if err != nil && !errdefs.IsAlreadyExists(err) {
return err
Expand Down Expand Up @@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver)
}

d, err := fs.createDaemon(fsManager, daemonMode, mp, 0)
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "")
if err != nil {
return errors.Wrap(err, "initialize shared daemon")
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {

// createDaemon create new nydus daemon by snapshotID and imageID
func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode,
mountpoint string, ref int32) (d *daemon.Daemon, err error) {
mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) {
opts := []daemon.NewDaemonOpt{
daemon.WithRef(ref),
daemon.WithSocketDir(config.GetSocketRoot()),
Expand All @@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
opts = append(opts, daemon.WithMountpoint(mountpoint))
}

if imageID != "" {
opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID))
}

d, err = daemon.NewDaemon(opts...)
if err != nil {
return nil, errors.Wrapf(err, "new daemon")
Expand Down
12 changes: 2 additions & 10 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

const endpointGetBackend string = "/api/v1/daemons/%s/backend"
Expand Down Expand Up @@ -122,7 +121,6 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
// Build commandline according to nydusd daemon configuration.
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
var cmdOpts []command.Opt
var imageReference string

nydusdThreadNum := d.NydusdThreadNum()

Expand All @@ -148,8 +146,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}

imageReference = rafs.ImageID

bootstrap, err := rafs.BootstrapFile()
if err != nil {
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
Expand All @@ -176,12 +172,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
command.WithID(d.ID()))
}

if imageReference != "" {
prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference)
if prefetchfiles != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles))
prefetch.Pm.DeleteFromPrefetchMap(imageReference)
}
if d.States.PrefetchDir != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir))
}

cmdOpts = append(cmdOpts,
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) {
resource := []string{d.States.ConfigDir, d.States.LogDir}
if !d.IsSharedDaemon() {
socketDir := path.Dir(d.GetAPISock())
if d.States.PrefetchDir != "" {
prefetchDir := path.Dir(d.States.PrefetchDir)
resource = append(resource, prefetchDir)
}
resource = append(resource, socketDir)
}

Expand Down
115 changes: 84 additions & 31 deletions pkg/prefetch/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,106 @@ package prefetch

import (
"encoding/json"
"sync"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"

<<<<<<< HEAD
"github.com/containerd/log"
=======
"github.com/containerd/containerd/log"
"github.com/pkg/errors"

"github.com/containerd/nydus-snapshotter/config"
>>>>>>> 5977f28 (unawareness prefetch implementation on snapshotter side)
)

type prefetchInfo struct {
prefetchMap map[string]string
prefetchMutex sync.Mutex
type prefetchlist struct {
FilePaths []string `json:"files"`
}

var Pm prefetchInfo
func GetPrefetchList(prefetchDir, imageRepo string) (string, error) {
if config.IsPrefetchEnabled() {
url := config.GetPrefetchEndpoint()
getURL := fmt.Sprintf("%s?imageName=%s", url, imageRepo)

resp, err := http.Get(getURL)
if err != nil {
return "", err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
return "", fmt.Errorf("get from server returned a non-OK status code: %d, HTTP Status Error", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

func (p *prefetchInfo) SetPrefetchFiles(body []byte) error {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
if strings.Contains(string(body), "CacheItem not found") {
log.L.Infof("Cache item not found for image: %s\n", imageRepo)
return "", nil
}

prefetchfilePath, err := storePrefetchList(prefetchDir, body)
if err != nil {
return "", err
}
return prefetchfilePath, nil
}
return "", nil
}

var prefetchMsg []map[string]string
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
return err
func storePrefetchList(prefetchDir string, list []byte) (string, error) {
if err := os.MkdirAll(prefetchDir, 0755); err != nil {
return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir)
}

if p.prefetchMap == nil {
p.prefetchMap = make(map[string]string)
filePath := filepath.Join(prefetchDir, "list")
jsonfilePath := filepath.Join(prefetchDir, "list.json")

file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
fmt.Println("Error opening file:", err)
return "", errors.Wrap(err, "error opening prefetch file")
}
for _, item := range prefetchMsg {
image := item["image"]
prefetchfiles := item["prefetch"]
p.prefetchMap[image] = prefetchfiles
defer file.Close()

var prefetchSlice []string
err = json.Unmarshal(list, &prefetchSlice)
if err != nil {
return "", errors.Wrap(err, "failed to parse prefetch list")
}

log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap)
return nil
}
for _, path := range prefetchSlice {
content := path + "\n"
_, err := file.WriteString(content)
if err != nil {
return "", errors.Wrap(err, "error writing to prefetch file")
}
}

func (p *prefetchInfo) GetPrefetchInfo(image string) string {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
prefetchStruct := prefetchlist{FilePaths: prefetchSlice}
jsonByte, err := json.Marshal(prefetchStruct)
if err != nil {
return "", errors.Wrap(err, "failed to marshal to JSON")
}

if prefetchfiles, ok := p.prefetchMap[image]; ok {
return prefetchfiles
jsonfile, err := os.Create(jsonfilePath)
if err != nil {
return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath)
}
return ""
}
defer jsonfile.Close()

func (p *prefetchInfo) DeleteFromPrefetchMap(image string) {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
_, err = jsonfile.Write(jsonByte)
if err != nil {
return "", errors.Wrap(err, "error writing JSON to file")
}

delete(p.prefetchMap, image)
return filePath, nil
}
Loading

0 comments on commit 8779993

Please sign in to comment.