From 0d93ad02ff03f836a239423d452d30c2cbfdf252 Mon Sep 17 00:00:00 2001 From: Guangyu Xu Date: Fri, 5 Jan 2024 17:47:01 +0000 Subject: [PATCH] unawareness runtime prefetch implementation on optimizer side 1. add prefetchlist store prefetchlist storage service. 2. Modify the optimizer to publish the access file list as a prefetchlist to the storage service when obtaining it. 3. modify http server add lru algo 4. use echo web framework 5. modify based on comments 6. get and post prefetchlist in optimizer 7. separate fanotify server from optimizer --- cmd/optimizer-nri-plugin/main.go | 148 +++++-------- go.mod | 15 +- go.sum | 37 ++-- misc/example/optimizer-nri-plugin.conf | 8 +- pkg/fanotify/fanotify.go | 156 ------------- pkg/{ => optimizer}/fanotify/conn/conn.go | 0 pkg/optimizer/fanotify/fanotify.go | 255 ++++++++++++++++++++++ pkg/optimizer/optimizer.go | 114 ++++++++++ tools/prefetch-distribution/main.go | 151 +++++++++++++ 9 files changed, 612 insertions(+), 272 deletions(-) delete mode 100644 pkg/fanotify/fanotify.go rename pkg/{ => optimizer}/fanotify/conn/conn.go (100%) create mode 100644 pkg/optimizer/fanotify/fanotify.go create mode 100644 pkg/optimizer/optimizer.go create mode 100644 tools/prefetch-distribution/main.go diff --git a/cmd/optimizer-nri-plugin/main.go b/cmd/optimizer-nri-plugin/main.go index b3e6646da9..64c905f49e 100644 --- a/cmd/optimizer-nri-plugin/main.go +++ b/cmd/optimizer-nri-plugin/main.go @@ -8,23 +8,19 @@ package main import ( "context" - "fmt" "io" "log/syslog" "os" - "path/filepath" "strings" - "time" - "github.com/containerd/log" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" - "github.com/containerd/containerd/reference/docker" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" "github.com/containerd/nydus-snapshotter/pkg/errdefs" - "github.com/containerd/nydus-snapshotter/pkg/fanotify" + "github.com/containerd/nydus-snapshotter/pkg/optimizer" "github.com/containerd/nydus-snapshotter/version" "github.com/pelletier/go-toml" ) @@ -33,23 +29,18 @@ const ( defaultEvents = "StartContainer,StopContainer" defaultServerPath = "/usr/local/bin/optimizer-server" defaultPersistDir = "/opt/nri/optimizer/results" + defaultServerType = optimizer.FANOTIFY ) type PluginConfig struct { - Events []string `toml:"events"` - - ServerPath string `toml:"server_path"` - PersistDir string `toml:"persist_dir"` - Readable bool `toml:"readable"` - Timeout int `toml:"timeout"` - Overwrite bool `toml:"overwrite"` + Events []string + optimizerCfg optimizer.Config } type PluginArgs struct { - PluginName string - PluginIdx string - PluginEvents string - Config PluginConfig + PluginName string + PluginIdx string + Config PluginConfig } type Flags struct { @@ -70,39 +61,44 @@ func buildFlags(args *PluginArgs) []cli.Flag { Destination: &args.PluginIdx, }, &cli.StringFlag{ - Name: "events", - Value: defaultEvents, - Usage: "the events that containerd subscribes to. DO NOT CHANGE THIS.", - Destination: &args.PluginEvents, + Name: "server-type", + Value: defaultServerType, + Usage: "the type of optimizer, available value includes [\"fanotify\"]", + Destination: &args.Config.optimizerCfg.ServerType, }, &cli.StringFlag{ Name: "server-path", Value: defaultServerPath, Usage: "the path of optimizer server binary", - Destination: &args.Config.ServerPath, + Destination: &args.Config.optimizerCfg.ServerPath, }, &cli.StringFlag{ Name: "persist-dir", Value: defaultPersistDir, Usage: "the directory to persist accessed files list for container", - Destination: &args.Config.PersistDir, + Destination: &args.Config.optimizerCfg.PersistDir, }, &cli.BoolFlag{ Name: "readable", Value: false, Usage: "whether to make the csv file human readable", - Destination: &args.Config.Readable, + Destination: &args.Config.optimizerCfg.Readable, }, &cli.IntFlag{ Name: "timeout", Value: 0, Usage: "the timeout to kill optimizer server, 0 to disable it", - Destination: &args.Config.Timeout, + Destination: &args.Config.optimizerCfg.Timeout, }, &cli.BoolFlag{ Name: "overwrite", Usage: "whether to overwrite the existed persistent files", - Destination: &args.Config.Overwrite, + Destination: &args.Config.optimizerCfg.Overwrite, + }, + &cli.StringFlag{ + Name: "prefetch-distribution-url", + Usage: "The service url of prefetch distribution, for example: http://localhost:1323", + Destination: &args.Config.optimizerCfg.PrefetchDistributionURL, }, } } @@ -121,21 +117,15 @@ type plugin struct { } var ( - cfg PluginConfig - logWriter *syslog.Writer - globalFanotifyServer = make(map[string]*fanotify.Server) - - _ = stub.ConfigureInterface(&plugin{}) - _ = stub.StartContainerInterface(&plugin{}) - _ = stub.StopContainerInterface(&plugin{}) -) - -const ( - imageNameLabel = "io.kubernetes.cri.image-name" + cfg PluginConfig + log *logrus.Logger + logWriter *syslog.Writer + _ = stub.ConfigureInterface(&plugin{}) + globalServer = make(map[string]optimizer.Server) ) -func (p *plugin) Configure(ctx context.Context, config, runtime, version string) (stub.EventMask, error) { - log.G(ctx).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) +func (p *plugin) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { + log.Infof("got configuration data: %q from runtime %s %s", config, runtime, version) if config == "" { return p.mask, nil } @@ -144,7 +134,7 @@ func (p *plugin) Configure(ctx context.Context, config, runtime, version string) if err != nil { return 0, errors.Wrap(err, "parse TOML") } - if err := tree.Unmarshal(&cfg); err != nil { + if err := tree.Unmarshal(&cfg.optimizerCfg); err != nil { return 0, err } @@ -153,74 +143,48 @@ func (p *plugin) Configure(ctx context.Context, config, runtime, version string) return 0, errors.Wrap(err, "parse events in configuration") } - log.G(ctx).Infof("configuration: %#v", cfg) + log.Infof("configuration: %#v", cfg) return p.mask, nil } -func (p *plugin) StartContainer(_ context.Context, _ *api.PodSandbox, container *api.Container) error { - dir, imageName, err := GetImageName(container.Annotations) +func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) error { + imageName, server, err := optimizer.NewServer(cfg.optimizerCfg, container, logWriter) if err != nil { return err } - persistDir := filepath.Join(cfg.PersistDir, dir) - if err := os.MkdirAll(persistDir, os.ModePerm); err != nil { - return err - } - - persistFile := filepath.Join(persistDir, imageName) - if cfg.Timeout > 0 { - persistFile = fmt.Sprintf("%s.timeout%ds", persistFile, cfg.Timeout) + if server == nil { + return nil } - fanotifyServer := fanotify.NewServer(cfg.ServerPath, container.Pid, imageName, persistFile, cfg.Readable, cfg.Overwrite, time.Duration(cfg.Timeout)*time.Second, logWriter) - - if err := fanotifyServer.RunServer(); err != nil { + if err := server.Start(); err != nil { return err } - globalFanotifyServer[imageName] = fanotifyServer + globalServer[imageName] = server return nil } -func (p *plugin) StopContainer(_ context.Context, _ *api.PodSandbox, container *api.Container) ([]*api.ContainerUpdate, error) { +func (p *plugin) StopContainer(_ *api.PodSandbox, container *api.Container) ([]*api.ContainerUpdate, error) { var update = []*api.ContainerUpdate{} - _, imageName, err := GetImageName(container.Annotations) + _, imageName, _, err := optimizer.GetImageName(container.Annotations) if err != nil { return update, err } - if fanotifyServer, ok := globalFanotifyServer[imageName]; ok { - fanotifyServer.StopServer() - } else { - return nil, errors.New("can not find fanotify server for container image " + imageName) - } - - return update, nil -} -func GetImageName(annotations map[string]string) (string, string, error) { - named, err := docker.ParseDockerRef(annotations[imageNameLabel]) - if err != nil { - return "", "", err + if server, ok := globalServer[imageName]; ok { + server.Stop() } - nameTagged := named.(docker.NamedTagged) - repo := docker.Path(nameTagged) - - dir := filepath.Dir(repo) - image := filepath.Base(repo) - - imageName := image + ":" + nameTagged.Tag() - return dir, imageName, nil + return update, nil } func (p *plugin) onClose() { - for _, fanotifyServer := range globalFanotifyServer { - fanotifyServer.StopServer() + for _, server := range globalServer { + server.Stop() } - os.Exit(0) } func main() { @@ -239,13 +203,13 @@ func main() { cfg = flags.Args.Config - // FIXME(thaJeztah): ucontainerd's log does not set "PadLevelText: true" - _ = log.SetFormat(log.TextFormat) - ctx := log.WithLogger(context.Background(), log.L) - + log = logrus.StandardLogger() + log.SetFormatter(&logrus.TextFormatter{ + PadLevelText: true, + }) logWriter, err = syslog.New(syslog.LOG_INFO, "optimizer-nri-plugin") if err == nil { - log.G(ctx).Logger.SetOutput(io.MultiWriter(os.Stdout, logWriter)) + log.SetOutput(io.MultiWriter(os.Stdout, logWriter)) } if flags.Args.PluginName != "" { @@ -257,18 +221,18 @@ func main() { p := &plugin{} - if p.mask, err = api.ParseEventMask(flags.Args.PluginEvents); err != nil { - log.G(ctx).Fatalf("failed to parse events: %v", err) + if p.mask, err = api.ParseEventMask(defaultEvents); err != nil { + log.Fatalf("failed to parse events: %v", err) } - cfg.Events = strings.Split(flags.Args.PluginEvents, ",") + cfg.Events = strings.Split(defaultEvents, ",") if p.stub, err = stub.New(p, append(opts, stub.WithOnClose(p.onClose))...); err != nil { - log.G(ctx).Fatalf("failed to create plugin stub: %v", err) + log.Fatalf("failed to create plugin stub: %v", err) } err = p.stub.Run(context.Background()) if err != nil { - log.G(ctx).Errorf("plugin exited with error %v", err) + log.Errorf("plugin exited with error %v", err) os.Exit(1) } @@ -277,9 +241,9 @@ func main() { } if err := app.Run(os.Args); err != nil { if errdefs.IsConnectionClosed(err) { - log.L.Info("optimizer NRI plugin exited") + log.Info("optimizer NRI plugin exited") } else { - log.L.WithError(err).Fatal("failed to start optimizer NRI plugin") + log.WithError(err).Fatal("failed to start optimizer NRI plugin") } } } diff --git a/go.mod b/go.mod index 0f78f78f20..4212fa62f8 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.2 github.com/imdario/mergo v0.3.13 github.com/klauspost/compress v1.16.0 + github.com/labstack/echo/v4 v4.11.4 github.com/moby/locker v1.0.1 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/opencontainers/go-digest v1.0.0 @@ -43,9 +44,9 @@ require ( github.com/urfave/cli/v2 v2.25.0 go.etcd.io/bbolt v1.3.7 golang.org/x/exp v0.0.0-20231006140011-7918f672742d - golang.org/x/net v0.17.0 + golang.org/x/net v0.19.0 golang.org/x/sync v0.4.0 - golang.org/x/sys v0.13.0 + golang.org/x/sys v0.15.0 google.golang.org/grpc v1.59.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gotest.tools v2.2.0+incompatible @@ -112,7 +113,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/pkcs11 v1.1.1 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -137,12 +138,12 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.14.0 // indirect go.opentelemetry.io/otel/trace v1.14.0 // indirect - golang.org/x/crypto v0.14.0 // indirect + golang.org/x/crypto v0.17.0 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/oauth2 v0.11.0 // indirect - golang.org/x/term v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect + golang.org/x/term v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a // indirect diff --git a/go.sum b/go.sum index 86f13d9a07..5367e27596 100644 --- a/go.sum +++ b/go.sum @@ -237,6 +237,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/labstack/echo/v4 v4.11.4 h1:vDZmA+qNeh1pd/cCkEicDMrjtrnMGQ1QFI9gWN1zGq8= +github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8= +github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= +github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= @@ -245,8 +249,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= -github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU= @@ -341,6 +345,10 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.25.0 h1:ykdZKuQey2zq0yin/l7JOm9Mh+pg72ngYMeB0ABn6q8= github.com/urfave/cli/v2 v2.25.0/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= +github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlIME= github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= @@ -368,8 +376,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= @@ -396,8 +404,8 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= @@ -433,20 +441,21 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/misc/example/optimizer-nri-plugin.conf b/misc/example/optimizer-nri-plugin.conf index 7b6a561eb1..47530cc6b0 100644 --- a/misc/example/optimizer-nri-plugin.conf +++ b/misc/example/optimizer-nri-plugin.conf @@ -1,13 +1,15 @@ +# The type of optimizer, available value includes ["fanotify"] +server_type = "fanotify" # The directory to persist accessed files list for container. persist_dir = "/opt/nri/optimizer/results" # Whether to make the csv file human readable. readable = false # The path of optimizer server binary. +# Only used for fanotify optimizer. server_path = "/usr/local/bin/optimizer-server" # The timeout to kill optimizer server, 0 to disable it. timeout = 0 # Whether to overwrite the existed persistent files. overwrite = false -# The events that containerd subscribes to. -# Do not change this element. -events = [ "StartContainer", "StopContainer" ] \ No newline at end of file +# The service url of prefetch distribution. +prefetch_distribution_url = "http://localhost:1323" diff --git a/pkg/fanotify/fanotify.go b/pkg/fanotify/fanotify.go deleted file mode 100644 index 7d272cea42..0000000000 --- a/pkg/fanotify/fanotify.go +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright (c) 2023. Nydus Developers. All rights reserved. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -package fanotify - -import ( - "bufio" - "encoding/csv" - "fmt" - "io" - "log/syslog" - "os" - "os/exec" - "syscall" - "time" - - "github.com/containerd/nydus-snapshotter/pkg/fanotify/conn" - "github.com/containerd/nydus-snapshotter/pkg/utils/display" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type Server struct { - BinaryPath string - ContainerPid uint32 - ImageName string - PersistFile string - Readable bool - Overwrite bool - Timeout time.Duration - Client *conn.Client - Cmd *exec.Cmd - LogWriter *syslog.Writer -} - -func NewServer(binaryPath string, containerPid uint32, imageName string, persistFile string, readable bool, overwrite bool, timeout time.Duration, logWriter *syslog.Writer) *Server { - return &Server{ - BinaryPath: binaryPath, - ContainerPid: containerPid, - ImageName: imageName, - PersistFile: persistFile, - Readable: readable, - Overwrite: overwrite, - Timeout: timeout, - LogWriter: logWriter, - } -} - -func (fserver *Server) RunServer() error { - if !fserver.Overwrite { - if file, err := os.Stat(fserver.PersistFile); err == nil && !file.IsDir() { - return nil - } - } - - cmd := exec.Command(fserver.BinaryPath) - cmd.SysProcAttr = &syscall.SysProcAttr{ - Cloneflags: syscall.CLONE_NEWNS, - Setpgid: true, - } - cmd.Env = append(cmd.Env, "_MNTNS_PID="+fmt.Sprint(fserver.ContainerPid)) - cmd.Env = append(cmd.Env, "_TARGET=/") - cmd.Stderr = fserver.LogWriter - - notifyR, err := cmd.StdoutPipe() - if err != nil { - return err - } - fserver.Client = &conn.Client{ - Reader: bufio.NewReader(notifyR), - } - - if err := cmd.Start(); err != nil { - return err - } - fserver.Cmd = cmd - - go func() { - if err := fserver.RunReceiver(); err != nil { - logrus.WithError(err).Errorf("Failed to receive event information from server") - } - }() - - if fserver.Timeout > 0 { - go func() { - time.Sleep(fserver.Timeout) - fserver.StopServer() - }() - } - - return nil -} - -func (fserver *Server) RunReceiver() error { - f, err := os.OpenFile(fserver.PersistFile, os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return errors.Wrapf(err, "failed to open file %q", fserver.PersistFile) - } - defer f.Close() - - persistCsvFile := fmt.Sprintf("%s.csv", fserver.PersistFile) - fCsv, err := os.Create(persistCsvFile) - if err != nil { - return errors.Wrapf(err, "failed to create file %q", persistCsvFile) - } - defer fCsv.Close() - - csvWriter := csv.NewWriter(fCsv) - if err := csvWriter.Write([]string{"path", "size", "elapsed"}); err != nil { - return errors.Wrapf(err, "failed to write csv header") - } - csvWriter.Flush() - - for { - eventInfo, err := fserver.Client.GetEventInfo() - if err != nil { - if err == io.EOF { - logrus.Infoln("Get EOF from fanotify server, break event receiver") - break - } - return fmt.Errorf("failed to get event information: %v", err) - } - - if eventInfo != nil { - fmt.Fprintln(f, eventInfo.Path) - - var line []string - if fserver.Readable { - line = []string{eventInfo.Path, display.ByteToReadableIEC(eventInfo.Size), display.MicroSecondToReadable(eventInfo.Elapsed)} - } else { - line = []string{eventInfo.Path, fmt.Sprint(eventInfo.Size), fmt.Sprint(eventInfo.Elapsed)} - } - if err := csvWriter.Write(line); err != nil { - return errors.Wrapf(err, "failed to write csv") - } - csvWriter.Flush() - } - } - - return nil -} - -func (fserver *Server) StopServer() { - if fserver.Cmd != nil { - logrus.Infof("Send SIGTERM signal to process group %d", fserver.Cmd.Process.Pid) - if err := syscall.Kill(-fserver.Cmd.Process.Pid, syscall.SIGTERM); err != nil { - logrus.WithError(err).Errorf("Stop process group %d failed!", fserver.Cmd.Process.Pid) - } - if _, err := fserver.Cmd.Process.Wait(); err != nil { - logrus.WithError(err).Errorf("Failed to wait for fanotify server") - } - } -} diff --git a/pkg/fanotify/conn/conn.go b/pkg/optimizer/fanotify/conn/conn.go similarity index 100% rename from pkg/fanotify/conn/conn.go rename to pkg/optimizer/fanotify/conn/conn.go diff --git a/pkg/optimizer/fanotify/fanotify.go b/pkg/optimizer/fanotify/fanotify.go new file mode 100644 index 0000000000..6d3fb8863e --- /dev/null +++ b/pkg/optimizer/fanotify/fanotify.go @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package fanotify + +import ( + "bufio" + "bytes" + "encoding/csv" + "encoding/json" + "fmt" + "io" + "log/syslog" + "net/http" + "os" + "os/exec" + "strings" + "sync" + "syscall" + "time" + + "github.com/containerd/nydus-snapshotter/pkg/optimizer/fanotify/conn" + "github.com/containerd/nydus-snapshotter/pkg/utils/display" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type Server struct { + BinaryPath string + ContainerPid uint32 + ImageName string + PersistFile *os.File + PersistCSVFile *os.File + Readable bool + Overwrite bool + Timeout time.Duration + Client *conn.Client + Cmd *exec.Cmd + LogWriter *syslog.Writer + ContainerName string + ImageRepo string + IsSent bool + PrefetchPath string + PrefetchDistributionEndpoint string + Mutex sync.Mutex +} + +func NewServer(binaryPath string, containerPid uint32, imageName string, file *os.File, csvFile *os.File, readable bool, overwrite bool, timeout time.Duration, logWriter *syslog.Writer, containerName string, imageRepo string, hasSentPrefetchList bool, prefetchPath string, prefetchDistributionEndpoint string) *Server { + return &Server{ + BinaryPath: binaryPath, + ContainerPid: containerPid, + ImageName: imageName, + PersistFile: file, + PersistCSVFile: csvFile, + Readable: readable, + Overwrite: overwrite, + Timeout: timeout, + LogWriter: logWriter, + ContainerName: containerName, + ImageRepo: imageRepo, + IsSent: hasSentPrefetchList, + PrefetchPath: prefetchPath, + PrefetchDistributionEndpoint: prefetchDistributionEndpoint, + } +} + +func (fserver *Server) Start() error { + cmd := exec.Command(fserver.BinaryPath) + cmd.SysProcAttr = &syscall.SysProcAttr{ + Cloneflags: syscall.CLONE_NEWNS, + Setpgid: true, + } + cmd.Env = append(cmd.Env, "_MNTNS_PID="+fmt.Sprint(fserver.ContainerPid)) + cmd.Env = append(cmd.Env, "_TARGET=/") + cmd.Stderr = fserver.LogWriter + + notifyR, err := cmd.StdoutPipe() + if err != nil { + return err + } + fserver.Client = &conn.Client{ + Reader: bufio.NewReader(notifyR), + } + + if err := cmd.Start(); err != nil { + return err + } + fserver.Cmd = cmd + + go func() { + if err := fserver.Receive(); err != nil { + logrus.WithError(err).Errorf("Failed to receive event information from server") + } + }() + + go func() { + time.Sleep(10 * time.Minute) + fserver.Mutex.Lock() + if !fserver.IsSent { + data, err := getPrefetchListfromLocal(fserver.PrefetchPath) + if err != nil { + logrus.WithError(err).Error("error reading file") + } + if err = sendToServer(fserver.ImageRepo, fserver.ContainerName, fserver.PrefetchDistributionEndpoint, data); err != nil { + logrus.WithError(err).Error("failed to send prefetch to http server") + } + fserver.IsSent = true + } + fserver.Mutex.Unlock() + }() + + if fserver.Timeout > 0 { + go func() { + time.Sleep(fserver.Timeout) + fserver.Stop() + }() + } + + return nil +} + +func (fserver *Server) Receive() error { + defer fserver.PersistFile.Close() + defer fserver.PersistCSVFile.Close() + + csvWriter := csv.NewWriter(fserver.PersistCSVFile) + if err := csvWriter.Write([]string{"path", "size", "elapsed"}); err != nil { + return errors.Wrapf(err, "failed to write csv header") + } + csvWriter.Flush() + + for { + eventInfo, err := fserver.Client.GetEventInfo() + if err != nil { + if err == io.EOF { + logrus.Infoln("Get EOF from fanotify server, break event receiver") + break + } + return fmt.Errorf("failed to get event information: %v", err) + } + + if eventInfo != nil { + fmt.Fprintln(fserver.PersistFile, eventInfo.Path) + + var line []string + if fserver.Readable { + line = []string{eventInfo.Path, display.ByteToReadableIEC(eventInfo.Size), display.MicroSecondToReadable(eventInfo.Elapsed)} + } else { + line = []string{eventInfo.Path, fmt.Sprint(eventInfo.Size), fmt.Sprint(eventInfo.Elapsed)} + } + if err := csvWriter.Write(line); err != nil { + return errors.Wrapf(err, "failed to write csv") + } + csvWriter.Flush() + } + } + + return nil +} + +func (fserver *Server) Stop() { + fserver.Mutex.Lock() + if !fserver.IsSent { + data, err := getPrefetchListfromLocal(fserver.PrefetchPath) + if err != nil { + logrus.WithError(err).Errorf("failed to read prefetch files from local") + } + if err = sendToServer(fserver.ImageRepo, fserver.ContainerName, fserver.PrefetchDistributionEndpoint, data); err != nil { + logrus.WithError(err).Errorf("failed to send prefetch list to http server") + } + fserver.IsSent = true + } + fserver.Mutex.Unlock() + + if fserver.Cmd != nil { + logrus.Infof("Send SIGTERM signal to process group %d", fserver.Cmd.Process.Pid) + if err := syscall.Kill(-fserver.Cmd.Process.Pid, syscall.SIGTERM); err != nil { + logrus.WithError(err).Errorf("Stop process group %d failed!", fserver.Cmd.Process.Pid) + } + if _, err := fserver.Cmd.Process.Wait(); err != nil { + logrus.WithError(err).Errorf("Failed to wait for fanotify server") + } + } +} + +type CacheItem struct { + ImageName string + ContainerName string + PrefetchFiles []string +} + +type Cache struct { + Items map[string]*CacheItem +} + +func getPrefetchListfromLocal(prefetchPath string) ([]byte, error) { + data, err := os.ReadFile(prefetchPath) + if err != nil { + return nil, err + } + return data, nil +} + +func sendToServer(imageName, containerName, serverURL string, data []byte) error { + filePaths := strings.Split(string(data), "\n") + + var prefetchFiles []string + for _, path := range filePaths { + if path != "" { + prefetchFiles = append(prefetchFiles, path) + } + } + + item := CacheItem{ + ImageName: imageName, + ContainerName: containerName, + PrefetchFiles: prefetchFiles, + } + + err := postRequest(item, serverURL) + if err != nil { + return errors.Wrap(err, "error uploading to server") + } + + return nil +} + +func postRequest(item CacheItem, endpoint string) error { + data, err := json.Marshal(item) + if err != nil { + return err + } + + resp, err := http.Post(endpoint, "application/json", bytes.NewBuffer(data)) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return errors.Wrap(fmt.Errorf("post to server returned a non-OK status code: %d", resp.StatusCode), "HTTP Status Error") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return errors.Wrap(err, "failed to read response body") + } + + logrus.Info("Server Response:", string(body)) + + return nil +} diff --git a/pkg/optimizer/optimizer.go b/pkg/optimizer/optimizer.go new file mode 100644 index 0000000000..884e86e9ba --- /dev/null +++ b/pkg/optimizer/optimizer.go @@ -0,0 +1,114 @@ +package optimizer + +import ( + "fmt" + "log/syslog" + "os" + "path/filepath" + "time" + + "github.com/containerd/containerd/reference/docker" + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nydus-snapshotter/pkg/optimizer/fanotify" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type Server interface { + Start() error + Stop() +} + +const ( + imageNameLabel = "io.kubernetes.cri.image-name" + defaultPostEndpoint = "/api/v1/prefetch/upload" +) + +const ( + FANOTIFY = "fanotify" +) + +type Config struct { + ServerType string `toml:"server_type"` + ServerPath string `toml:"server_path"` + PersistDir string `toml:"persist_dir"` + Readable bool `toml:"readable"` + Timeout int `toml:"timeout"` + Overwrite bool `toml:"overwrite"` + PrefetchDistributionURL string `toml:"prefetch_distribution_url"` +} + +func GetImageName(annotations map[string]string) (string, string, string, error) { + named, err := docker.ParseDockerRef(annotations[imageNameLabel]) + if err != nil { + return "", "", "", err + } + imageRepo := docker.Named.String(named) + nameTagged := named.(docker.NamedTagged) + repo := docker.Path(nameTagged) + + dir := filepath.Dir(repo) + image := filepath.Base(repo) + + imageName := image + ":" + nameTagged.Tag() + + return dir, imageName, imageRepo, nil +} + +func getPersistPath(cfg Config, dir, imageName string) (string, error) { + persistDir := filepath.Join(cfg.PersistDir, dir) + if err := os.MkdirAll(persistDir, os.ModePerm); err != nil { + return "", err + } + + persistFile := filepath.Join(persistDir, imageName) + if cfg.Timeout > 0 { + persistFile = fmt.Sprintf("%s.timeout%ds", persistFile, cfg.Timeout) + } + + return persistFile, nil +} + +func getPersistFile(persistFile string) (*os.File, *os.File, error) { + f, err := os.OpenFile(persistFile, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to open file %q", persistFile) + } + + persistCsvFile := fmt.Sprintf("%s.csv", persistFile) + fCsv, err := os.Create(persistCsvFile) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create file %q", persistCsvFile) + } + + return f, fCsv, nil +} + +func NewServer(cfg Config, container *api.Container, logWriter *syslog.Writer) (string, Server, error) { + dir, imageName, imageRepo, err := GetImageName(container.Annotations) + if err != nil { + return "", nil, err + } + containerName := container.Name + var hasSentPrefetchList = false + persistPath, err := getPersistPath(cfg, dir, imageName) + if err != nil { + return "", nil, err + } + + prefetchDistributionEndpoint := fmt.Sprintf("%s%s", cfg.PrefetchDistributionURL, defaultPostEndpoint) + + if !cfg.Overwrite { + if file, err := os.Stat(persistPath); err == nil && !file.IsDir() { + return imageName, nil, nil + } + } + + file, csvFile, err := getPersistFile(persistPath) + if err != nil { + return "", nil, err + } + + logrus.Infof("start optimizer server for %s, image: %s, persist file: %s", container.Id, imageName, persistPath) + return imageName, fanotify.NewServer(cfg.ServerPath, container.Pid, imageName, file, csvFile, cfg.Readable, cfg.Overwrite, time.Duration(cfg.Timeout)*time.Second, logWriter, containerName, imageRepo, hasSentPrefetchList, persistPath, prefetchDistributionEndpoint), nil +} diff --git a/tools/prefetch-distribution/main.go b/tools/prefetch-distribution/main.go new file mode 100644 index 0000000000..3a2a3c0e2c --- /dev/null +++ b/tools/prefetch-distribution/main.go @@ -0,0 +1,151 @@ +/* +* Copyright (c) 2023. Nydus Developers. All rights reserved. +* +* SPDX-License-Identifier: Apache-2.0 + */ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + + "github.com/labstack/echo/v4" + "github.com/pkg/errors" +) + +type LRUItem struct { + CacheItem *CacheItem + Prev *LRUItem + Next *LRUItem +} + +type CacheItem struct { + ImageName string + ContainerName string + PrefetchFiles []string +} + +type Cache struct { + Items map[string]*LRUItem + Head *LRUItem + Tail *LRUItem + MaxSize int + mutex sync.Mutex +} + +func (cache *Cache) key(imageName, containerName string) string { + return fmt.Sprintf("%s,%s", imageName, containerName) +} + +func (cache *Cache) Get(imageName string) ([]string, error) { + cache.mutex.Lock() + defer cache.mutex.Unlock() + + var latestItem *LRUItem + + for _, lruItem := range cache.Items { + if lruItem.CacheItem.ImageName == imageName { + latestItem = lruItem + break + } + } + + if latestItem == nil { + return nil, errors.New("item not found in cache") + } + + cache.removeNode(latestItem) + cache.addtoHead(cache.Head, latestItem) + + return latestItem.CacheItem.PrefetchFiles, nil +} + +func (cache *Cache) Set(item CacheItem) { + cache.mutex.Lock() + defer cache.mutex.Unlock() + + key := cache.key(item.ImageName, item.ContainerName) + if lruItem, exists := cache.Items[key]; exists { + cache.removeNode(lruItem) + cache.addtoHead(cache.Head, lruItem) + + lruItem.CacheItem = &item + } else { + newLRUItem := &LRUItem{ + CacheItem: &item, + } + + cache.addtoHead(cache.Head, newLRUItem) + + cache.Items[key] = newLRUItem + + if len(cache.Items) > cache.MaxSize { + tail := cache.Tail.Prev + cache.removeNode(tail) + delete(cache.Items, cache.key(tail.CacheItem.ImageName, tail.CacheItem.ContainerName)) + } + } +} + +func (cache *Cache) removeNode(lruItem *LRUItem) { + lruItem.Prev.Next = lruItem.Next + lruItem.Next.Prev = lruItem.Prev +} + +func (cache *Cache) addtoHead(head, lruItem *LRUItem) { + next := head.Next + head.Next = lruItem + lruItem.Prev = head + lruItem.Next = next + next.Prev = lruItem +} + +var serverCache Cache + +func uploadHandler(c echo.Context) error { + var item CacheItem + body, err := io.ReadAll(c.Request().Body) + if err != nil { + return c.String(http.StatusBadRequest, "Failed to read request body") + } + err = json.Unmarshal(body, &item) + if err != nil { + return c.String(http.StatusBadRequest, "Invalid request payload") + } + + serverCache.Set(item) + return c.String(http.StatusOK, fmt.Sprintf("Uploaded CacheItem for %s, %s successfully", item.ImageName, item.ContainerName)) +} + +func downloadHandler(c echo.Context) error { + imageName := c.QueryParam("imageName") + + item, err := serverCache.Get(imageName) + if err != nil { + return c.String(http.StatusNotFound, "CacheItem not found") + } + + return c.JSON(http.StatusOK, item) + +} + +func main() { + head, tail := &LRUItem{}, &LRUItem{} + head.Next = tail + tail.Prev = head + serverCache = Cache{ + Items: make(map[string]*LRUItem), + Head: head, + Tail: tail, + MaxSize: 1000, + } + + e := echo.New() + e.POST("/api/v1/prefetch/upload", uploadHandler) + e.GET("/api/v1/prefetch", downloadHandler) + + e.Logger.Fatal(e.Start(":1323")) +}