Skip to content

Commit

Permalink
daemon: store auth to keyring and send it to nydusd via env
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Tang <[email protected]>
  • Loading branch information
sctb512 committed Aug 1, 2023
1 parent 841ac59 commit bba918f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 11 deletions.
8 changes: 6 additions & 2 deletions config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func DumpConfigString(c interface{}) (string, error) {

// Achieve a daemon configuration from template or snapshotter's configuration
func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
vpcRegistry bool, labels map[string]string, params map[string]string) error {
vpcRegistry bool, labels map[string]string, params map[string]string, fn func(string, *auth.PassKeyChain)) error {

image, err := registry.ParseImage(imageID)
if err != nil {
Expand Down Expand Up @@ -169,7 +169,11 @@ func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
// when repository is public.
keyChain := auth.GetRegistryKeyChain(registryHost, imageID, labels)
c.Supplement(registryHost, image.Repo, snapshotID, params)
c.FillAuth(keyChain)
if config.IsKeyringEnabled() && fn != nil {
fn(registryHost, keyChain)
} else {
c.FillAuth(keyChain)
}

// Localfs and OSS backends don't need any update,
// just use the provided config in template
Expand Down
4 changes: 4 additions & 0 deletions pkg/daemon/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func BuildCommand(opts []Opt) ([]string, error) {
return args, nil
}

func (dc *DaemonCommand) GetConfigPath() string {
return dc.Config
}

func WithMode(m string) Opt {
return func(cmd *DaemonCommand) {
cmd.Mode = m
Expand Down
35 changes: 30 additions & 5 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import (
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/containerd/containerd/log"

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/config/daemonconfig"
"github.com/containerd/nydus-snapshotter/pkg/auth"
"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/supervisor"
"github.com/containerd/nydus-snapshotter/pkg/utils/erofs"
"github.com/containerd/nydus-snapshotter/pkg/utils/mount"
"github.com/containerd/nydus-snapshotter/pkg/utils/registry"
"github.com/containerd/nydus-snapshotter/pkg/utils/retry"
)

Expand Down Expand Up @@ -66,6 +69,7 @@ type Daemon struct {
// fusedev shared mode: zero, one or more RAFS instances
// fscache shared mode: zero, one or more RAFS instances
Instances rafsSet
// authCache *lru.Cache

// Protect nydusd http client
cmu sync.Mutex
Expand All @@ -82,6 +86,7 @@ type Daemon struct {
Version types.BuildTimeInfo

ref int32

// Cache the nydusd daemon state to avoid frequently querying nydusd by API.
state types.DaemonState
}
Expand Down Expand Up @@ -222,7 +227,7 @@ func (d *Daemon) IsSharedDaemon() bool {
return d.HostMountpoint() == config.GetRootMountpoint()
}

func (d *Daemon) SharedMount(rafs *Rafs) error {
func (d *Daemon) SharedMount(rafs *Rafs, authCache *auth.Cache) error {
defer d.SendStates()

switch d.States.FsDriver {
Expand All @@ -232,13 +237,13 @@ func (d *Daemon) SharedMount(rafs *Rafs) error {
}
return nil
case config.FsDriverFusedev:
return d.sharedFusedevMount(rafs)
return d.sharedFusedevMount(rafs, authCache)
default:
return errors.Errorf("unsupported fs driver %s", d.States.FsDriver)
}
}

func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
func (d *Daemon) sharedFusedevMount(rafs *Rafs, authCache *auth.Cache) error {
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
Expand All @@ -255,6 +260,25 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
d.ConfigFile(rafs.SnapshotID))
}

if config.IsKeyringEnabled() {
image, err := registry.ParseImage(rafs.ImageID)
if err != nil {
return errors.Wrapf(err, "parse image %s", rafs.ImageID)
}

logrus.Debugf("get key for %s", image.Host)
cachedAuth, err := authCache.GetAuth(image.Host)
if err != nil {
return err
}

keyChain, err := auth.FromBase64(cachedAuth)
if err != nil {
return err
}
c.FillAuth(&keyChain)
}

cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
Expand Down Expand Up @@ -624,7 +648,7 @@ func (d *Daemon) CloneInstances(src *Daemon) {
}

// Daemon must be started and reach RUNNING state before call this method
func (d *Daemon) RecoveredMountInstances() error {
func (d *Daemon) RecoveredMountInstances(authCache *auth.Cache) error {
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()
Expand All @@ -641,7 +665,7 @@ func (d *Daemon) RecoveredMountInstances() error {
for _, i := range instances {
if d.HostMountpoint() != i.GetMountpoint() {
log.L.Infof("Recovered mount instance %s", i.SnapshotID)
if err := d.SharedMount(i); err != nil {
if err := d.SharedMount(i, authCache); err != nil {
return err
}
}
Expand All @@ -657,6 +681,7 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {
d.States.ID = newID()
d.States.DaemonMode = config.DaemonModeDedicated
d.Instances = rafsSet{instances: make(map[string]*Rafs)}
// d.authCache = lru.New(32)

for _, o := range opt {
err := o(d)
Expand Down
15 changes: 12 additions & 3 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"github.com/mohae/deepcopy"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshots"

snpkg "github.com/containerd/containerd/pkg/snapshotters"
"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/config/daemonconfig"
"github.com/containerd/nydus-snapshotter/pkg/auth"
"github.com/containerd/nydus-snapshotter/pkg/cache"
"github.com/containerd/nydus-snapshotter/pkg/daemon"
"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
Expand Down Expand Up @@ -132,7 +134,7 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {
if err := d.WaitUntilState(types.DaemonStateRunning); err != nil {
return nil, errors.Wrapf(err, "wait for daemon %s", d.ID())
}
if err := d.RecoveredMountInstances(); err != nil {
if err := d.RecoveredMountInstances(fsManager.AuthCache); err != nil {
return nil, errors.Wrapf(err, "recover mounts for daemon %s", d.ID())
}
fs.TryRetainSharedDaemon(d)
Expand Down Expand Up @@ -298,7 +300,14 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er
daemonconfig.CacheDir: cacheDir,
}
cfg := deepcopy.Copy(fsManager.DaemonConfig).(daemonconfig.DaemonConfig)
err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params)
var updateErr error
err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params, func(imageHost string, keyChain *auth.PassKeyChain) {
logrus.Debugf("add key for %s", imageHost)
updateErr = fsManager.AuthCache.UpdateAuth(imageHost, keyChain.ToBase64())
})
if updateErr != nil {
return updateErr
}
if err != nil {
return errors.Wrap(err, "supplement configuration")
}
Expand Down Expand Up @@ -490,7 +499,7 @@ func (fs *Filesystem) mountRemote(fsManager *manager.Manager, useSharedDaemon bo
} else {
r.SetMountpoint(path.Join(r.GetSnapshotDir(), "mnt"))
}
if err := d.SharedMount(r); err != nil {
if err := d.SharedMount(r, fsManager.AuthCache); err != nil {
return errors.Wrapf(err, "failed to mount")
}
} else {
Expand Down
22 changes: 22 additions & 0 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
package manager

import (
"fmt"
"os"
"os/exec"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/containerd/containerd/log"

Expand All @@ -23,6 +25,7 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
"github.com/containerd/nydus-snapshotter/pkg/utils/registry"
)

// Fork the nydusd daemon with the process PID decided
Expand Down Expand Up @@ -192,6 +195,25 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)

cmd := exec.Command(nydusdPath, args...)

if config.IsKeyringEnabled() && !d.IsSharedDaemon() {
if d.Instances.Len() > 1 {
return nil, errors.New("nydusd is not running in shared mode but the instance length is large than 1")
}

imageID := d.Instances.Head().ImageID
image, err := registry.ParseImage(imageID)
if err != nil {
return nil, errors.Wrapf(err, "parse image %s", imageID)
}

logrus.Debugf("get key for %s", image.Host)
auth, err := m.AuthCache.GetAuth(image.Host)
if err != nil {
return nil, err
}
cmd.Env = append(cmd.Env, fmt.Sprintf("IMAGE_PULL_AUTH=%s", auth))
}

// nydusd standard output and standard error rather than its logs are
// always redirected to snapshotter's respectively
cmd.Stdout = os.Stdout
Expand Down
7 changes: 6 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/config/daemonconfig"
"github.com/containerd/nydus-snapshotter/pkg/auth"
"github.com/containerd/nydus-snapshotter/pkg/cgroup"
"github.com/containerd/nydus-snapshotter/pkg/daemon"
"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
Expand Down Expand Up @@ -139,6 +140,9 @@ type Manager struct {
// Cgroup manager for nydusd
CgroupMgr *cgroup.Manager

// Cache for registry authorization
AuthCache *auth.Cache

// In order to validate daemon fs driver is consistent with the latest snapshotter boot
FsDriver string

Expand Down Expand Up @@ -222,7 +226,7 @@ func (m *Manager) doDaemonRestart(d *daemon.Daemon) {
break
}

if err := d.SharedMount(r); err != nil {
if err := d.SharedMount(r, m.AuthCache); err != nil {
log.L.Warnf("Failed to mount rafs instance, %v", err)
}
}
Expand Down Expand Up @@ -284,6 +288,7 @@ func NewManager(opt Opt) (*Manager, error) {
SupervisorSet: supervisorSet,
DaemonConfig: opt.DaemonConfig,
CgroupMgr: opt.CgroupMgr,
AuthCache: auth.NewCache(),
FsDriver: opt.FsDriver,
}

Expand Down

0 comments on commit bba918f

Please sign in to comment.