Skip to content

Commit

Permalink
feat: Remove DumpFile operations
Browse files Browse the repository at this point in the history
  • Loading branch information
fappy1234567 committed Oct 11, 2024
1 parent 8fa319b commit 237cc8a
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 87 deletions.
53 changes: 33 additions & 20 deletions config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ package daemonconfig

import (
"encoding/json"
"os"
//"os"

Check failure on line 12 in config/daemonconfig/daemonconfig.go

View workflow job for this annotation

GitHub Actions / Build and Lint

commentFormatting: put a space between `//` and comment text (gocritic)
"reflect"
"strings"
"sync"

"github.com/pkg/errors"

Expand All @@ -36,7 +37,7 @@ type DaemonConfig interface {
StorageBackend() (StorageBackendType, *BackendConfig)
UpdateMirrors(mirrorsConfigDir, registryHost string) error
DumpString() (string, error)
DumpFile(path string) error
//DumpFile(path string) error

Check failure on line 40 in config/daemonconfig/daemonconfig.go

View workflow job for this annotation

GitHub Actions / Build and Lint

commentFormatting: put a space between `//` and comment text (gocritic)
}

// Daemon configurations factory
Expand Down Expand Up @@ -122,41 +123,53 @@ type DeviceConfig struct {
} `json:"cache"`
}

var configRWMutex sync.RWMutex

type SupplementInfoInterface interface {
GetImageID() string
GetSnapshotID() string
IsVPCRegistry() bool
GetLabels() map[string]string
GetParams() map[string]string
}

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
// We don't have to persist configuration file for fscache since its configuration
// is passed through HTTP API.
func DumpConfigFile(c interface{}, path string) error {
if config.IsBackendSourceEnabled() {
c = serializeWithSecretFilter(c)
}
b, err := json.Marshal(c)
if err != nil {
return errors.Wrapf(err, "marshal config")
}

return os.WriteFile(path, b, 0600)
}
// func DumpConfigFile(c interface{}, path string) error {
// if config.IsBackendSourceEnabled() {
// c = serializeWithSecretFilter(c)
// }
// b, err := json.Marshal(c)
// if err != nil {
// return errors.Wrapf(err, "marshal config")
// }

// return os.WriteFile(path, b, 0600)
// }

func DumpConfigString(c interface{}) (string, error) {
b, err := json.Marshal(c)
return string(b), err
}

// Achieve a daemon configuration from template or snapshotter's configuration
func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
vpcRegistry bool, labels map[string]string, params map[string]string) error {
func SupplementDaemonConfig(c DaemonConfig, info SupplementInfoInterface) error {

configRWMutex.Lock()
defer configRWMutex.Unlock()

image, err := registry.ParseImage(imageID)
image, err := registry.ParseImage(info.GetImageID())
if err != nil {
return errors.Wrapf(err, "parse image %s", imageID)
return errors.Wrapf(err, "parse image %s", info.GetImageID())
}

backendType, _ := c.StorageBackend()

switch backendType {
case backendTypeRegistry:
registryHost := image.Host
if vpcRegistry {
if info.IsVPCRegistry() {
registryHost = registry.ConvertToVPCHost(registryHost)
} else if registryHost == "docker.io" {
// For docker.io images, we should use index.docker.io
Expand All @@ -170,8 +183,8 @@ func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
// If no auth is provided, don't touch auth from provided nydusd configuration file.
// We don't validate the original nydusd auth from configuration file since it can be empty
// when repository is public.
keyChain := auth.GetRegistryKeyChain(registryHost, imageID, labels)
c.Supplement(registryHost, image.Repo, snapshotID, params)
keyChain := auth.GetRegistryKeyChain(registryHost, info.GetImageID(), info.GetLabels())
c.Supplement(registryHost, image.Repo, info.GetSnapshotID(), info.GetParams())
c.FillAuth(keyChain)

// Localfs and OSS backends don't need any update,
Expand Down
15 changes: 8 additions & 7 deletions config/daemonconfig/fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ package daemonconfig
import (
"encoding/json"
"os"
"path"

//"path"

Check failure on line 13 in config/daemonconfig/fscache.go

View workflow job for this annotation

GitHub Actions / Build and Lint

commentFormatting: put a space between `//` and comment text (gocritic)

"github.com/containerd/log"
"github.com/containerd/nydus-snapshotter/pkg/auth"
Expand Down Expand Up @@ -122,10 +123,10 @@ func (c *FscacheDaemonConfig) DumpString() (string, error) {
return DumpConfigString(c)
}

func (c *FscacheDaemonConfig) DumpFile(f string) error {
if err := os.MkdirAll(path.Dir(f), 0755); err != nil {
return err
}
// func (c *FscacheDaemonConfig) DumpFile(f string) error {
// if err := os.MkdirAll(path.Dir(f), 0755); err != nil {
// return err
// }

return DumpConfigFile(c, f)
}
// return DumpConfigFile(c, f)
// }
17 changes: 10 additions & 7 deletions config/daemonconfig/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ package daemonconfig
import (
"encoding/json"
"os"
"path"

//"path"

"github.com/pkg/errors"

Expand Down Expand Up @@ -92,12 +93,14 @@ func (c *FuseDaemonConfig) StorageBackend() (string, *BackendConfig) {
}

func (c *FuseDaemonConfig) DumpString() (string, error) {
configRWMutex.Lock()
defer configRWMutex.Unlock()
return DumpConfigString(c)
}

func (c *FuseDaemonConfig) DumpFile(f string) error {
if err := os.MkdirAll(path.Dir(f), 0755); err != nil {
return err
}
return DumpConfigFile(c, f)
}
// func (c *FuseDaemonConfig) DumpFile(f string) error {
// if err := os.MkdirAll(path.Dir(f), 0755); err != nil {
// return err
// }
// return DumpConfigFile(c, f)
// }
2 changes: 1 addition & 1 deletion misc/snapshotter/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ FROM base AS kubectl-sourcer
ARG TARGETARCH

RUN apk add -q --no-cache curl && \
curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \
curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/v1.30.0/bin/linux/"$TARGETARCH"/kubectl && \
chmod +x /usr/bin/kubectl

FROM base
Expand Down
55 changes: 43 additions & 12 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ type Daemon struct {
state types.DaemonState
}

type NydusdSupplementInfo struct {
DaemonState ConfigState
ImageID string
SnapshotID string
Vpc bool
Labels map[string]string
Params map[string]string
}

func (s *NydusdSupplementInfo) GetImageID() string { return s.ImageID }
func (s *NydusdSupplementInfo) GetSnapshotID() string { return s.SnapshotID }
func (s *NydusdSupplementInfo) IsVPCRegistry() bool { return s.Vpc }
func (s *NydusdSupplementInfo) GetLabels() map[string]string { return s.Labels }
func (s *NydusdSupplementInfo) GetParams() map[string]string { return s.Params }

func (d *Daemon) Lock() {
d.mu.Lock()
}
Expand Down Expand Up @@ -250,12 +265,7 @@ func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
}

c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
Expand All @@ -280,12 +290,7 @@ func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err)
return err
}

c := d.Config
cfgStr, err := c.DumpString()
if err != nil {
return err
Expand Down Expand Up @@ -650,3 +655,29 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {

return d, nil
}

func (d *Daemon) MountByAPI() error {
rafs := d.RafsCache.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}
c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
}
err = client.Mount("/", bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance MountByAPI()")
}
return nil

}
53 changes: 26 additions & 27 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {
if err != nil {
return errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID)
}

supplementInfo, err := fsManager.GetInfo(d.ID())
if err != nil {
return errors.Wrap(err, "GetInfo failed")
}

cfg := d.Config
err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}
d.Config = cfg

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrapf(err, "start daemon %s", d.ID())
}
Expand Down Expand Up @@ -232,7 +245,6 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
// Instance already exists, how could this happen? Can containerd handle this case?
return nil
}

fsDriver := config.GetFsDriver()
if label.IsTarfsDataLayer(labels) {
fsDriver = config.FsDriverBlockdev
Expand Down Expand Up @@ -302,34 +314,25 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
daemonconfig.WorkDir: workDir,
daemonconfig.CacheDir: cacheDir,
}
supplementInfo := &daemon.NydusdSupplementInfo{
DaemonState: d.States,
ImageID: imageID,
SnapshotID: snapshotID,
Vpc: false,
Labels: labels,
Params: params,
}
cfg := deepcopy.Copy(*fsManager.DaemonConfig).(daemonconfig.DaemonConfig)
err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params)
err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}

if errs := fsManager.AddSupplementInfo(supplementInfo); errs != nil {
return errors.Wrapf(err, "AddSupplementInfo failed %s", d.States.ID)
}
// TODO: How to manage rafs configurations on-disk? separated json config file or DB record?
// In order to recover erofs mount, the configuration file has to be persisted.
var configSubDir string
if useSharedDaemon {
configSubDir = snapshotID
} else {
// Associate daemon config object when creating a new daemon object to avoid
// reading disk file again and again.
// For shared daemon, each rafs instance has its own configuration, so we don't
// attach a config interface to daemon in this case.
d.Config = cfg
}

err = cfg.DumpFile(d.ConfigFile(configSubDir))
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyExists) {
log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir))
} else {
return errors.Wrap(err, "dump daemon configuration file")
}
}

d.Config = cfg
d.AddRafsInstance(rafs)

// if publicKey is not empty we should verify bootstrap file of image
Expand Down Expand Up @@ -596,10 +599,6 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
// it is loaded when requesting mount api
// Dump the configuration file since it is reloaded when recovering the nydusd
d.Config = *fsManager.DaemonConfig
err = d.Config.DumpFile(d.ConfigFile(""))
if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) {
return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile(""))
}

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrap(err, "start shared daemon")
Expand Down
14 changes: 10 additions & 4 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
if err := cmd.Start(); err != nil {
return err
}
fsDriver := config.GetFsDriver()
isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared
useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev

if !useSharedDaemon {
errs := d.MountByAPI()
if errs != nil {
return errors.Wrapf(err, "failed to mount")
}
}

d.Lock()
defer d.Unlock()
Expand Down Expand Up @@ -155,10 +165,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
)
if config.IsBackendSourceEnabled() {
configAPIPath := fmt.Sprintf(endpointGetBackend, d.States.ID)
cmdOpts = append(cmdOpts,
Expand Down
Loading

0 comments on commit 237cc8a

Please sign in to comment.