Skip to content

Commit

Permalink
[WIP]nri-plugin: Modify nri plugin r3 & transmit prefetchfiles to sna…
Browse files Browse the repository at this point in the history
…pshotter
  • Loading branch information
billie60 committed Jul 10, 2023
1 parent 88ccb52 commit e311448
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 10 deletions.
204 changes: 204 additions & 0 deletions cmd/prefetchfiles-nri-plugin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright (c) 2023. Nydus Developers. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/syslog"
"net"
"net/http"
"os"

"github.com/containerd/nydus-snapshotter/config"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/version"
)

const (
endpointPrefetch = "/api/v1/prefetch"
defaultEvents = "RunPodSandbox"
defaultSystemControllerAddress = "/run/containerd-nydus/system.sock"
nydusPrefetchAnnotation = "containerd.io/snapshot/nydus-prefetch"
imageAddressAnnotation = "containerd.io/snapshot/nydus-image-address"
)

type PluginArgs struct {
PluginName string
PluginIdx string
SocketAddress string
}

type Flags struct {
Args *PluginArgs
Flag []cli.Flag
}

func buildFlags(args *PluginArgs) []cli.Flag {
return []cli.Flag{
&cli.StringFlag{
Name: "name",
Usage: "plugin name to register to NRI",
Destination: &args.PluginName,
},
&cli.StringFlag{
Name: "idx",
Usage: "plugin index to register to NRI",
Destination: &args.PluginIdx,
},
&cli.StringFlag{
Name: "socket-addr",
Value: defaultSystemControllerAddress,
Usage: "default unix domain socket address",
Destination: &args.SocketAddress,
},
}
}

func NewPluginFlags() *Flags {
var args PluginArgs
return &Flags{
Args: &args,
Flag: buildFlags(&args),
}
}

type plugin struct {
stub stub.Stub
mask stub.EventMask
}

var (
globalSocket string
log *logrus.Logger
logWriter *syslog.Writer
)

func sendDataOverHTTP(data config.PrefetchMessage, endpoint, sock string) error {
url := fmt.Sprintf("http://unix%s", endpoint)

body, err := json.Marshal(data)
if err != nil {
return err
}

req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body))
if err != nil {
return err
}

client := &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", sock)
},
},
}
resp, err := client.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to send data, status code: %d", resp.StatusCode)
}
defer resp.Body.Close()

return nil
}

func (p *plugin) RunPodSandbox(pod *api.PodSandbox) error {
prefetchList, ok := pod.Annotations[nydusPrefetchAnnotation]
if !ok {
return nil
}
imageAddress, ok := pod.Annotations[imageAddressAnnotation]
if !ok {
return nil
}
msg := config.PrefetchMessage{
ImageAddress: imageAddress,
PrefetchFiles: prefetchList,
}

err := sendDataOverHTTP(msg, endpointPrefetch, globalSocket)
if err != nil {
log.Errorf("failed to send data: %v\n", err)
return err
}
return nil
}

func main() {

flags := NewPluginFlags()
app := &cli.App{
Name: "prefetch-nri-plugin",
Usage: "NRI plugin for obtaining and transmitting prefetch files path",
Version: version.Version,
Flags: flags.Flag,
HideVersion: true,
Action: func(c *cli.Context) error {
var (
opts []stub.Option
err error
)

globalSocket = flags.Args.SocketAddress

log = logrus.StandardLogger()
log.SetFormatter(&logrus.TextFormatter{
PadLevelText: true,
})
logWriter, err = syslog.New(syslog.LOG_INFO, "prefetch-nri-plugin")

if err == nil {
log.SetOutput(io.MultiWriter(os.Stdout, logWriter))
}

if flags.Args.PluginName != "" {
opts = append(opts, stub.WithPluginName(flags.Args.PluginName))
}
if flags.Args.PluginIdx != "" {
opts = append(opts, stub.WithPluginIdx(flags.Args.PluginIdx))
}

p := &plugin{}

if p.mask, err = api.ParseEventMask(defaultEvents); err != nil {
log.Fatalf("failed to parse events: %v", err)
}

if p.stub, err = stub.New(p, opts...); err != nil {
log.Fatalf("failed to create plugin stub: %v", err)
}

err = p.stub.Run(context.Background())
if err != nil {
return errors.Wrap(err, "plugin exited")
}
return nil
},
}
if err := app.Run(os.Args); err != nil {

if errdefs.IsConnectionClosed(err) {
log.Info("prefetch NRI plugin exited")
} else {
log.WithError(err).Fatal("failed to start prefetch NRI plugin")
}
}
}
20 changes: 20 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package config

import (
"encoding/json"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -37,6 +38,16 @@ type GlobalConfig struct {
DaemonThreadsNum int
CacheGCPeriod time.Duration
MirrorsConfig MirrorsConfig
///
PrefetchFiles PrefetchMessage
}
type PrefetchMessage struct {
ImageAddress string `json:"image_address"`
PrefetchFiles string `json:"prefetch_files"`
}

func GetPrefetchFiles() PrefetchMessage {
return globalConfig.PrefetchFiles
}

func GetDaemonMode() DaemonMode {
Expand Down Expand Up @@ -103,6 +114,15 @@ func GetDaemonProfileCPUDuration() int64 {
return globalConfig.origin.SystemControllerConfig.DebugConfig.ProfileDuration
}

func SetPrefetchFiles(prefetchMsg PrefetchMessage, body []byte) error {
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
return err
}
globalConfig.PrefetchFiles = prefetchMsg
log.L.Infof("received prefetch list from nri plugin: %v ", globalConfig.PrefetchFiles)
return nil
}

func ProcessConfigurations(c *SnapshotterConfig) error {
if c.LoggingConfig.LogDir == "" {
c.LoggingConfig.LogDir = filepath.Join(c.Root, logging.DefaultLogDirName)
Expand Down
23 changes: 15 additions & 8 deletions pkg/daemon/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ type DaemonCommand struct {
Upgrade bool `type:"flag" name:"upgrade" default:""`
ThreadNum string `type:"param" name:"thread-num"`
// `--id` is required by `--supervisor` when starting nydusd
ID string `type:"param" name:"id"`
Config string `type:"param" name:"config"`
Bootstrap string `type:"param" name:"bootstrap"`
Mountpoint string `type:"param" name:"mountpoint"`
APISock string `type:"param" name:"apisock"`
LogLevel string `type:"param" name:"log-level"`
Supervisor string `type:"param" name:"supervisor"`
LogFile string `type:"param" name:"log-file"`
ID string `type:"param" name:"id"`
Config string `type:"param" name:"config"`
Bootstrap string `type:"param" name:"bootstrap"`
Mountpoint string `type:"param" name:"mountpoint"`
APISock string `type:"param" name:"apisock"`
LogLevel string `type:"param" name:"log-level"`
Supervisor string `type:"param" name:"supervisor"`
LogFile string `type:"param" name:"log-file"`
PrefetchFiles string `type:"param" name:"prefetch-files"`
}

// Build exec style command line
Expand Down Expand Up @@ -104,6 +105,12 @@ func WithMode(m string) Opt {
}
}

func WithPrefetchFiles(p string) Opt {
return func(cmd *DaemonCommand) {
cmd.PrefetchFiles = p
}
}

func WithFscacheDriver(w string) Opt {
return func(cmd *DaemonCommand) {
cmd.FscacheDriver = w
Expand Down
7 changes: 7 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,10 @@ func WithDaemonMode(daemonMode config.DaemonMode) NewDaemonOpt {
return nil
}
}

func WithNydusdPrefetchFiles(prefetchList string) NewDaemonOpt {
return func(d *Daemon) error {
d.States.PrefetchFiles = prefetchList
return nil
}
}
1 change: 1 addition & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type States struct {
// Where the configuration file resides, all rafs instances share the same configuration template
ConfigDir string
SupervisorPath string
PrefetchFiles string
}

// TODO: Record queried nydusd state
Expand Down
5 changes: 5 additions & 0 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package filesystem

import (
"context"

"os"
"path"

Expand Down Expand Up @@ -499,6 +500,10 @@ func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daem
opts = append(opts, daemon.WithMountpoint(mountpoint))
}

if config.GetPrefetchFiles().PrefetchFiles != "" {
opts = append(opts, daemon.WithNydusdPrefetchFiles(config.GetPrefetchFiles().PrefetchFiles))
}

d, err = daemon.NewDaemon(opts...)
if err != nil {
return nil, errors.Wrapf(err, "new daemon")
Expand Down
7 changes: 5 additions & 2 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/containerd/containerd/log"
"github.com/pkg/errors"

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/pkg/daemon"
Expand Down Expand Up @@ -153,6 +152,10 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
command.WithID(d.ID()))
}

if d.States.PrefetchFiles != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchFiles))
}

cmdOpts = append(cmdOpts,
command.WithLogLevel(d.States.LogLevel),
command.WithAPISock(d.GetAPISock()))
Expand Down
20 changes: 20 additions & 0 deletions pkg/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package system
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
Expand All @@ -19,6 +20,8 @@ import (
"strings"
"time"

"github.com/containerd/nydus-snapshotter/config"

"github.com/gorilla/mux"
"github.com/pkg/errors"

Expand All @@ -39,6 +42,7 @@ const (
// it's very helpful to check daemon's record in database.
endpointDaemonRecords string = "/api/v1/daemons/records"
endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade"
endpointPrefetch string = "/api/v1/prefetch"
)

const defaultErrorCode string = "Unknown"
Expand Down Expand Up @@ -167,6 +171,22 @@ func (sc *Controller) registerRouter() {
sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet)
sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut)
sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet)
sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut)
}

func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var Msg config.PrefetchMessage
body, err := io.ReadAll(r.Body)
if err != nil {
log.L.Errorf("Failed to read prefetch list: %v", err)
return
}
if err = config.SetPrefetchFiles(Msg, body); err != nil {
log.L.Errorf("Failed to parse request body: %v", err)
return
}
}
}

func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit e311448

Please sign in to comment.