diff --git a/cmd/prefetchfiles-nri-plugin/main.go b/cmd/prefetchfiles-nri-plugin/main.go new file mode 100644 index 0000000000..6e638e4117 --- /dev/null +++ b/cmd/prefetchfiles-nri-plugin/main.go @@ -0,0 +1,204 @@ +/* +* Copyright (c) 2023. Nydus Developers. All rights reserved. +* +* SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/syslog" + "net" + "net/http" + "os" + + "github.com/containerd/nydus-snapshotter/config" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" + + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nri/pkg/stub" + "github.com/containerd/nydus-snapshotter/pkg/errdefs" + "github.com/containerd/nydus-snapshotter/version" +) + +const ( + endpointPrefetch = "/api/v1/prefetch" + defaultEvents = "RunPodSandbox" + defaultSystemControllerAddress = "/run/containerd-nydus/system.sock" + nydusPrefetchAnnotation = "containerd.io/snapshot/nydus-prefetch" + imageAddressAnnotation = "containerd.io/snapshot/nydus-image-address" +) + +type PluginArgs struct { + PluginName string + PluginIdx string + SocketAddress string +} + +type Flags struct { + Args *PluginArgs + Flag []cli.Flag +} + +func buildFlags(args *PluginArgs) []cli.Flag { + return []cli.Flag{ + &cli.StringFlag{ + Name: "name", + Usage: "plugin name to register to NRI", + Destination: &args.PluginName, + }, + &cli.StringFlag{ + Name: "idx", + Usage: "plugin index to register to NRI", + Destination: &args.PluginIdx, + }, + &cli.StringFlag{ + Name: "socket-addr", + Value: defaultSystemControllerAddress, + Usage: "default unix domain socket address", + Destination: &args.SocketAddress, + }, + } +} + +func NewPluginFlags() *Flags { + var args PluginArgs + return &Flags{ + Args: &args, + Flag: buildFlags(&args), + } +} + +type plugin struct { + stub stub.Stub + mask stub.EventMask +} + +var ( + globalSocket string + log *logrus.Logger + logWriter *syslog.Writer +) + +func sendDataOverHTTP(data config.PrefetchMessage, endpoint, sock string) error { + url := fmt.Sprintf("http://unix%s", endpoint) + + body, err := json.Marshal(data) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)) + if err != nil { + return err + } + + client := &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", sock) + }, + }, + } + resp, err := client.Do(req) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to send data, status code: %d", resp.StatusCode) + } + defer resp.Body.Close() + + return nil +} + +func (p *plugin) RunPodSandbox(pod *api.PodSandbox) error { + prefetchList, ok := pod.Annotations[nydusPrefetchAnnotation] + if !ok { + return nil + } + imageAddress, ok := pod.Annotations[imageAddressAnnotation] + if !ok { + return nil + } + msg := config.PrefetchMessage{ + ImageAddress: imageAddress, + PrefetchFiles: prefetchList, + } + + err := sendDataOverHTTP(msg, endpointPrefetch, globalSocket) + if err != nil { + log.Errorf("failed to send data: %v\n", err) + return err + } + return nil +} + +func main() { + + flags := NewPluginFlags() + app := &cli.App{ + Name: "prefetch-nri-plugin", + Usage: "NRI plugin for obtaining and transmitting prefetch files path", + Version: version.Version, + Flags: flags.Flag, + HideVersion: true, + Action: func(c *cli.Context) error { + var ( + opts []stub.Option + err error + ) + + globalSocket = flags.Args.SocketAddress + + log = logrus.StandardLogger() + log.SetFormatter(&logrus.TextFormatter{ + PadLevelText: true, + }) + logWriter, err = syslog.New(syslog.LOG_INFO, "prefetch-nri-plugin") + + if err == nil { + log.SetOutput(io.MultiWriter(os.Stdout, logWriter)) + } + + if flags.Args.PluginName != "" { + opts = append(opts, stub.WithPluginName(flags.Args.PluginName)) + } + if flags.Args.PluginIdx != "" { + opts = append(opts, stub.WithPluginIdx(flags.Args.PluginIdx)) + } + + p := &plugin{} + + if p.mask, err = api.ParseEventMask(defaultEvents); err != nil { + log.Fatalf("failed to parse events: %v", err) + } + + if p.stub, err = stub.New(p, opts...); err != nil { + log.Fatalf("failed to create plugin stub: %v", err) + } + + err = p.stub.Run(context.Background()) + if err != nil { + return errors.Wrap(err, "plugin exited") + } + return nil + }, + } + if err := app.Run(os.Args); err != nil { + + if errdefs.IsConnectionClosed(err) { + log.Info("prefetch NRI plugin exited") + } else { + log.WithError(err).Fatal("failed to start prefetch NRI plugin") + } + } +} diff --git a/config/global.go b/config/global.go index 8c0f1a16ac..9359d82a1b 100644 --- a/config/global.go +++ b/config/global.go @@ -10,6 +10,7 @@ package config import ( + "encoding/json" "os" "path/filepath" "time" @@ -37,6 +38,16 @@ type GlobalConfig struct { DaemonThreadsNum int CacheGCPeriod time.Duration MirrorsConfig MirrorsConfig + /// + PrefetchFiles PrefetchMessage +} +type PrefetchMessage struct { + ImageAddress string `json:"image_address"` + PrefetchFiles string `json:"prefetch_files"` +} + +func GetPrefetchFiles() PrefetchMessage { + return globalConfig.PrefetchFiles } func GetDaemonMode() DaemonMode { @@ -103,6 +114,15 @@ func GetDaemonProfileCPUDuration() int64 { return globalConfig.origin.SystemControllerConfig.DebugConfig.ProfileDuration } +func SetPrefetchFiles(prefetchMsg PrefetchMessage, body []byte) error { + if err := json.Unmarshal(body, &prefetchMsg); err != nil { + return err + } + globalConfig.PrefetchFiles = prefetchMsg + log.L.Infof("received prefetch list from nri plugin: %v ", globalConfig.PrefetchFiles) + return nil +} + func ProcessConfigurations(c *SnapshotterConfig) error { if c.LoggingConfig.LogDir == "" { c.LoggingConfig.LogDir = filepath.Join(c.Root, logging.DefaultLogDirName) diff --git a/pkg/daemon/command/command.go b/pkg/daemon/command/command.go index 0c341a2102..cdcd698d28 100644 --- a/pkg/daemon/command/command.go +++ b/pkg/daemon/command/command.go @@ -26,14 +26,15 @@ type DaemonCommand struct { Upgrade bool `type:"flag" name:"upgrade" default:""` ThreadNum string `type:"param" name:"thread-num"` // `--id` is required by `--supervisor` when starting nydusd - ID string `type:"param" name:"id"` - Config string `type:"param" name:"config"` - Bootstrap string `type:"param" name:"bootstrap"` - Mountpoint string `type:"param" name:"mountpoint"` - APISock string `type:"param" name:"apisock"` - LogLevel string `type:"param" name:"log-level"` - Supervisor string `type:"param" name:"supervisor"` - LogFile string `type:"param" name:"log-file"` + ID string `type:"param" name:"id"` + Config string `type:"param" name:"config"` + Bootstrap string `type:"param" name:"bootstrap"` + Mountpoint string `type:"param" name:"mountpoint"` + APISock string `type:"param" name:"apisock"` + LogLevel string `type:"param" name:"log-level"` + Supervisor string `type:"param" name:"supervisor"` + LogFile string `type:"param" name:"log-file"` + PrefetchFiles string `type:"param" name:"prefetch-files"` } // Build exec style command line @@ -104,6 +105,12 @@ func WithMode(m string) Opt { } } +func WithPrefetchFiles(p string) Opt { + return func(cmd *DaemonCommand) { + cmd.PrefetchFiles = p + } +} + func WithFscacheDriver(w string) Opt { return func(cmd *DaemonCommand) { cmd.FscacheDriver = w diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index ac50b9b28d..20b8969919 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -101,3 +101,10 @@ func WithDaemonMode(daemonMode config.DaemonMode) NewDaemonOpt { return nil } } + +func WithNydusdPrefetchFiles(prefetchList string) NewDaemonOpt { + return func(d *Daemon) error { + d.States.PrefetchFiles = prefetchList + return nil + } +} diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 43c2094a95..cadd030688 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -53,6 +53,7 @@ type States struct { // Where the configuration file resides, all rafs instances share the same configuration template ConfigDir string SupervisorPath string + PrefetchFiles string } // TODO: Record queried nydusd state diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index 9ed3c79dea..2388dbac2c 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -12,6 +12,7 @@ package filesystem import ( "context" + "os" "path" @@ -499,6 +500,10 @@ func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daem opts = append(opts, daemon.WithMountpoint(mountpoint)) } + if config.GetPrefetchFiles().PrefetchFiles != "" { + opts = append(opts, daemon.WithNydusdPrefetchFiles(config.GetPrefetchFiles().PrefetchFiles)) + } + d, err = daemon.NewDaemon(opts...) if err != nil { return nil, errors.Wrapf(err, "new daemon") diff --git a/pkg/manager/daemon_adaptor.go b/pkg/manager/daemon_adaptor.go index d4fd7f1369..0fcf2fe2e6 100644 --- a/pkg/manager/daemon_adaptor.go +++ b/pkg/manager/daemon_adaptor.go @@ -12,9 +12,8 @@ import ( "strings" "time" - "github.com/pkg/errors" - "github.com/containerd/containerd/log" + "github.com/pkg/errors" "github.com/containerd/nydus-snapshotter/config" "github.com/containerd/nydus-snapshotter/pkg/daemon" @@ -153,6 +152,10 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) command.WithID(d.ID())) } + if d.States.PrefetchFiles != "" { + cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchFiles)) + } + cmdOpts = append(cmdOpts, command.WithLogLevel(d.States.LogLevel), command.WithAPISock(d.GetAPISock())) diff --git a/pkg/system/system.go b/pkg/system/system.go index 849d8dc532..d6f65c9050 100644 --- a/pkg/system/system.go +++ b/pkg/system/system.go @@ -9,6 +9,7 @@ package system import ( "encoding/json" "fmt" + "io" "net" "net/http" "os" @@ -19,6 +20,8 @@ import ( "strings" "time" + "github.com/containerd/nydus-snapshotter/config" + "github.com/gorilla/mux" "github.com/pkg/errors" @@ -39,6 +42,7 @@ const ( // it's very helpful to check daemon's record in database. endpointDaemonRecords string = "/api/v1/daemons/records" endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade" + endpointPrefetch string = "/api/v1/prefetch" ) const defaultErrorCode string = "Unknown" @@ -167,6 +171,22 @@ func (sc *Controller) registerRouter() { sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet) sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut) sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet) + sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut) +} + +func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + var Msg config.PrefetchMessage + body, err := io.ReadAll(r.Body) + if err != nil { + log.L.Errorf("Failed to read prefetch list: %v", err) + return + } + if err = config.SetPrefetchFiles(Msg, body); err != nil { + log.L.Errorf("Failed to parse request body: %v", err) + return + } + } } func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {