Skip to content

Commit

Permalink
add chunk-dedup flag to support local cas chunk deduplication.
Browse files Browse the repository at this point in the history
Signed-off-by: xwb1136021767 <[email protected]>
  • Loading branch information
xwb1136021767 committed Aug 11, 2023
1 parent a6f4457 commit bee0acf
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 6 deletions.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type DaemonConfig struct {
FsDriver string `toml:"fs_driver"`
ThreadsNumber int `toml:"threads_number"`
LogRotationSize int `toml:"log_rotation_size"`
ChunkDedup bool `toml:"chunk_dedup"`
}

type LoggingConfig struct {
Expand Down Expand Up @@ -326,6 +327,10 @@ func ParseParameters(args *flags.Args, cfg *SnapshotterConfig) error {
daemonConfig.FsDriver = args.FsDriver
}

if args.ChunkDedup {
daemonConfig.ChunkDedup = args.ChunkDedup
}

// --- cache manager configuration
// empty

Expand Down
8 changes: 7 additions & 1 deletion config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@ type DaemonConfig interface {
}

// Daemon configurations factory
func NewDaemonConfig(fsDriver, path string) (DaemonConfig, error) {
func NewDaemonConfig(fsDriver, path string, isDedup bool) (DaemonConfig, error) {
switch fsDriver {
case config.FsDriverFscache:
cfg, err := LoadFscacheConfig(path)
if err != nil {
return nil, err
}
cfg.Config.DedupConfig.Enable = isDedup
return cfg, nil
case config.FsDriverFusedev:
cfg, err := LoadFuseConfig(path)
if err != nil {
return nil, err
}
cfg.Device.Dedup.Enable = isDedup
return cfg, nil
default:
return nil, errors.Errorf("unsupported, fs driver %q", fsDriver)
Expand Down Expand Up @@ -118,6 +120,10 @@ type DeviceConfig struct {
DisableIndexedMap bool `json:"disable_indexed_map"`
} `json:"config"`
} `json:"cache"`
Dedup struct {
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
} `json:"dedup"`
}

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
Expand Down
4 changes: 4 additions & 0 deletions config/daemonconfig/fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type FscacheDaemonConfig struct {
CacheConfig struct {
WorkDir string `json:"work_dir"`
} `json:"cache_config"`
DedupConfig struct {
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
} `json:"dedup_config"`
BlobPrefetchConfig BlobPrefetchConfig `json:"prefetch_config"`
MetadataPath string `json:"metadata_path"`
} `json:"config"`
Expand Down
4 changes: 4 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func GetDaemonProfileCPUDuration() int64 {
return globalConfig.origin.SystemControllerConfig.DebugConfig.ProfileDuration
}

func GetChunkDedup() bool {
return globalConfig.origin.DaemonConfig.ChunkDedup
}

func ProcessConfigurations(c *SnapshotterConfig) error {
if c.LoggingConfig.LogDir == "" {
c.LoggingConfig.LogDir = filepath.Join(c.Root, logging.DefaultLogDirName)
Expand Down
7 changes: 7 additions & 0 deletions internal/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Args struct {
LogToStdout bool
LogToStdoutCount int
PrintVersion bool
ChunkDedup bool
}

type Flags struct {
Expand Down Expand Up @@ -97,6 +98,12 @@ func buildFlags(args *Args) []cli.Flag {
Usage: "print version and build information",
Destination: &args.PrintVersion,
},
&cli.BoolFlag{
Name: "chunk-dedup",
Value: false,
Usage: "(experiment)whether to enable local cas chunk dedup",
Destination: &args.ChunkDedup,
},
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func WithLogToStdout(logToStdout bool) NewDaemonOpt {
}
}

func WithChunkDedup(chunkDedup bool) NewDaemonOpt {
return func(d *Daemon) error {
d.States.ChunkDedup = chunkDedup
return nil
}
}

func WithLogLevel(logLevel string) NewDaemonOpt {
return func(d *Daemon) error {
if logLevel == "" {
Expand Down
24 changes: 22 additions & 2 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type States struct {
// Where the configuration file resides, all rafs instances share the same configuration template
ConfigDir string
SupervisorPath string
ChunkDedup bool
}

// TODO: Record queried nydusd state
Expand Down Expand Up @@ -244,12 +245,13 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}

configPath := d.ConfigFile(rafs.SnapshotID)
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID), d.States.ChunkDedup)
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
Expand All @@ -260,6 +262,15 @@ func (d *Daemon) sharedFusedevMount(rafs *Rafs) error {
return errors.Wrap(err, "dump instance configuration")
}

// static dedup bootstrap
if d.States.ChunkDedup {
log.L.Infoln("sharedFusedevMount, cfg = ", cfg)
bootstrap, err = rafs.DedupBootstrap(bootstrap, configPath)
if err != nil {
return errors.Wrapf(err, "failed to dedup rafs bootstrap")
}
}

err = client.Mount(rafs.RelaMountpoint(), bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance")
Expand All @@ -279,7 +290,8 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", rafs.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
configPath := d.ConfigFile(rafs.SnapshotID)
c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, configPath, d.States.ChunkDedup)
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(rafs.SnapshotID), err)
return err
Expand Down Expand Up @@ -309,6 +321,14 @@ func (d *Daemon) sharedErofsMount(rafs *Rafs) error {
rafs.AddAnnotation(AnnoFsCacheDomainID, cfg.DomainID)
rafs.AddAnnotation(AnnoFsCacheID, fscacheID)

if d.States.ChunkDedup {
log.L.Infoln("sharedErofsMount, cfg = ", cfg)
bootstrapPath, err = rafs.DedupBootstrap(bootstrapPath, configPath)
if err != nil {
return errors.Wrapf(err, "dedup rafs bootstrap")
}
}

if err := erofs.Mount(bootstrapPath, cfg.DomainID, fscacheID, mountPoint); err != nil {
if !errdefs.IsErofsMounted(err) {
return errors.Wrapf(err, "mount erofs to %s", mountPoint)
Expand Down
36 changes: 36 additions & 0 deletions pkg/daemon/rafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
package daemon

import (
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"

"github.com/mohae/deepcopy"
"github.com/pkg/errors"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"

"github.com/containerd/nydus-snapshotter/config"
)
Expand Down Expand Up @@ -195,3 +199,35 @@ func (r *Rafs) BootstrapFile() (string, error) {

return "", errors.Wrapf(errdefs.ErrNotFound, "bootstrap %s", bootstrap)
}

func buildDedupCommand(bootstrapPath, configPath, nydusImagePath string) *exec.Cmd {
args := []string{
"dedup",
"--bootstrap", bootstrapPath,
"--config", configPath,
}

log.L.Infof("start bootstrap dedup: %s %s", nydusImagePath, strings.Join(args, " "))

cmd := exec.Command(nydusImagePath, args...)

return cmd

}

func (r *Rafs) DedupBootstrap(bootstrapPath, configPath string) (string, error) {
nydusImagePath, err := exec.LookPath("nydus-image")
if err != nil {
fmt.Println(err.Error())
return "", err
}

cmd := buildDedupCommand(bootstrapPath, configPath, nydusImagePath)
_, err = cmd.CombinedOutput()
if err != nil {
return "", err
}

bootstrapPath += ".dedup"
return bootstrapPath, err
}
26 changes: 26 additions & 0 deletions pkg/daemon/rafs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package daemon

import (
"fmt"
"strings"
"testing"
)

func TestBuildDedupCommand(t *testing.T) {
bootstrapPath := "/path/to/bootstrap"
configPath := "/path/to/config.json"
nydusImagePath := "/path/to/nydus-image"

cmd := buildDedupCommand(bootstrapPath, configPath, nydusImagePath)

expectedArgs := []string{
"dedup",
"--bootstrap", bootstrapPath,
"--config", configPath,
}
expectedCmd := fmt.Sprintf("%s %s", nydusImagePath, strings.Join(expectedArgs, " "))

if expectedCmd != cmd.String() {
t.Errorf("unexpected command string '%s'", cmd.String())
}
}
7 changes: 7 additions & 0 deletions pkg/filesystem/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,10 @@ func WithEnableStargz(enable bool) NewFSOpt {
return nil
}
}

func WithChunkDedup(chunkDedup bool) NewFSOpt {
return func(fs *Filesystem) error {
fs.chunkDedup = chunkDedup
return nil
}
}
2 changes: 2 additions & 0 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Filesystem struct {
verifier *signature.Verifier
nydusImageBinaryPath string
rootMountpoint string
chunkDedup bool
}

// NewFileSystem initialize Filesystem instance
Expand Down Expand Up @@ -587,6 +588,7 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
daemon.WithNydusdThreadNum(config.GetDaemonThreadsNumber()),
daemon.WithFsDriver(fsManager.FsDriver),
daemon.WithDaemonMode(daemonMode),
daemon.WithChunkDedup(config.GetChunkDedup()),
}

if mountpoint != "" {
Expand Down
19 changes: 19 additions & 0 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
return errors.Wrapf(err, "create command for daemon %s", d.ID())
}

// dedup bootstrap, not dedup bootstrap for shared daemon
log.L.Infoln("StartDaemon dedup = ", d.States.ChunkDedup)
if d.States.ChunkDedup && !d.IsSharedDaemon() {
rafs := d.Instances.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
configPath := d.ConfigFile("")
bootstrap, _ := rafs.BootstrapFile()
_, err = rafs.DedupBootstrap(bootstrap, configPath)
if err != nil {
return errors.Errorf("fail to dedup bootstrap ")
}
}

if err := cmd.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -145,6 +160,10 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

if d.States.ChunkDedup {
bootstrap += ".dedup"
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (m *Manager) Recover(ctx context.Context,
}

if d.States.FsDriver == config.FsDriverFusedev {
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""))
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""), d.States.ChunkDedup)
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err)
return err
Expand Down
12 changes: 10 additions & 2 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
return nil, errors.Wrap(err, "initialize image verifier")
}

daemonConfig, err := daemonconfig.NewDaemonConfig(config.GetFsDriver(), cfg.DaemonConfig.NydusdConfigPath)
daemonConfig, err := daemonconfig.NewDaemonConfig(config.GetFsDriver(), cfg.DaemonConfig.NydusdConfigPath, cfg.DaemonConfig.ChunkDedup)
if err != nil {
return nil, errors.Wrap(err, "load daemon configuration")
}
Expand Down Expand Up @@ -143,6 +143,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
filesystem.WithVerifier(verifier),
filesystem.WithRootMountpoint(config.GetRootMountpoint()),
filesystem.WithEnableStargz(cfg.Experimental.EnableStargz),
filesystem.WithChunkDedup(cfg.DaemonConfig.ChunkDedup),
}

cacheConfig := &cfg.CacheManagerConfig
Expand Down Expand Up @@ -782,7 +783,7 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage

var c daemonconfig.DaemonConfig
if daemon.IsSharedDaemon() {
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID))
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID), daemon.States.ChunkDedup)
if err != nil {
return nil, errors.Wrapf(err, "Failed to load instance configuration %s",
daemon.ConfigFile(instance.SnapshotID))
Expand Down Expand Up @@ -811,6 +812,13 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage
return nil, errors.Wrapf(err, "remoteMounts: failed to detect filesystem version")
}

if daemon.States.ChunkDedup {
configPath := daemon.ConfigFile(instance.SnapshotID)
source, err = instance.DedupBootstrap(source, configPath)
if err != nil {
return nil, errors.Wrapf(err, "remoteMounts: failed to dedup rafs bootstrap")
}
}
// when enable nydus-overlayfs, return unified mount slice for runc and kata
extraOption := &ExtraOption{
Source: source,
Expand Down

0 comments on commit bee0acf

Please sign in to comment.