diff --git a/.github/codecov.yml b/.github/codecov.yml index f0cb241de5..e6ae8ff00b 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -3,7 +3,7 @@ coverage: project: default: enabled: yes - target: auto # auto compares coverage to the previous base commit + target: auto # auto compares coverage to the previous base commit # adjust accordingly based on how flaky your tests are # this allows a 0.3% drop from the previous base commit coverage threshold: 0.3% @@ -11,13 +11,12 @@ coverage: comment: layout: "reach, diff, flags, files" behavior: default - require_changes: false # if true: only post the comment if coverage changes + require_changes: true # if true: only post the comment if coverage changes codecov: require_ci_to_pass: false notify: - wait_for_ci: false - + wait_for_ci: true # When modifying this file, please validate using -# curl -X POST --data-binary @codecov.yml https://codecov.io/validate \ No newline at end of file +# curl -X POST --data-binary @codecov.yml https://codecov.io/validate diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 89d3560dbd..7031d4a23f 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -19,8 +19,37 @@ env: REGISTRY: ghcr.io jobs: - run-e2e: - runs-on: ubuntu-latest + run-e2e-for-cgroups-v1: + runs-on: ubuntu-20.04 + steps: + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: "1.19.6" + - name: Checkout repository + uses: actions/checkout@v3 + - name: cache go mod + uses: actions/cache@v3 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }} + restore-keys: | + ${{ runner.os }}-go + - name: Log in to container registry + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Run e2e test + run: | + TAG=$GITHUB_REF_NAME + [ "$TAG" == "main" ] && TAG="latest" + [ "$GITHUB_EVENT_NAME" == "pull_request" ] && TAG="local" + make integration 
+ + run-e2e-for-cgroups-v2: + runs-on: ubuntu-22.04 steps: - name: Set up Go uses: actions/setup-go@v3 diff --git a/.github/workflows/optimizer.yml b/.github/workflows/optimizer.yml index 84a39c361a..d8d3cf266a 100644 --- a/.github/workflows/optimizer.yml +++ b/.github/workflows/optimizer.yml @@ -74,18 +74,27 @@ jobs: sudo make install-optimizer sudo install -D -m 755 misc/example/optimizer-nri-plugin.conf /etc/nri/conf.d/02-optimizer-nri-plugin.conf sudo systemctl restart containerd + systemctl status containerd --no-pager -l - name: Generate accessed files list run: | - sudo crictl run misc/optimizer/ubuntu.yaml misc/optimizer/sandbox.yaml + sed -i "s|host_path: script|host_path: $(pwd)/misc/optimizer/script|g" misc/optimizer/nginx.yaml + sudo crictl run misc/optimizer/nginx.yaml misc/optimizer/sandbox.yaml sleep 20 sudo crictl rmp -f --all tree /opt/nri/optimizer/results/ - cat /opt/nri/optimizer/results/library/ubuntu:22.04 - count=$(cat /opt/nri/optimizer/results/library/ubuntu:22.04 | wc -l) - echo $count - if [ "$count" != 4 ]; then - echo "failed to generate accessed files list for ubuntu:22.04" + count=$(cat /opt/nri/optimizer/results/library/nginx:1.23.3 | wc -l) + expected=$(cat misc/optimizer/script/file_list.txt | wc -l) + echo "count: $count expected minimum value: $expected" + if [ $count -lt $expected ]; then + echo "failed to generate accessed files list for nginx:1.23.3" + cat misc/optimizer/script/file_list.txt exit 1 fi - cat /opt/nri/optimizer/results/library/ubuntu:22.04.csv - \ No newline at end of file + cat /opt/nri/optimizer/results/library/nginx:1.23.3.csv + - name: Dump logs + if: failure() + continue-on-error: true + run: | + systemctl status containerd --no-pager -l + journalctl -xeu containerd --no-pager + \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 675804bac7..74ee92dda6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -55,18 +55,12 @@ 
jobs: needs: [build] steps: - uses: actions/checkout@v3 - - name: install hub - run: | - HUB_VER=$(curl -s "https://api.github.com/repos/github/hub/releases/latest" | jq -r .tag_name | sed 's/^v//') - wget -q -O- https://github.com/github/hub/releases/download/v$HUB_VER/hub-linux-amd64-$HUB_VER.tgz | \ - tar xz --strip-components=2 --wildcards '*/bin/hub' - sudo mv hub /usr/local/bin/hub - - name: download artifacts + - name: Download Artifacts uses: actions/download-artifact@v3 with: name: nydus-snapshotter_artifacts path: nydus-snapshotter - - name: upload artifacts + - name: Upload Artifacts run: | tag=$(echo $GITHUB_REF | cut -d/ -f3-) tarball="nydus-snapshotter-$tag-x86_64.tgz" diff --git a/MAINTAINERS b/MAINTAINERS index 147f388d4c..eba02b426e 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -12,3 +12,5 @@ imeoer, Yan Song, yansong.ys@antgroup.com # # REVIEWERS # GitHub ID, Name, Email address +sctb512, Bin Tang, tangbin.bin@bytedance.com + diff --git a/Makefile b/Makefile index a2c5b8952b..32313a4355 100644 --- a/Makefile +++ b/Makefile @@ -130,7 +130,7 @@ smoke: integration: CGO_ENABLED=1 ${PROXY} GOOS=${GOOS} GOARCH=${GOARCH} go build -ldflags '-X "${PKG}/version.Version=${VERSION}" -extldflags "-static"' -race -v -o bin/containerd-nydus-grpc ./cmd/containerd-nydus-grpc $(SUDO) DOCKER_BUILDKIT=1 docker build ${BUILD_ARG_E2E_DOWNLOADS_MIRROR} -t nydus-snapshotter-e2e:0.1 -f integration/Dockerfile . 
- $(SUDO) docker run --name nydus-snapshotter_e2e --rm --privileged -v /root/.docker:/root/.docker -v `go env GOMODCACHE`:/go/pkg/mod \ + $(SUDO) docker run --cap-add SYS_ADMIN --security-opt seccomp=unconfined --cgroup-parent=system.slice --cgroupns private --name nydus-snapshotter_e2e --rm --privileged -v /root/.docker:/root/.docker -v `go env GOMODCACHE`:/go/pkg/mod \ -v `go env GOCACHE`:/root/.cache/go-build -v `pwd`:/nydus-snapshotter \ -v /usr/src/linux-headers-${KERNEL_VER}:/usr/src/linux-headers-${KERNEL_VER} \ ${ENV_TARGET_IMAGES_FILE} \ diff --git a/config/config.go b/config/config.go index 83c2266416..521497cf52 100644 --- a/config/config.go +++ b/config/config.go @@ -16,8 +16,11 @@ import ( "github.com/containerd/nydus-snapshotter/internal/constant" "github.com/containerd/nydus-snapshotter/internal/flags" + "github.com/containerd/nydus-snapshotter/pkg/cgroup" "github.com/containerd/nydus-snapshotter/pkg/errdefs" "github.com/containerd/nydus-snapshotter/pkg/utils/file" + "github.com/containerd/nydus-snapshotter/pkg/utils/parser" + "github.com/containerd/nydus-snapshotter/pkg/utils/sysinfo" ) func init() { @@ -97,8 +100,10 @@ func ParseRecoverPolicy(p string) (DaemonRecoverPolicy, error) { } const ( - FsDriverFusedev string = constant.FsDriverFusedev - FsDriverFscache string = constant.FsDriverFscache + FsDriverBlockdev string = constant.FsDriverBlockdev + FsDriverFusedev string = constant.FsDriverFusedev + FsDriverFscache string = constant.FsDriverFscache + FsDriverNodev string = constant.FsDriverNodev ) type Experimental struct { @@ -106,6 +111,11 @@ type Experimental struct { EnableReferrerDetect bool `toml:"enable_referrer_detect"` } +type CgroupConfig struct { + Enable bool `toml:"enable"` + MemoryLimit string `toml:"memory_limit"` +} + // Configure how to start and recover nydusd daemons type DaemonConfig struct { NydusdPath string `toml:"nydusd_path"` @@ -114,6 +124,7 @@ type DaemonConfig struct { RecoverPolicy string `toml:"recover_policy"` 
FsDriver string `toml:"fs_driver"` ThreadsNumber int `toml:"threads_number"` + LogRotationSize int `toml:"log_rotation_size"` } type LoggingConfig struct { @@ -203,6 +214,7 @@ type SnapshotterConfig struct { ImageConfig ImageConfig `toml:"image"` CacheManagerConfig CacheManagerConfig `toml:"cache_manager"` LoggingConfig LoggingConfig `toml:"log"` + CgroupConfig CgroupConfig `toml:"cgroup"` Experimental Experimental `toml:"experimental"` } @@ -216,6 +228,7 @@ func LoadSnapshotterConfig(path string) (*SnapshotterConfig, error) { if err != nil { return nil, errors.Wrapf(err, "load toml configuration from file %q", path) } + if err = tree.Unmarshal(&config); err != nil { return nil, errors.Wrap(err, "unmarshal snapshotter configuration") } @@ -336,3 +349,19 @@ func ParseParameters(args *flags.Args, cfg *SnapshotterConfig) error { return nil } + +func ParseCgroupConfig(config CgroupConfig) (cgroup.Config, error) { + totalMemory, err := sysinfo.GetTotalMemoryBytes() + if err != nil { + return cgroup.Config{}, errors.Wrap(err, "Failed to get total memory bytes") + } + + memoryLimitInBytes, err := parser.MemoryConfigToBytes(config.MemoryLimit, totalMemory) + if err != nil { + return cgroup.Config{}, err + } + + return cgroup.Config{ + MemoryLimitInBytes: memoryLimitInBytes, + }, nil +} diff --git a/config/config_test.go b/config/config_test.go index 2cb13b00b4..cd0c232387 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -47,6 +47,7 @@ func TestLoadSnapshotterTOMLConfig(t *testing.T) { RecoverPolicy: "restart", NydusdConfigPath: "/etc/nydus/nydusd-config.fusedev.json", ThreadsNumber: 4, + LogRotationSize: 100, }, SnapshotsConfig: SnapshotConfig{ EnableNydusOverlayFS: false, @@ -77,12 +78,16 @@ func TestLoadSnapshotterTOMLConfig(t *testing.T) { RotateLogLocalTime: true, RotateLogMaxAge: 7, RotateLogMaxBackups: 5, - RotateLogMaxSize: 1, + RotateLogMaxSize: 100, LogToStdout: false, }, MetricsConfig: MetricsConfig{ Address: ":9110", }, + CgroupConfig: 
CgroupConfig{ + Enable: true, + MemoryLimit: "", + }, } A.EqualValues(cfg, &exampleConfig) diff --git a/config/default.go b/config/default.go index 9df5df5e44..5b184c19c8 100644 --- a/config/default.go +++ b/config/default.go @@ -43,6 +43,7 @@ func (c *SnapshotterConfig) FillUpWithDefaults() error { } daemonConfig.RecoverPolicy = RecoverPolicyRestart.String() daemonConfig.FsDriver = constant.DefaultFsDriver + daemonConfig.LogRotationSize = constant.DefaultDaemonRotateLogMaxSize // cache configuration cacheConfig := &c.CacheManagerConfig @@ -54,11 +55,6 @@ func (c *SnapshotterConfig) FillUpWithDefaults() error { } func (c *SnapshotterConfig) SetupNydusBinaryPaths() error { - // when using DaemonMode = none, nydusd and nydus-image binaries are not required - if c.DaemonMode == string(DaemonModeNone) { - return nil - } - // resolve nydusd path if path, err := exec.LookPath(constant.NydusdBinaryName); err == nil { c.DaemonConfig.NydusdPath = path diff --git a/config/global.go b/config/global.go index d2a2cc99ee..5272a20bdc 100644 --- a/config/global.go +++ b/config/global.go @@ -40,6 +40,10 @@ type GlobalConfig struct { MirrorsConfig MirrorsConfig } +func IsFusedevSharedModeEnabled() bool { + return globalConfig.DaemonMode == DaemonModeShared +} + func GetDaemonMode() DaemonMode { return globalConfig.DaemonMode } @@ -80,6 +84,10 @@ func GetLogLevel() string { return globalConfig.origin.LoggingConfig.LogLevel } +func GetDaemonLogRotationSize() int { + return globalConfig.origin.DaemonConfig.LogRotationSize +} + func GetDaemonThreadsNumber() int { return globalConfig.origin.DaemonConfig.ThreadsNumber } diff --git a/docs/configure_nydus.md b/docs/configure_nydus.md index c745863c8c..da8d79409a 100644 --- a/docs/configure_nydus.md +++ b/docs/configure_nydus.md @@ -128,7 +128,10 @@ The Nydus snapshotter will get the new secret and parse the authorization. If yo ## Metrics -Nydusd records metrics in its own format. 
The metrics are exported via a HTTP server on top of unix domain socket. Nydus-snapshotter fetches the metrics and convert them in to Prometheus format which is exported via a network address. Nydus-snapshotter by default does not fetch metrics from nydusd. You can enable the nydusd metrics download by assigning an network address to `metrics.address` in nydus-snapshotter's toml [configuration file](../misc/snapshotter/config.toml). +Nydusd records metrics in its own format. The metrics are exported via an HTTP server on top of a unix domain socket. Nydus-snapshotter fetches the metrics and converts them into Prometheus format, which is exported via a network address. Nydus-snapshotter by default does not fetch metrics from nydusd. You can enable the nydusd metrics download by assigning a network address to `metrics.address` in nydus-snapshotter's toml [configuration file](../misc/snapshotter/config.toml). + +Once this entry is enabled, not only nydusd metrics, but also some information about the nydus-snapshotter +runtime and snapshot-related events are exported in Prometheus format as well.
## Diagnose diff --git a/go.mod b/go.mod index 2d31d400b3..0ef412965b 100644 --- a/go.mod +++ b/go.mod @@ -92,6 +92,13 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect ) +require ( + github.com/cilium/ebpf v0.9.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect +) + require ( github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20221215162035-5330a85ea652 // indirect github.com/Microsoft/go-winio v0.6.0 // indirect diff --git a/go.sum b/go.sum index e0f534e621..bec76f7371 100644 --- a/go.sum +++ b/go.sum @@ -605,6 +605,7 @@ go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95a go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -634,6 +635,8 @@ golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERs golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod 
h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/integration/Dockerfile b/integration/Dockerfile index 821b9da8b5..fb40de95ce 100644 --- a/integration/Dockerfile +++ b/integration/Dockerfile @@ -23,8 +23,6 @@ ARG NERDCTL_VER # deb https://mirrors.tuna.tsinghua.edu.cn/debian/ bullseye-backports main contrib non-free\n\ # deb https://mirrors.tuna.tsinghua.edu.cn/debian-security bullseye-security main contrib non-free\n' > /etc/apt/sources.list -ENV GOPROXY=https://goproxy.cn - RUN apt-get update -y && apt-get install -y libbtrfs-dev libseccomp-dev sudo psmisc jq lsof net-tools RUN go install github.com/go-delve/delve/cmd/dlv@latest diff --git a/integration/entrypoint.sh b/integration/entrypoint.sh index 94e4bf180e..5e49adc468 100755 --- a/integration/entrypoint.sh +++ b/integration/entrypoint.sh @@ -563,6 +563,18 @@ function kill_multiple_nydusd_recover_failover { detect_go_race } +# Refer to https://github.com/moby/moby/blob/088afc99e4bf8adb78e29733396182417d67ada2/hack/dind#L28-L38 +function enable_nesting_for_cgroup_v2() { + if [ -f /sys/fs/cgroup/cgroup.controllers ]; then + mkdir -p /sys/fs/cgroup/init + xargs -rn1 < /sys/fs/cgroup/cgroup.procs > /sys/fs/cgroup/init/cgroup.procs || : + sed -e 's/ / +/g' -e 's/^/-/' < /sys/fs/cgroup/cgroup.controllers \ + > /sys/fs/cgroup/cgroup.subtree_control + fi +} + +enable_nesting_for_cgroup_v2 + reboot_containerd multiple start_single_container_multiple_daemons diff --git a/internal/constant/values.go b/internal/constant/values.go index 8edc7141aa..e35306258a 100644 --- a/internal/constant/values.go +++ b/internal/constant/values.go @@ -17,8 +17,14 @@ const ( ) const ( + // Mount RAFS 
filesystem by using EROFS over block devices. + FsDriverBlockdev string = "blockdev" + // Mount RAFS filesystem by using FUSE subsystem FsDriverFusedev string = "fusedev" + // Mount RAFS filesystem by using fscache/EROFS. FsDriverFscache string = "fscache" + // Only prepare/supply meta/data blobs, do not mount RAFS filesystem. + FsDriverNodev string = "nodev" ) const ( @@ -38,9 +44,10 @@ const ( DefaultSystemControllerAddress = "/run/containerd-nydus/system.sock" // Log rotation - DefaultRotateLogMaxSize = 200 // 200 megabytes - DefaultRotateLogMaxBackups = 10 - DefaultRotateLogMaxAge = 0 // days - DefaultRotateLogLocalTime = true - DefaultRotateLogCompress = true + DefaultDaemonRotateLogMaxSize = 100 // 100 megabytes + DefaultRotateLogMaxSize = 200 // 200 megabytes + DefaultRotateLogMaxBackups = 5 + DefaultRotateLogMaxAge = 0 // days + DefaultRotateLogLocalTime = true + DefaultRotateLogCompress = true ) diff --git a/misc/optimizer/nginx.yaml b/misc/optimizer/nginx.yaml new file mode 100644 index 0000000000..ee0bf5c995 --- /dev/null +++ b/misc/optimizer/nginx.yaml @@ -0,0 +1,18 @@ +metadata: + name: nginx + +image: + image: nginx:1.23.3 + +mounts: + - host_path: script + container_path: /script + +command: + - /script/entrypoint.sh + +args: + - /script/file_list.txt + +log_path: nginx.0.log +linux: {} \ No newline at end of file diff --git a/misc/optimizer/sandbox.yaml b/misc/optimizer/sandbox.yaml index 101ed4a5a8..4f8c41faec 100644 --- a/misc/optimizer/sandbox.yaml +++ b/misc/optimizer/sandbox.yaml @@ -1,5 +1,5 @@ metadata: - name: ubuntu-sandbox + name: nginx-sandbox namespace: default attempt: 1 uid: hdishd83djaidwnduwk28bcsb diff --git a/misc/optimizer/script/entrypoint.sh b/misc/optimizer/script/entrypoint.sh new file mode 100755 index 0000000000..10a609ad68 --- /dev/null +++ b/misc/optimizer/script/entrypoint.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +path=$1 + +default_path=file_list_path.txt +if [[ $# -eq 0 ]]; then + path=${default_path} +fi + 
+files=($(cat ${path} | tr "\n" " ")) +files_number=${#files[@]} +echo "file number: $files_number" + +for file in "${files[@]}"; do + file_size=$(stat -c%s "${file}") + echo "file: ${file} size: ${file_size}" + cat ${file} >/dev/null +done + +echo "Read file list done." diff --git a/misc/optimizer/script/file_list.txt b/misc/optimizer/script/file_list.txt new file mode 100644 index 0000000000..f75bc7a6a1 --- /dev/null +++ b/misc/optimizer/script/file_list.txt @@ -0,0 +1,12 @@ +/lib/x86_64-linux-gnu/ld-2.31.so +/lib/x86_64-linux-gnu/libc-2.31.so +/lib/x86_64-linux-gnu/libtinfo.so.6.2 +/lib/x86_64-linux-gnu/libdl-2.31.so +/lib/x86_64-linux-gnu/libnss_files-2.31.so +/lib/x86_64-linux-gnu/libselinux.so.1 +/usr/lib/x86_64-linux-gnu/libpcre2-8.so.0.10.1 +/lib/x86_64-linux-gnu/libpthread-2.31.so +/docker-entrypoint.sh +/docker-entrypoint.d/10-listen-on-ipv6-by-default.sh +/docker-entrypoint.d/20-envsubst-on-templates.sh +/docker-entrypoint.d/30-tune-worker-processes.sh diff --git a/misc/optimizer/ubuntu.yaml b/misc/optimizer/ubuntu.yaml deleted file mode 100644 index b70a977beb..0000000000 --- a/misc/optimizer/ubuntu.yaml +++ /dev/null @@ -1,13 +0,0 @@ -metadata: - name: ubuntu - -image: - image: ubuntu:22.04 - -command: - - /bin/sh - - -c - - echo hello - -log_path: ubuntu.0.log -linux: {} diff --git a/misc/snapshotter/config.toml b/misc/snapshotter/config.toml index ecc8ea6b42..dbc9728830 100644 --- a/misc/snapshotter/config.toml +++ b/misc/snapshotter/config.toml @@ -32,6 +32,17 @@ recover_policy = "restart" # Nydusd worker thread number to handle FUSE or fscache requests, [0-1024]. # Setting to 0 will use the default configuration of nydusd. threads_number = 4 +# Log rotation size for nydusd, in unit MB(megabytes) +log_rotation_size = 100 + + +[cgroup] +# Whether to use separate cgroup for nydusd. +enable = true +# The memory limit for nydusd cgroup, which contains all nydusd processes. +# Percentage is supported as well, please ensure it is end with "%". 
+# The default unit is bytes. Acceptable values include "209715200", "200MiB", "200Mi" and "10%". +memory_limit = "" [log] # Print logs to stdout rather than logging files @@ -44,7 +55,7 @@ log_rotation_local_time = true log_rotation_max_age = 7 log_rotation_max_backups = 5 # In unit MB(megabytes) -log_rotation_max_size = 1 +log_rotation_max_size = 100 [metrics] # Enable by assigning an address, empty indicates metrics server is disabled diff --git a/pkg/cgroup/cgroup.go b/pkg/cgroup/cgroup.go new file mode 100644 index 0000000000..c74b572b97 --- /dev/null +++ b/pkg/cgroup/cgroup.go @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package cgroup + +import ( + "errors" + + "github.com/containerd/cgroups" + v1 "github.com/containerd/nydus-snapshotter/pkg/cgroup/v1" + v2 "github.com/containerd/nydus-snapshotter/pkg/cgroup/v2" +) + +const ( + defaultSlice = "system.slice" +) + +var ( + ErrCgroupNotSupported = errors.New("cgroups: cgroup not supported") +) + +type Config struct { + MemoryLimitInBytes int64 +} + +type DaemonCgroup interface { + // Delete the current cgroup. + Delete() error + // Add a process to current cgroup. 
+ AddProc(pid int) error +} + +func createCgroup(name string, config Config) (DaemonCgroup, error) { + if cgroups.Mode() == cgroups.Unified { + return v2.NewCgroup(defaultSlice, name, config.MemoryLimitInBytes) + } + + return v1.NewCgroup(defaultSlice, name, config.MemoryLimitInBytes) +} + +func supported() bool { + return cgroups.Mode() != cgroups.Unavailable +} + +func displayMode() string { + switch cgroups.Mode() { + case cgroups.Legacy: + return "legacy" + case cgroups.Hybrid: + return "hybrid" + case cgroups.Unified: + return "unified" + case cgroups.Unavailable: + return "unavailable" + default: + return "unknown" + } +} diff --git a/pkg/cgroup/manager.go b/pkg/cgroup/manager.go new file mode 100644 index 0000000000..0ca9dd0b63 --- /dev/null +++ b/pkg/cgroup/manager.go @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package cgroup + +import ( + "github.com/containerd/containerd/log" +) + +type Manager struct { + name string + config Config + cgroup DaemonCgroup +} + +type Opt struct { + Name string + Config Config +} + +func NewManager(opt Opt) (*Manager, error) { + if !supported() { + log.L.Warn("cgroup is not supported") + return nil, ErrCgroupNotSupported + } + + log.L.Infof("cgroup mode: %s", displayMode()) + cg, err := createCgroup(opt.Name, opt.Config) + if err != nil { + return nil, err + } + + return &Manager{ + name: opt.Name, + config: opt.Config, + cgroup: cg, + }, nil +} + +// Please make sure the *Manager is not null. +func (m *Manager) AddProc(pid int) error { + return m.cgroup.AddProc(pid) +} + +// Please make sure the *Manager is not null. +func (m *Manager) Delete() error { + return m.cgroup.Delete() +} diff --git a/pkg/cgroup/v1/v1.go b/pkg/cgroup/v1/v1.go new file mode 100644 index 0000000000..4dab8498b5 --- /dev/null +++ b/pkg/cgroup/v1/v1.go @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +package v1 + +import ( + "github.com/containerd/cgroups/v3/cgroup1" + "github.com/containerd/containerd/log" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +type Cgroup struct { + controller cgroup1.Cgroup +} + +func generateHierarchy() cgroup1.Hierarchy { + return cgroup1.SingleSubsystem(cgroup1.Default, cgroup1.Memory) +} + +func NewCgroup(slice, name string, memoryLimitInBytes int64) (Cgroup, error) { + hierarchy := generateHierarchy() + specResources := &specs.LinuxResources{ + Memory: &specs.LinuxMemory{ + Limit: &memoryLimitInBytes, + }, + } + + controller, err := cgroup1.Load(cgroup1.Slice(slice, name), cgroup1.WithHiearchy(hierarchy)) + if err != nil && err != cgroup1.ErrCgroupDeleted { + return Cgroup{}, err + } + + if controller != nil { + processes, err := controller.Processes(cgroup1.Memory, true) + if err != nil { + return Cgroup{}, err + } + if len(processes) > 0 { + log.L.Infof("target cgroup is existed with processes %v", processes) + if err := controller.Update(specResources); err != nil { + return Cgroup{}, err + } + return Cgroup{ + controller: controller, + }, nil + } + if err := controller.Delete(); err != nil { + return Cgroup{}, err + } + } + + controller, err = cgroup1.New(cgroup1.Slice(slice, name), specResources, cgroup1.WithHiearchy(hierarchy)) + if err != nil { + return Cgroup{}, errors.Wrapf(err, "create cgroup") + } + log.L.Infof("create cgroup (v1) successful, state: %v", controller.State()) + + return Cgroup{ + controller: controller, + }, nil +} + +func (cg Cgroup) Delete() error { + processes, err := cg.controller.Processes(cgroup1.Memory, true) + if err != nil { + return err + } + if len(processes) > 0 { + log.L.Infof("skip destroy cgroup because of running daemon %v", processes) + return nil + } + return cg.controller.Delete() +} +func (cg Cgroup) AddProc(pid int) error { + err := cg.controller.AddProc(uint64(pid), cgroup1.Memory) + if err != 
nil { + return err + } + + log.L.Infof("add process %d to daemon cgroup successful", pid) + return nil +} diff --git a/pkg/cgroup/v2/v2.go b/pkg/cgroup/v2/v2.go new file mode 100644 index 0000000000..719295cc90 --- /dev/null +++ b/pkg/cgroup/v2/v2.go @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package v2 + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/containerd/cgroups/v3/cgroup2" + "github.com/containerd/containerd/log" + "golang.org/x/exp/slices" +) + +const ( + defaultRoot = "/sys/fs/cgroup" +) + +var ( + ErrRootMemorySubtreeControllerDisabled = errors.New("cgroups v2: root subtree controller for memory is disabled") +) + +type Cgroup struct { + manager *cgroup2.Manager +} + +func readSubtreeControllers(dir string) ([]string, error) { + b, err := os.ReadFile(filepath.Join(dir, "cgroup.subtree_control")) + if err != nil { + return nil, err + } + return strings.Fields(string(b)), nil +} + +func NewCgroup(slice, name string, memoryLimitInBytes int64) (Cgroup, error) { + resources := &cgroup2.Resources{ + Memory: &cgroup2.Memory{}, + } + if memoryLimitInBytes > -1 { + resources = &cgroup2.Resources{ + Memory: &cgroup2.Memory{ + Max: &memoryLimitInBytes, + }, + } + } + + rootSubtreeControllers, err := readSubtreeControllers(defaultRoot) + if err != nil { + return Cgroup{}, err + } + log.L.Infof("root subtree controllers: %s", rootSubtreeControllers) + + if !slices.Contains(rootSubtreeControllers, "memory") { + return Cgroup{}, ErrRootMemorySubtreeControllerDisabled + } + + m, err := cgroup2.NewManager(defaultRoot, fmt.Sprintf("/%s/%s", slice, name), resources) + if err != nil { + return Cgroup{}, err + } + + controllers, err := m.Controllers() + if err != nil { + return Cgroup{}, err + } + log.L.Infof("create cgroup (v2) successful, controllers: %v", controllers) + + return Cgroup{ + manager: m, + }, nil +} + +func (cg Cgroup) Delete() 
error { + if cg.manager != nil { + return cg.manager.Delete() + } + return nil +} +func (cg Cgroup) AddProc(pid int) error { + if cg.manager != nil { + err := cg.manager.AddProc(uint64(pid)) + if err != nil { + return err + } + log.L.Infof("add process %d to daemon cgroup successful", pid) + } + return nil +} diff --git a/pkg/converter/constant.go b/pkg/converter/constant.go index d590b48b6e..e23f3122df 100644 --- a/pkg/converter/constant.go +++ b/pkg/converter/constant.go @@ -19,6 +19,7 @@ const ( LayerAnnotationNydusBlobSize = "containerd.io/snapshot/nydus-blob-size" LayerAnnotationNydusBootstrap = "containerd.io/snapshot/nydus-bootstrap" LayerAnnotationNydusSourceChainID = "containerd.io/snapshot/nydus-source-chainid" + LayerAnnotationNydusEncryptedBlob = "containerd.io/snapshot/nydus-encrypted-blob" LayerAnnotationNydusReferenceBlobIDs = "containerd.io/snapshot/nydus-reference-blob-ids" diff --git a/pkg/converter/convert_unix.go b/pkg/converter/convert_unix.go index e0bbd05561..3b26b7131d 100644 --- a/pkg/converter/convert_unix.go +++ b/pkg/converter/convert_unix.go @@ -326,6 +326,9 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser, if opt.BatchSize != "" && opt.BatchSize != "0" { requiredFeatures.Add(tool.FeatureBatchSize) } + if opt.Encrypt { + requiredFeatures.Add(tool.FeatureEncrypt) + } detectedFeatures, err := tool.DetectFeatures(builderPath, requiredFeatures, tool.GetHelp) if err != nil { @@ -407,6 +410,7 @@ func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, buil ChunkSize: opt.ChunkSize, Compressor: opt.Compressor, Timeout: opt.Timeout, + Encrypt: opt.Encrypt, Features: opt.features, }) @@ -510,6 +514,7 @@ func packFromTar(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteC BatchSize: opt.BatchSize, Compressor: opt.Compressor, Timeout: opt.Timeout, + Encrypt: opt.Encrypt, Features: opt.features, }) @@ -852,6 +857,10 @@ func LayerConvertFunc(opt PackOption) converter.ConvertFunc { 
newDesc.Annotations[label.NydusRefLayer] = desc.Digest.String() } + if opt.Encrypt { + newDesc.Annotations[LayerAnnotationNydusEncryptedBlob] = "true" + } + if opt.Backend != nil { if err := opt.Backend.Push(ctx, cs, newDesc); err != nil { return nil, errors.Wrap(err, "push to storage backend") @@ -1009,10 +1018,12 @@ func convertManifest(ctx context.Context, cs content.Store, oldDesc ocispec.Desc // Update the config gc label manifestLabels[configGCLabelKey] = newConfigDesc.Digest.String() - // Associate a reference to the original OCI manifest. - // See the `subject` field description in - // https://github.com/opencontainers/image-spec/blob/main/manifest.md#image-manifest-property-descriptions - manifest.Subject = &oldDesc + if opt.WithReferrer { + // Associate a reference to the original OCI manifest. + // See the `subject` field description in + // https://github.com/opencontainers/image-spec/blob/main/manifest.md#image-manifest-property-descriptions + manifest.Subject = &oldDesc + } // Update image manifest in content store. 
newManifestDesc, err := writeJSON(ctx, cs, manifest, manifestDesc, manifestLabels) @@ -1134,6 +1145,11 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript if opt.OCIRef { blobDesc.Annotations[label.NydusRefLayer] = layers[idx].OriginalDigest.String() } + + if len(opt.EncryptRecipients) != 0 { + blobDesc.Annotations[LayerAnnotationNydusEncryptedBlob] = "true" + } + blobDescs = append(blobDescs, blobDesc) } diff --git a/pkg/converter/tool/builder.go b/pkg/converter/tool/builder.go index ce808951a7..78f860029a 100644 --- a/pkg/converter/tool/builder.go +++ b/pkg/converter/tool/builder.go @@ -40,6 +40,7 @@ type PackOption struct { AlignedChunk bool ChunkSize string BatchSize string + Encrypt bool Timeout *time.Duration Features Features @@ -136,6 +137,9 @@ func buildPackArgs(option PackOption) []string { if option.Features.Contains(FeatureBatchSize) { args = append(args, "--batch-size", option.BatchSize) } + if option.Encrypt { + args = append(args, "--encrypt") + } args = append(args, option.SourcePath) return args diff --git a/pkg/converter/tool/feature.go b/pkg/converter/tool/feature.go index 79f5003a0b..346d3cc076 100644 --- a/pkg/converter/tool/feature.go +++ b/pkg/converter/tool/feature.go @@ -32,6 +32,9 @@ const ( // into a big batch chunk, which can reduce the the size of the image // and accelerate the runtime file loading. FeatureBatchSize Feature = "--batch-size" + // The option `--encrypt` enables converting directories, tar files + // or OCI images into encrypted nydus blob. 
+ FeatureEncrypt Feature = "--encrypt" ) var requiredFeatures Features diff --git a/pkg/converter/tool/feature_test.go b/pkg/converter/tool/feature_test.go index 70990fa280..2a2aff3eb1 100644 --- a/pkg/converter/tool/feature_test.go +++ b/pkg/converter/tool/feature_test.go @@ -375,6 +375,71 @@ func TestDetectFeature(t *testing.T) { Print help information `), }, + { + name: "'--encrypt' is supported in v2.2.0-261-g22ad0e2c", + feature: FeatureEncrypt, + expect: true, + helpMsg: []byte(` + Create RAFS filesystems from directories, tar files or OCI images + + Usage: nydus-image create [OPTIONS] + + Arguments: + source from which to build the RAFS filesystem + + Options: + -L, --log-file + Log file path + -t, --type + Conversion type: [default: dir-rafs] [possible values: directory, dir-rafs, estargz-rafs, estargz-ref, estargztoc-ref, tar-rafs, tar-tarfs, targz-rafs, targz-ref, stargz_index] + -B, --bootstrap + File path to save the generated RAFS metadata blob + -l, --log-level + Log level: [default: info] [possible values: trace, debug, info, warn, error] + -D, --blob-dir + Directory path to save generated RAFS metadata and data blobs + -b, --blob + File path to save the generated RAFS data blob + --blob-inline-meta + Inline RAFS metadata and blob metadata into the data blob + --blob-id + OSS object id for the generated RAFS data blob + --blob-data-size + Set data blob size for 'estargztoc-ref' conversion + --chunk-size + Set the size of data chunks, must be power of two and between 0x1000-0x1000000: + --batch-size + Set the batch size to merge small chunks, must be power of two, between 0x1000-0x1000000 or be zero: [default: 0] + --compressor + Algorithm to compress data chunks: [default: zstd] [possible values: none, lz4_block, zstd] + --digester + Algorithm to digest data chunks: [default: blake3] [possible values: blake3, sha256] + -C, --config + Configuration file for storage backend, cache and RAFS FUSE filesystem. 
+ -v, --fs-version + Set RAFS format version number: [default: 6] [possible values: 5, 6] + --features + Enable/disable features [possible values: blob-toc] + --chunk-dict + File path of chunk dictionary for data deduplication + --parent-bootstrap + File path of the parent/referenced RAFS metadata blob (optional) + --aligned-chunk + Align uncompressed data chunks to 4K, only for RAFS V5 + --repeatable + Generate reproducible RAFS metadata + --whiteout-spec + Set the type of whiteout specification: [default: oci] [possible values: oci, overlayfs, none] + --prefetch-policy + Set data prefetch policy [default: none] [possible values: fs, blob, none] + -J, --output-json + File path to save operation result in JSON format + -E, --encrypt + Encrypt the generated RAFS metadata and data blobs + -h, --help + Print help information + `), + }, { name: "'--type tar-rafs' is not supported in v2.1.4", @@ -552,6 +617,69 @@ func TestDetectFeature(t *testing.T) { source path `), }, + { + name: "'--encrypt' is not supported in v2.2.0", + feature: FeatureEncrypt, + expect: false, + helpMsg: []byte(` + Create RAFS filesystems from directories, tar files or OCI images + + Usage: nydus-image create [OPTIONS] + + Arguments: + source from which to build the RAFS filesystem + + Options: + -L, --log-file + Log file path + -t, --type + Conversion type: [default: dir-rafs] [possible values: directory, dir-rafs, estargz-rafs, estargz-ref, estargztoc-ref, tar-rafs, tar-tarfs, targz-rafs, targz-ref, stargz_index] + -B, --bootstrap + File path to save the generated RAFS metadata blob + -l, --log-level + Log level: [default: info] [possible values: trace, debug, info, warn, error] + -D, --blob-dir + Directory path to save generated RAFS metadata and data blobs + -b, --blob + File path to save the generated RAFS data blob + --blob-inline-meta + Inline RAFS metadata and blob metadata into the data blob + --blob-id + OSS object id for the generated RAFS data blob + --blob-data-size + Set data blob 
size for 'estargztoc-ref' conversion + --chunk-size + Set the size of data chunks, must be power of two and between 0x1000-0x1000000: + --batch-size + Set the batch size to merge small chunks, must be power of two, between 0x1000-0x1000000 or be zero: [default: 0] + --compressor + Algorithm to compress data chunks: [default: zstd] [possible values: none, lz4_block, zstd] + --digester + Algorithm to digest data chunks: [default: blake3] [possible values: blake3, sha256] + -C, --config + Configuration file for storage backend, cache and RAFS FUSE filesystem. + -v, --fs-version + Set RAFS format version number: [default: 6] [possible values: 5, 6] + --features + Enable/disable features [possible values: blob-toc] + --chunk-dict + File path of chunk dictionary for data deduplication + --parent-bootstrap + File path of the parent/referenced RAFS metadata blob (optional) + --aligned-chunk + Align uncompressed data chunks to 4K, only for RAFS V5 + --repeatable + Generate reproducible RAFS metadata + --whiteout-spec + Set the type of whiteout specification: [default: oci] [possible values: oci, overlayfs, none] + --prefetch-policy + Set data prefetch policy [default: none] [possible values: fs, blob, none] + -J, --output-json + File path to save operation result in JSON format + -h, --help + Print help information + `), + }, { name: "detectFeature should support empty input", feature: "", @@ -597,7 +725,7 @@ func TestDetectFeatures(t *testing.T) { expectErr: false, }, { - name: "should not support '--batch-size' and '--type tar-rafs' in v2.1.4", + name: "should not support '--encrypt', '--batch-size' or '--type tar-rafs' in v2.1.4", resetGlobal: true, disableTar2Rafs: true, helpText: []byte(` @@ -662,7 +790,7 @@ func TestDetectFeatures(t *testing.T) { ARGS: ... 
source path to build the nydus image from `), - required: Features{FeatureTar2Rafs: {}, FeatureBatchSize: {}}, + required: Features{FeatureTar2Rafs: {}, FeatureBatchSize: {}, FeatureEncrypt: {}}, detected: Features{}, expectErr: false, }, diff --git a/pkg/converter/types.go b/pkg/converter/types.go index c9795d946c..5bb734f844 100644 --- a/pkg/converter/types.go +++ b/pkg/converter/types.go @@ -79,6 +79,8 @@ type PackOption struct { Backend Backend // Timeout cancels execution once exceed the specified time. Timeout *time.Duration + // Whether the generated Nydus blobs should be encrypted. + Encrypt bool // Features keeps a feature list supported by newer version of builder, // It is detected automatically, so don't export it. @@ -105,6 +107,16 @@ type MergeOption struct { OCI bool // OCIRef enables converting OCI tar(.gz) blob to nydus referenced blob. OCIRef bool + // WithReferrer associates a reference to the original OCI manifest. + // See the `subject` field description in + // https://github.com/opencontainers/image-spec/blob/main/manifest.md#image-manifest-property-descriptions + // + // With this association, we can track all nydus images associated with + // an OCI image. For example, in Harbor we can cascade to show nydus + // images linked to an OCI image, deleting the OCI image can also delete + // the corresponding nydus images. At runtime, nydus snapshotter can also + // automatically upgrade an OCI image run to nydus image. + WithReferrer bool // Backend uploads blobs generated by nydus-image builder to a backend storage. Backend Backend // Timeout cancels execution once exceed the specified time. 
diff --git a/pkg/daemon/command/command.go b/pkg/daemon/command/command.go index fdbcf894d6..968a2ee501 100644 --- a/pkg/daemon/command/command.go +++ b/pkg/daemon/command/command.go @@ -64,7 +64,7 @@ func BuildCommand(opts []Opt) ([]string, error) { value := v.Field(i).Interface() - pair := []string{fmt.Sprintf("--%s", tag.Get("name")), fmt.Sprintf("%s", value)} + pair := []string{fmt.Sprintf("--%s", tag.Get("name")), fmt.Sprintf("%v", value)} args = append(args, pair...) case "subcommand": // Zero value will be skipped appending to command line @@ -166,6 +166,12 @@ func WithLogLevel(l string) Opt { } } +func WithLogRotationSize(l int) Opt { + return func(cmd *DaemonCommand) { + cmd.LogRotationSize = l + } +} + func WithSupervisor(s string) Opt { return func(cmd *DaemonCommand) { cmd.Supervisor = s diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index bd2252383a..240c32f79e 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -40,6 +40,9 @@ func WithRef(ref int32) NewDaemonOpt { func WithLogDir(dir string) NewDaemonOpt { return func(d *Daemon) error { + if err := os.MkdirAll(dir, 0755); err != nil { + return errors.Wrapf(err, "create logging dir %s", dir) + } d.States.LogDir = filepath.Join(dir, d.ID()) return nil } @@ -63,6 +66,13 @@ func WithLogLevel(logLevel string) NewDaemonOpt { } } +func WithLogRotationSize(logRotationSize int) NewDaemonOpt { + return func(d *Daemon) error { + d.States.LogRotationSize = logRotationSize + return nil + } +} + func WithConfigDir(dir string) NewDaemonOpt { return func(d *Daemon) error { s := filepath.Join(dir, d.ID()) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index cadd030688..12acf0f1dc 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -39,14 +39,15 @@ type NewDaemonOpt func(d *Daemon) error type States struct { // A unique ID generated by daemon manager to identify the nydusd instance. 
- ID string - ProcessID int - APISocket string - LogDir string - LogLevel string - LogToStdout bool - DaemonMode config.DaemonMode - FsDriver string + ID string + ProcessID int + APISocket string + LogDir string + LogLevel string + LogRotationSize int + LogToStdout bool + DaemonMode config.DaemonMode + FsDriver string // Fusedev mountpoint on host kernel, the fscache fs driver doesn't need a host kernel mountpoint. Mountpoint string ThreadNum int diff --git a/pkg/daemon/rafs.go b/pkg/daemon/rafs.go index ca19b1c352..5531ae4fec 100644 --- a/pkg/daemon/rafs.go +++ b/pkg/daemon/rafs.go @@ -104,6 +104,7 @@ type Rafs struct { // Usually is the image reference ImageID string DaemonID string + FsDriver string // Given by containerd SnapshotID string SnapshotDir string @@ -113,9 +114,10 @@ type Rafs struct { Annotations map[string]string } -func NewRafs(snapshotID, imageID string) (*Rafs, error) { +func NewRafs(snapshotID, imageID, fsDriver string) (*Rafs, error) { snapshotDir := path.Join(config.GetSnapshotsRootDir(), snapshotID) rafs := &Rafs{ + FsDriver: fsDriver, ImageID: imageID, SnapshotID: snapshotID, SnapshotDir: snapshotDir, @@ -139,6 +141,14 @@ func (r *Rafs) GetSnapshotDir() string { return r.SnapshotDir } +func (r *Rafs) GetFsDriver() string { + if r.FsDriver != "" { + return r.FsDriver + } + + return config.GetFsDriver() +} + // Blob caches' chunk bitmap and meta headers are stored here. 
func (r *Rafs) FscacheWorkDir() string { return filepath.Join(r.SnapshotDir, "fs") diff --git a/pkg/filesystem/config.go b/pkg/filesystem/config.go index 10184991a8..773af0bac6 100644 --- a/pkg/filesystem/config.go +++ b/pkg/filesystem/config.go @@ -8,6 +8,7 @@ package filesystem import ( + "github.com/containerd/nydus-snapshotter/config" "github.com/containerd/nydus-snapshotter/pkg/cache" "github.com/containerd/nydus-snapshotter/pkg/manager" "github.com/containerd/nydus-snapshotter/pkg/referrer" @@ -31,7 +32,14 @@ func WithManager(pm *manager.Manager) NewFSOpt { return errors.New("process manager cannot be nil") } - fs.Manager = pm + if pm.FsDriver == config.FsDriverFusedev { + fs.fusedevManager = pm + } else if pm.FsDriver == config.FsDriverFscache { + fs.fscacheManager = pm + } + + fs.enabledManagers = append(fs.enabledManagers, pm) + return nil } } diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index fcc8f347f3..6d71416d9b 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -35,36 +35,23 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/stargz" ) -// RafsV6 layout: 1k + SuperBlock(128) + SuperBlockExtended(256) -// RafsV5 layout: 8K superblock -// So we only need to read the MaxSuperBlockSize size to include both v5 and v6 superblocks -const MaxSuperBlockSize = 8 * 1024 -const ( - BootstrapFile string = "image/image.boot" - LegacyBootstrapFile string = "image.boot" - DummyMountpoint string = "/dummy" -) - +// TODO: refact `enabledManagers` and `xxxManager` into `ManagerCoordinator` type Filesystem struct { - // Managing all daemons serving filesystem. 
- Manager *manager.Manager + fusedevSharedDaemon *daemon.Daemon + fscacheSharedDaemon *daemon.Daemon + blockdevManager *manager.Manager + fusedevManager *manager.Manager + fscacheManager *manager.Manager + nodevManager *manager.Manager + enabledManagers []*manager.Manager cacheMgr *cache.Manager referrerMgr *referrer.Manager - sharedDaemon *daemon.Daemon stargzResolver *stargz.Resolver verifier *signature.Verifier nydusImageBinaryPath string rootMountpoint string } -func (fs *Filesystem) TryRetainSharedDaemon(d *daemon.Daemon) { - // FsDriver can be changed between two startups. - if d.HostMountpoint() == fs.rootMountpoint || config.GetFsDriver() == config.FsDriverFscache { - fs.sharedDaemon = d - d.IncRef() - } -} - // NewFileSystem initialize Filesystem instance // It does mount image layers by starting nydusd doing FUSE mount or not. func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { @@ -76,31 +63,70 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { } } - // Try to reconnect to running daemons - recoveringDaemons, liveDaemons, err := fs.Manager.Recover(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to reconnect daemons") + recoveringDaemons := make(map[string]*daemon.Daemon, 0) + liveDaemons := make(map[string]*daemon.Daemon, 0) + for _, fsManager := range fs.enabledManagers { + err := fsManager.Recover(ctx, &recoveringDaemons, &liveDaemons) + if err != nil { + return nil, errors.Wrap(err, "reconnect daemons and recover filesystem instance") + } + } + + var hasFscacheSharedDaemon = false + var hasFusedevSharedDaemon = false + for _, daemon := range liveDaemons { + if daemon.States.FsDriver == config.FsDriverFscache { + hasFscacheSharedDaemon = true + } else if daemon.States.FsDriver == config.FsDriverFusedev && daemon.IsSharedDaemon() { + hasFusedevSharedDaemon = true + } + } + for _, daemon := range recoveringDaemons { + if daemon.States.FsDriver == config.FsDriverFscache { + 
hasFscacheSharedDaemon = true + } else if daemon.States.FsDriver == config.FsDriverFusedev && daemon.IsSharedDaemon() { + hasFusedevSharedDaemon = true + } } // Try to bring up the shared daemon early. // With found recovering daemons, it must be the case that snapshotter is being restarted. - if config.GetDaemonMode() == config.DaemonModeShared && len(liveDaemons) == 0 && len(recoveringDaemons) == 0 { - // Situations that shared daemon is not found: - // 1. The first time this nydus-snapshotter runs - // 2. Daemon record is wrongly deleted from DB. Above reconnecting already gathers - // all daemons but still not found shared daemon. The best workaround is to start - // a new nydusd for it. - // TODO: We still need to consider shared daemon the time sequence of initializing daemon, - // start daemon commit its state to DB and retrieving its state. - log.L.Infof("initializing the shared nydus daemon") - if err := fs.initSharedDaemon(); err != nil { - return nil, errors.Wrap(err, "start shared nydusd daemon") + // Situations that shared daemon is not found: + // 1. The first time this nydus-snapshotter runs + // 2. Daemon record is wrongly deleted from DB. Above reconnecting already gathers + // all daemons but still not found shared daemon. The best workaround is to start + // a new nydusd for it. + // TODO: We still need to consider shared daemon the time sequence of initializing daemon, + // start daemon commit its state to DB and retrieving its state. 
+ if fs.fscacheManager == nil { + if hasFscacheSharedDaemon { + return nil, errors.Errorf("shared fscache daemon is present, but manager is missing") + } + } else if !hasFscacheSharedDaemon && fs.fscacheSharedDaemon == nil { + log.L.Infof("initializing shared nydus daemon for fscache") + if err := fs.initSharedDaemon(fs.fscacheManager); err != nil { + return nil, errors.Wrap(err, "start shared nydusd daemon for fscache") + } + } + if fs.fusedevManager == nil { + if hasFusedevSharedDaemon { + return nil, errors.Errorf("shared fusedev daemon is present, but manager is missing") + } + } else if config.IsFusedevSharedModeEnabled() && !hasFusedevSharedDaemon && fs.fusedevSharedDaemon == nil { + log.L.Infof("initializing shared nydus daemon for fusedev") + if err := fs.initSharedDaemon(fs.fusedevManager); err != nil { + return nil, errors.Wrap(err, "start shared nydusd daemon for fusedev") } } // Try to bring all persisted and stopped nydusd up and remount Rafs for _, d := range recoveringDaemons { - if err := fs.Manager.StartDaemon(d); err != nil { + d.ClearVestige() + fsManager, err := fs.getManager(d.States.FsDriver) + if err != nil { + return nil, errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID) + } + if err := fsManager.StartDaemon(d); err != nil { return nil, errors.Wrapf(err, "start daemon %s", d.ID()) } if err := d.WaitUntilState(types.DaemonStateRunning); err != nil { @@ -109,44 +135,47 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { if err := d.RecoveredMountInstances(); err != nil { return nil, errors.Wrapf(err, "recover mounts for daemon %s", d.ID()) } - - // Found shared daemon - // Fscache userspace daemon has no host mountpoint. 
fs.TryRetainSharedDaemon(d) - } for _, d := range liveDaemons { - // Found shared daemon fs.TryRetainSharedDaemon(d) } return &fs, nil } -// The globally shared daemon must be running before using it -// So we don't check if it is none here -// NIL shared damon means no shared daemon is ever needed and required. -func (fs *Filesystem) getSharedDaemon() *daemon.Daemon { - return fs.sharedDaemon +func (fs *Filesystem) TryRetainSharedDaemon(d *daemon.Daemon) { + if d.States.FsDriver == config.FsDriverFscache { + if fs.fscacheSharedDaemon == nil { + log.L.Debug("retain fscache shared daemon") + fs.fscacheSharedDaemon = d + d.IncRef() + } + } else if d.States.FsDriver == config.FsDriverFusedev { + if fs.fusedevSharedDaemon == nil && d.HostMountpoint() == fs.rootMountpoint { + log.L.Debug("retain fusedev shared daemon") + fs.fusedevSharedDaemon = d + d.IncRef() + } + } } -func (fs *Filesystem) decideDaemonMountpoint(rafs *daemon.Rafs) (string, error) { - var m string - if config.GetDaemonMode() == config.DaemonModeShared { - if config.GetFsDriver() == config.FsDriverFscache { - return "", nil +func (fs *Filesystem) TryStopSharedDaemon() { + if fs.fusedevSharedDaemon != nil { + if fs.fusedevSharedDaemon.GetRef() == 1 { + if err := fs.fusedevManager.DestroyDaemon(fs.fusedevSharedDaemon); err != nil { + log.L.WithError(err).Errorf("Terminate shared daemon %s failed", fs.fusedevSharedDaemon.ID()) + } } - m = fs.rootMountpoint - } else { - m = path.Join(rafs.GetSnapshotDir(), "mnt") } - - if err := os.MkdirAll(m, 0755); err != nil { - return "", errors.Wrapf(err, "create directory %s", m) + if fs.fscacheSharedDaemon != nil { + if fs.fscacheSharedDaemon.GetRef() == 1 { + if err := fs.fscacheManager.DestroyDaemon(fs.fscacheSharedDaemon); err != nil { + log.L.WithError(err).Errorf("Terminate shared daemon %s failed", fs.fscacheSharedDaemon.ID()) + } + } } - - return m, nil } // WaitUntilReady wait until daemon ready by snapshotID, it will wait until nydus domain socket 
established @@ -162,16 +191,18 @@ func (fs *Filesystem) WaitUntilReady(snapshotID string) error { return errors.Wrapf(errdefs.ErrNotFound, "no instance %s", snapshotID) } - d := fs.Manager.GetByDaemonID(instance.DaemonID) - if d == nil { - return errors.Wrapf(errdefs.ErrNotFound, "snapshot id %s daemon id %s", snapshotID, instance.DaemonID) - } + if instance.GetFsDriver() == config.FsDriverFscache || instance.GetFsDriver() == config.FsDriverFusedev { + d, err := fs.getDaemonByRafs(instance) + if err != nil { + return errors.Wrapf(err, "snapshot id %s daemon id %s", snapshotID, instance.DaemonID) + } - if err := d.WaitUntilState(types.DaemonStateRunning); err != nil { - return err - } + if err := d.WaitUntilState(types.DaemonStateRunning); err != nil { + return err + } - log.L.Infof("Nydus remote snapshot %s is ready", snapshotID) + log.L.Debugf("Nydus remote snapshot %s is ready", snapshotID) + } return nil } @@ -180,8 +211,19 @@ func (fs *Filesystem) WaitUntilReady(snapshotID string) error { // this method will fork nydus daemon and manage it in the internal store, and indexed by snapshotID // It must set up all necessary resources during Mount procedure and revoke any step if necessary. func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err error) { - // If NoneDaemon mode, we don't mount nydus on host - if !fs.DaemonBacked() { + // TODO: support tarfs + isTarfsMode := false + fsDriver := config.GetFsDriver() + if isTarfsMode { + fsDriver = config.FsDriverBlockdev + } else if !fs.DaemonBacked() { + fsDriver = config.FsDriverNodev + } + isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared + useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev + + // Do not create RAFS instance in case of nodev. 
+ if fsDriver == config.FsDriverNodev { return nil } @@ -200,11 +242,11 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er r := daemon.RafsSet.Get(snapshotID) if r != nil { - // Containerd can handle this error? + // Instance already exists, how could this happen? Can containerd handle this case? return nil } - rafs, err := daemon.NewRafs(snapshotID, imageID) + rafs, err := daemon.NewRafs(snapshotID, imageID, fsDriver) if err != nil { return errors.Wrapf(err, "create rafs instance %s", snapshotID) } @@ -215,68 +257,75 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er } }() - var d *daemon.Daemon - if fs.getSharedDaemon() != nil { - d = fs.getSharedDaemon() - d.AddInstance(rafs) - } else { - mp, err := fs.decideDaemonMountpoint(rafs) - if err != nil { - return err - } - d, err = fs.createDaemon(mp, 0) - // if daemon already exists for snapshotID, just return - if err != nil { - if errdefs.IsAlreadyExists(err) { - return nil - } - return err - } - d.AddInstance(rafs) + fsManager, err := fs.getManager(fsDriver) + if err != nil { + return errors.Wrapf(err, "get filesystem manager for snapshot %s", snapshotID) } - bootstrap, err := rafs.BootstrapFile() if err != nil { - return errors.Wrapf(err, "find bootstrap file of daemon %s snapshot %s", d.ID(), snapshotID) + return errors.Wrapf(err, "find bootstrap file snapshot %s", snapshotID) } - workDir := rafs.FscacheWorkDir() - // Nydusd uses cache manager's directory to store blob caches. So cache - // manager knows where to find those blobs. 
- cacheDir := fs.cacheMgr.CacheDir() + var d *daemon.Daemon + if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev { + if useSharedDaemon { + d, err = fs.getSharedDaemon(fsDriver) + if err != nil { + return err + } + } else { + mp, err := fs.decideDaemonMountpoint(fsDriver, false, rafs) + if err != nil { + return err + } + d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0) + // if daemon already exists for snapshotID, just return + if err != nil && !errdefs.IsAlreadyExists(err) { + return err + } - params := map[string]string{ - daemonconfig.Bootstrap: bootstrap, - // Fscache driver stores blob cache bitmap and blob header files here - daemonconfig.WorkDir: workDir, - daemonconfig.CacheDir: cacheDir} + // TODO: reclaim resources on error + } - cfg := deepcopy.Copy(fs.Manager.DaemonConfig).(daemonconfig.DaemonConfig) - err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params) - if err != nil { - return errors.Wrap(err, "supplement configuration") - } - - // TODO: How to manage rafs configurations on-disk? separated json config file or DB record? - // In order to recover erofs mount, the configuration file has to be persisted. - var configSubDir string - if fs.getSharedDaemon() == nil { - // Associate daemon config object when creating a new daemon object to avoid - // reading disk file again and again. - // For shared daemon, each rafs instance has its own configuration, so we don't - // attach a config interface to daemon in this case. - d.Config = cfg - } else { - configSubDir = snapshotID - } + // Nydusd uses cache manager's directory to store blob caches. So cache + // manager knows where to find those blobs. 
+ cacheDir := fs.cacheMgr.CacheDir() + // Fscache driver stores blob cache bitmap and blob header files here + workDir := rafs.FscacheWorkDir() + params := map[string]string{ + daemonconfig.Bootstrap: bootstrap, + daemonconfig.WorkDir: workDir, + daemonconfig.CacheDir: cacheDir, + } + cfg := deepcopy.Copy(fsManager.DaemonConfig).(daemonconfig.DaemonConfig) + err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params) + if err != nil { + return errors.Wrap(err, "supplement configuration") + } - err = cfg.DumpFile(d.ConfigFile(configSubDir)) - if err != nil { - if errors.Is(err, errdefs.ErrAlreadyExists) { - log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir)) + // TODO: How to manage rafs configurations on-disk? separated json config file or DB record? + // In order to recover erofs mount, the configuration file has to be persisted. + var configSubDir string + if useSharedDaemon { + configSubDir = snapshotID } else { - return errors.Wrap(err, "dump daemon configuration file") + // Associate daemon config object when creating a new daemon object to avoid + // reading disk file again and again. + // For shared daemon, each rafs instance has its own configuration, so we don't + // attach a config interface to daemon in this case. 
+ d.Config = cfg + } + + err = cfg.DumpFile(d.ConfigFile(configSubDir)) + if err != nil { + if errors.Is(err, errdefs.ErrAlreadyExists) { + log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir)) + } else { + return errors.Wrap(err, "dump daemon configuration file") + } } + + d.AddInstance(rafs) } // if publicKey is not empty we should verify bootstrap file of image @@ -285,13 +334,23 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er return errors.Wrapf(err, "verify signature of daemon %s", d.ID()) } - err = fs.mount(d, rafs) - if err != nil { - return errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID) + switch fsDriver { + case config.FsDriverFscache: + err = fs.mountRemote(fsManager, useSharedDaemon, d, rafs) + if err != nil { + return errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID) + } + case config.FsDriverFusedev: + err = fs.mountRemote(fsManager, useSharedDaemon, d, rafs) + if err != nil { + return errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID) + } + // case config.FsDriverBlockdev: + // TODO: support tarfs } // Persist it after associate instance after all the states are calculated. - if err := fs.Manager.NewInstance(rafs); err != nil { + if err := fsManager.NewInstance(rafs); err != nil { return errors.Wrapf(err, "create instance %s", snapshotID) } @@ -301,32 +360,38 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er func (fs *Filesystem) Umount(ctx context.Context, snapshotID string) error { instance := daemon.RafsSet.Get(snapshotID) if instance == nil { - log.L.Debugf("Not a rafs instance. 
ID %s", snapshotID) - return nil - } - - daemon := fs.Manager.GetByDaemonID(instance.DaemonID) - if daemon == nil { - log.L.Infof("snapshot %s does not correspond to a nydusd", snapshotID) + log.L.Debugf("no RAFS filesystem instance associated with snapshot %s", snapshotID) return nil } - log.L.Infof("umount snapshot %s, daemon ID %s", snapshotID, daemon.ID()) - - daemon.RemoveInstance(snapshotID) - if err := daemon.UmountInstance(instance); err != nil { - return errors.Wrapf(err, "umount instance %s", snapshotID) + fsDriver := instance.GetFsDriver() + fsManager, err := fs.getManager(fsDriver) + if err != nil { + return errors.Wrapf(err, "get manager for filesystem instance %s", instance.DaemonID) } - if err := fs.Manager.RemoveInstance(snapshotID); err != nil { - return errors.Wrapf(err, "remove instance %s", snapshotID) - } + if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev { + daemon, err := fs.getDaemonByRafs(instance) + if err != nil { + log.L.Debugf("snapshot %s has no associated nydusd", snapshotID) + return errors.Wrapf(err, "get daemon with ID %s for snapshot %s", instance.DaemonID, snapshotID) + } - // Once daemon's reference reaches 0, destroy the whole daemon - if daemon.GetRef() == 0 { - if err := fs.Manager.DestroyDaemon(daemon); err != nil { - return errors.Wrapf(err, "destroy daemon %s", daemon.ID()) + daemon.RemoveInstance(snapshotID) + if err := fsManager.RemoveInstance(snapshotID); err != nil { + return errors.Wrapf(err, "remove snapshot %s", snapshotID) } + if err := daemon.UmountInstance(instance); err != nil { + return errors.Wrapf(err, "umount instance %s", snapshotID) + } + // Once daemon's reference reaches 0, destroy the whole daemon + if daemon.GetRef() == 0 { + if err := fsManager.DestroyDaemon(daemon); err != nil { + return errors.Wrapf(err, "destroy daemon %s", daemon.ID()) + } + } + // } else if fsDriver == config.FsDriverBlockdev { + // TODO: support tarfs } return nil @@ -335,6 +400,7 @@ func (fs 
*Filesystem) Umount(ctx context.Context, snapshotID string) error { // How much space the layer/blob cache filesystem is occupying // The blob digest mush have `sha256:` prefixed, otherwise, throw errors. func (fs *Filesystem) CacheUsage(ctx context.Context, blobDigest string) (snapshots.Usage, error) { + log.L.Infof("cache usage %s", blobDigest) digest := digest.Digest(blobDigest) if err := digest.Validate(); err != nil { return snapshots.Usage{}, errors.Wrapf(err, "invalid blob digest from label %q, digest=%s", @@ -345,35 +411,46 @@ func (fs *Filesystem) CacheUsage(ctx context.Context, blobDigest string) (snapsh } func (fs *Filesystem) RemoveCache(blobDigest string) error { + log.L.Infof("remove cache %s", blobDigest) digest := digest.Digest(blobDigest) if err := digest.Validate(); err != nil { return errors.Wrapf(err, "invalid blob digest from label %q, digest=%s", snpkg.TargetLayerDigestLabel, blobDigest) } blobID := digest.Hex() - if config.GetFsDriver() == config.FsDriverFscache { - c, err := fs.getSharedDaemon().GetClient() + + if fs.fscacheManager != nil { + c, err := fs.fscacheSharedDaemon.GetClient() if err != nil { return err } // delete fscache blob cache file + // TODO: skip error for blob not existing if err := c.UnbindBlob("", blobID); err != nil { return err } return nil + } + return fs.cacheMgr.RemoveBlobCache(blobID) } // Try to stop all the running daemons if they are not referenced by any snapshots // Clean up resources along with the daemons. 
func (fs *Filesystem) Teardown(ctx context.Context) error { - for _, d := range fs.Manager.ListDaemons() { - for _, instance := range d.Instances.List() { - err := fs.Umount(ctx, instance.SnapshotID) - if err != nil { - log.L.Errorf("Failed to umount snapshot %s, %s", instance.SnapshotID, err) + for _, fsManager := range fs.enabledManagers { + if fsManager.FsDriver == config.FsDriverFscache || fsManager.FsDriver == config.FsDriverFusedev { + for _, d := range fsManager.ListDaemons() { + for _, instance := range d.Instances.List() { + err := fs.Umount(ctx, instance.SnapshotID) + if err != nil { + log.L.Errorf("Failed to umount snapshot %s, %s", instance.SnapshotID, err) + } + } } + // } else if fsManager.FsDriver == config.FsDriverBlockdev { + // TODO: support tarfs } } @@ -386,11 +463,10 @@ func (fs *Filesystem) MountPoint(snapshotID string) (string, error) { // existed on host. NoneDaemon mode does not start nydusd, so NO fuse mount is // ever performed. Only mount option carries meaningful info to containerd and // finally passes to shim. - return DummyMountpoint, nil + return fs.rootMountpoint, nil } rafs := daemon.RafsSet.Get(snapshotID) - if rafs != nil { return rafs.GetMountpoint(), nil } @@ -405,39 +481,66 @@ func (fs *Filesystem) BootstrapFile(id string) (string, error) { // daemon mountpoint to rafs mountpoint // calculate rafs mountpoint for snapshots mount slice. 
-func (fs *Filesystem) mount(d *daemon.Daemon, r *daemon.Rafs) error { - if config.GetDaemonMode() == config.DaemonModeShared { - if config.GetFsDriver() == config.FsDriverFusedev { +func (fs *Filesystem) mountRemote(fsManager *manager.Manager, useSharedDaemon bool, + d *daemon.Daemon, r *daemon.Rafs) error { + + if useSharedDaemon { + if fsManager.FsDriver == config.FsDriverFusedev { r.SetMountpoint(path.Join(d.HostMountpoint(), r.SnapshotID)) } else { r.SetMountpoint(path.Join(r.GetSnapshotDir(), "mnt")) } - if err := d.SharedMount(r); err != nil { return errors.Wrapf(err, "failed to mount") } } else { - err := fs.Manager.StartDaemon(d) + r.SetMountpoint(path.Join(d.HostMountpoint())) + err := fsManager.StartDaemon(d) if err != nil { - return err + return errors.Wrapf(err, "start daemon") } - r.SetMountpoint(path.Join(d.HostMountpoint())) - return errors.Wrapf(err, "start daemon") } return nil } +func (fs *Filesystem) decideDaemonMountpoint(fsDriver string, isSharedDaemonMode bool, rafs *daemon.Rafs) (string, error) { + m := "" + + if fsDriver == config.FsDriverFscache || fsDriver == config.FsDriverFusedev { + if isSharedDaemonMode { + m = fs.rootMountpoint + } else { + m = path.Join(rafs.GetSnapshotDir(), "mnt") + } + if err := os.MkdirAll(m, 0755); err != nil { + return "", errors.Wrapf(err, "create directory %s", m) + } + } + + return m, nil +} + // 1. Create a daemon instance // 2. Build command line // 3. 
Start daemon -func (fs *Filesystem) initSharedDaemon() (err error) { - mp, err := fs.decideDaemonMountpoint(nil) +func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) { + var daemonMode config.DaemonMode + switch fsManager.FsDriver { + case config.FsDriverFscache: + daemonMode = config.DaemonModeShared + case config.FsDriverFusedev: + daemonMode = config.DaemonModeShared + default: + return errors.Errorf("unsupported filesystem driver %s", fsManager.FsDriver) + } + + mp, err := fs.decideDaemonMountpoint(fsManager.FsDriver, true, nil) if err != nil { return err } - d, err := fs.createDaemon(mp, 1) + d, err := fs.createDaemon(fsManager, daemonMode, mp, 0) if err != nil { return errors.Wrap(err, "initialize shared daemon") } @@ -445,7 +548,7 @@ func (fs *Filesystem) initSharedDaemon() (err error) { // FIXME: Daemon record should not be removed after starting daemon failure. defer func() { if err != nil { - if err := fs.Manager.DeleteDaemon(d); err != nil { + if err := fsManager.DeleteDaemon(d); err != nil { log.L.Errorf("Start nydusd daemon error %v", err) } } @@ -454,45 +557,36 @@ func (fs *Filesystem) initSharedDaemon() (err error) { // Shared nydusd daemon does not need configuration to start process but // it is loaded when requesting mount api // Dump the configuration file since it is reloaded when recovering the nydusd - d.Config = fs.Manager.DaemonConfig + d.Config = fsManager.DaemonConfig err = d.Config.DumpFile(d.ConfigFile("")) if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) { return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile("")) } - if err := fs.Manager.StartDaemon(d); err != nil { + if err := fsManager.StartDaemon(d); err != nil { return errors.Wrap(err, "start shared daemon") } - fs.sharedDaemon = d + fs.TryRetainSharedDaemon(d) return } -func (fs *Filesystem) TryStopSharedDaemon() { - sharedDaemon := fs.getSharedDaemon() - if sharedDaemon != nil { - if sharedDaemon.GetRef() == 1 { - if err := 
fs.Manager.DestroyDaemon(sharedDaemon); err != nil { - log.L.WithError(err).Errorf("Terminate shared daemon %s failed", sharedDaemon.ID()) - } - } - } -} - // createDaemon create new nydus daemon by snapshotID and imageID // For fscache driver, no need to provide mountpoint to nydusd daemon. -func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daemon, err error) { +func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode, + mountpoint string, ref int32) (d *daemon.Daemon, err error) { opts := []daemon.NewDaemonOpt{ daemon.WithRef(ref), daemon.WithSocketDir(config.GetSocketRoot()), daemon.WithConfigDir(config.GetConfigRoot()), daemon.WithLogDir(config.GetLogDir()), daemon.WithLogLevel(config.GetLogLevel()), + daemon.WithLogRotationSize(config.GetDaemonLogRotationSize()), daemon.WithLogToStdout(config.GetLogToStdout()), daemon.WithNydusdThreadNum(config.GetDaemonThreadsNumber()), - daemon.WithFsDriver(config.GetFsDriver()), - daemon.WithDaemonMode(config.GetDaemonMode()), + daemon.WithFsDriver(fsManager.FsDriver), + daemon.WithDaemonMode(daemonMode), } if mountpoint != "" { @@ -508,13 +602,13 @@ func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daem return nil, errors.Wrapf(err, "new daemon") } - if err = fs.Manager.NewDaemon(d); err != nil { + if err = fsManager.NewDaemon(d); err != nil { return nil, err } - if fs.Manager.SupervisorSet != nil { + if fsManager.SupervisorSet != nil { // Supervisor is strongly associated with real running nydusd daemon. 
- su := fs.Manager.SupervisorSet.NewSupervisor(d.ID()) + su := fsManager.SupervisorSet.NewSupervisor(d.ID()) if su == nil { return nil, errors.Errorf("create supervisor for daemon %s", d.ID()) @@ -528,3 +622,90 @@ func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daem func (fs *Filesystem) DaemonBacked() bool { return config.GetDaemonMode() != config.DaemonModeNone } + +func (fs *Filesystem) getManager(fsDriver string) (*manager.Manager, error) { + switch fsDriver { + case config.FsDriverBlockdev: + if fs.blockdevManager != nil { + return fs.blockdevManager, nil + } + case config.FsDriverFscache: + if fs.fscacheManager != nil { + return fs.fscacheManager, nil + } + case config.FsDriverFusedev: + if fs.fusedevManager != nil { + return fs.fusedevManager, nil + } + case config.FsDriverNodev: + if fs.nodevManager != nil { + return fs.nodevManager, nil + } + } + + return nil, errors.Errorf("no manager for filesystem driver %s", fsDriver) +} + +func (fs *Filesystem) getSharedDaemon(fsDriver string) (*daemon.Daemon, error) { + switch fsDriver { + case config.FsDriverFscache: + if fs.fscacheSharedDaemon != nil { + return fs.fscacheSharedDaemon, nil + } + case config.FsDriverFusedev: + if fs.fusedevSharedDaemon != nil { + return fs.fusedevSharedDaemon, nil + } + } + + return nil, errors.Errorf("no shared daemon for filesystem driver %s", fsDriver) +} + +func (fs *Filesystem) getDaemonByRafs(rafs *daemon.Rafs) (*daemon.Daemon, error) { + switch rafs.GetFsDriver() { + case config.FsDriverBlockdev: + if fs.blockdevManager != nil { + if d := fs.blockdevManager.GetByDaemonID(rafs.DaemonID); d != nil { + return d, nil + } + } + case config.FsDriverFscache: + if fs.fscacheManager != nil { + if d := fs.fscacheManager.GetByDaemonID(rafs.DaemonID); d != nil { + return d, nil + } + } + case config.FsDriverFusedev: + if fs.fusedevManager != nil { + if d := fs.fusedevManager.GetByDaemonID(rafs.DaemonID); d != nil { + return d, nil + } + } + } + + return nil, 
errdefs.ErrNotFound +} + +func (fs *Filesystem) GetDaemonByID(id string) (*daemon.Daemon, error) { + if fs.blockdevManager != nil { + if d := fs.blockdevManager.GetByDaemonID(id); d != nil { + return d, nil + } + } + if fs.fscacheManager != nil { + if d := fs.fscacheManager.GetByDaemonID(id); d != nil { + return d, nil + } + } + if fs.fusedevManager != nil { + if d := fs.fusedevManager.GetByDaemonID(id); d != nil { + return d, nil + } + } + if fs.nodevManager != nil { + if d := fs.nodevManager.GetByDaemonID(id); d != nil { + return d, nil + } + } + return nil, errdefs.ErrNotFound +} diff --git a/pkg/manager/daemon_adaptor.go b/pkg/manager/daemon_adaptor.go index 0fcf2fe2e6..1aa9765475 100644 --- a/pkg/manager/daemon_adaptor.go +++ b/pkg/manager/daemon_adaptor.go @@ -94,6 +94,13 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error { collector.NewDaemonEventCollector(types.DaemonStateRunning).Collect() + if m.CgroupMgr != nil { + if err := m.CgroupMgr.AddProc(d.States.ProcessID); err != nil { + log.L.WithError(err).Errorf("add daemon %s to cgroup failed", d.ID()) + return + } + } + d.Lock() collector.NewDaemonInfoCollector(&d.Version, 1).Collect() d.Unlock() @@ -160,6 +167,10 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) command.WithLogLevel(d.States.LogLevel), command.WithAPISock(d.GetAPISock())) + if d.States.LogRotationSize > 0 { + cmdOpts = append(cmdOpts, command.WithLogRotationSize(d.States.LogRotationSize)) + } + if upgrade { cmdOpts = append(cmdOpts, command.WithUpgrade()) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 246aedcb55..2ace2fb296 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -20,6 +20,7 @@ import ( "github.com/containerd/nydus-snapshotter/config" "github.com/containerd/nydus-snapshotter/config/daemonconfig" + "github.com/containerd/nydus-snapshotter/pkg/cgroup" "github.com/containerd/nydus-snapshotter/pkg/daemon" 
"github.com/containerd/nydus-snapshotter/pkg/daemon/types" "github.com/containerd/nydus-snapshotter/pkg/errdefs" @@ -51,7 +52,7 @@ func (s *DaemonStates) Add(daemon *daemon.Daemon) *daemon.Daemon { return old } -func (s *DaemonStates) removeUnlocked(d *daemon.Daemon) *daemon.Daemon { +func (s *DaemonStates) removeLocked(d *daemon.Daemon) *daemon.Daemon { old := s.idxByDaemonID[d.ID()] delete(s.idxByDaemonID, d.ID()) return old @@ -59,14 +60,14 @@ func (s *DaemonStates) removeUnlocked(d *daemon.Daemon) *daemon.Daemon { func (s *DaemonStates) Remove(d *daemon.Daemon) *daemon.Daemon { s.mu.Lock() - old := s.removeUnlocked(d) + old := s.removeLocked(d) s.mu.Unlock() return old } func (s *DaemonStates) RemoveByDaemonID(id string) *daemon.Daemon { - return s.GetByDaemonID(id, func(d *daemon.Daemon) { s.removeUnlocked(d) }) + return s.GetByDaemonID(id, func(d *daemon.Daemon) { s.removeLocked(d) }) } // Also recover daemon runtime state here @@ -135,6 +136,9 @@ type Manager struct { // A basic configuration template loaded from the file DaemonConfig daemonconfig.DaemonConfig + // Cgroup manager for nydusd + CgroupMgr *cgroup.Manager + // In order to validate daemon fs driver is consistent with the latest snapshotter boot FsDriver string @@ -150,6 +154,7 @@ type Opt struct { // Nydus-snapshotter work directory RootDir string DaemonConfig daemonconfig.DaemonConfig + CgroupMgr *cgroup.Manager // In order to validate daemon fs driver is consistent with the latest snapshotter boot FsDriver string } @@ -278,6 +283,7 @@ func NewManager(opt Opt) (*Manager, error) { RecoverPolicy: opt.RecoverPolicy, SupervisorSet: supervisorSet, DaemonConfig: opt.DaemonConfig, + CgroupMgr: opt.CgroupMgr, FsDriver: opt.FsDriver, } @@ -444,7 +450,6 @@ func (m *Manager) DestroyDaemon(d *daemon.Daemon) error { if err := d.Wait(); err != nil { log.L.Warnf("Failed to wait for daemon, %v", err) } - collector.NewDaemonEventCollector(types.DaemonStateDestroyed).Collect() d.Lock() 
collector.NewDaemonInfoCollector(&d.Version, -1).Collect() @@ -458,25 +463,18 @@ func (m *Manager) DestroyDaemon(d *daemon.Daemon) error { // 1. Don't erase ever written record // 2. Just recover nydusd daemon states to manager's memory part. // 3. Manager in SharedDaemon mode should starts a nydusd when recovering -func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[string]*daemon.Daemon, error) { - // Collected deserialized daemons that need to be recovered. - recoveringDaemons := make(map[string]*daemon.Daemon, 0) - liveDaemons := make(map[string]*daemon.Daemon, 0) - +func (m *Manager) Recover(ctx context.Context, + recoveringDaemons *map[string]*daemon.Daemon, liveDaemons *map[string]*daemon.Daemon) error { if err := m.store.WalkDaemons(ctx, func(s *daemon.States) error { - log.L.Debugf("found daemon states %#v", s) + if s.FsDriver != m.FsDriver { + return nil + } + log.L.Debugf("found daemon states %#v", s) opt := make([]daemon.NewDaemonOpt, 0) var d, _ = daemon.NewDaemon(opt...) d.States = *s - // It can't change snapshotter's fs driver to a different one from a daemon that ever created in the past. - if d.States.FsDriver != m.FsDriver { - return errors.Wrapf(errdefs.ErrInvalidArgument, - "can't recover from the last restart, the specified fs-driver=%s mismatches with the last fs-driver=%s", - m.FsDriver, d.States.FsDriver) - } - m.daemonStates.RecoverDaemonState(d) if m.SupervisorSet != nil { @@ -500,7 +498,7 @@ func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[s state, err := d.GetState() if err != nil { log.L.Warnf("Daemon %s died somehow. Clean up its vestige!, %s", d.ID(), err) - recoveringDaemons[d.ID()] = d + (*recoveringDaemons)[d.ID()] = d //nolint:nilerr return nil } @@ -512,8 +510,13 @@ func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[s // FIXME: Should put the a daemon back file system shared damon field. 
log.L.Infof("found RUNNING daemon %s during reconnecting", d.ID()) - liveDaemons[d.ID()] = d + (*liveDaemons)[d.ID()] = d + if m.CgroupMgr != nil { + if err := m.CgroupMgr.AddProc(d.States.ProcessID); err != nil { + return errors.Wrapf(err, "add daemon %s to cgroup failed", d.ID()) + } + } d.Lock() collector.NewDaemonInfoCollector(&d.Version, 1).Collect() d.Unlock() @@ -535,32 +538,31 @@ func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[s return nil }); err != nil { - return nil, nil, errors.Wrapf(err, "walk daemons to reconnect") + return errors.Wrapf(err, "walk daemons to reconnect") } if err := m.store.WalkInstances(ctx, func(r *daemon.Rafs) error { - log.L.Debugf("found instance %#v", r) - - d := recoveringDaemons[r.DaemonID] - if d != nil { - d.AddInstance(r) - } - - d = liveDaemons[r.DaemonID] - if d != nil { - d.AddInstance(r) + if r.GetFsDriver() == m.FsDriver { + log.L.Debugf("found instance %#v", r) + if r.GetFsDriver() == config.FsDriverFscache || r.GetFsDriver() == config.FsDriverFusedev { + d := (*recoveringDaemons)[r.DaemonID] + if d != nil { + d.AddInstance(r) + } + d = (*liveDaemons)[r.DaemonID] + if d != nil { + d.AddInstance(r) + } + daemon.RafsSet.Add(r) + } else if r.GetFsDriver() == config.FsDriverBlockdev { + daemon.RafsSet.Add(r) + // TODO: check and remount tarfs + } } - - daemon.RafsSet.Add(r) - return nil }); err != nil { - return nil, nil, errors.Wrapf(err, "walk instances to reconnect") + return errors.Wrapf(err, "walk instances to reconnect") } - for _, d := range recoveringDaemons { - d.ClearVestige() - } - - return recoveringDaemons, liveDaemons, nil + return nil } diff --git a/pkg/manager/monitor.go b/pkg/manager/monitor.go index 92d839f97c..1664aeb960 100644 --- a/pkg/manager/monitor.go +++ b/pkg/manager/monitor.go @@ -82,7 +82,7 @@ func (m *livenessMonitor) Subscribe(id string, path string, notifier chan<- deat m.mu.Lock() defer m.mu.Unlock() - if s, ok := m.subscribers[id]; ok && s.id == path { + if s, 
ok := m.subscribers[id]; ok && s.path == path { log.L.Warnf("Daemon %s is already subscribed!", id) return errdefs.ErrAlreadyExists } diff --git a/pkg/manager/monitor_test.go b/pkg/manager/monitor_test.go index 168c5cf986..aa86d61f6f 100644 --- a/pkg/manager/monitor_test.go +++ b/pkg/manager/monitor_test.go @@ -70,6 +70,8 @@ func TestLivenessMonitor(t *testing.T) { e1 := monitor.Subscribe("daemon_1", s1.Name(), notifier) assert.Nil(t, e1) + e1 = monitor.Subscribe("daemon_1", s1.Name(), notifier) + assert.NotNil(t, e1) e2 := monitor.Subscribe("daemon_2", s2.Name(), notifier) assert.Nil(t, e2) diff --git a/pkg/metrics/serve.go b/pkg/metrics/serve.go index 8bff1e893e..6544270d7b 100644 --- a/pkg/metrics/serve.go +++ b/pkg/metrics/serve.go @@ -28,20 +28,12 @@ const defaultHungIOInterval = 10 * time.Second type ServerOpt func(*Server) error type Server struct { - rootDir string managers []*manager.Manager snCollectors []*collector.SnapshotterMetricsCollector fsCollector *collector.FsMetricsVecCollector inflightCollector *collector.InflightMetricsVecCollector } -func WithRootDir(rootDir string) ServerOpt { - return func(s *Server) error { - s.rootDir = rootDir - return nil - } -} - func WithProcessManager(pm *manager.Manager) ServerOpt { return func(s *Server) error { s.managers = append(s.managers, pm) diff --git a/pkg/remote/remotes/docker/fetcher.go b/pkg/remote/remotes/docker/fetcher.go index 03b015076f..bfe00172ac 100644 --- a/pkg/remote/remotes/docker/fetcher.go +++ b/pkg/remote/remotes/docker/fetcher.go @@ -151,11 +151,16 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R }) } -func (r dockerFetcher) createGetReq(ctx context.Context, host RegistryHost, ps ...string) (*request, int64, error) { +func (r dockerFetcher) createGetReq(ctx context.Context, host RegistryHost, mediatype string, ps ...string) (*request, int64, error) { headReq := r.request(host, http.MethodHead, ps...) 
if err := headReq.addNamespace(r.refspec.Hostname()); err != nil { return nil, 0, err } + if mediatype == "" { + headReq.header.Set("Accept", "*/*") + } else { + headReq.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", ")) + } headResp, err := headReq.doWithRetries(ctx, nil) if err != nil { @@ -190,13 +195,14 @@ func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest) (i } var ( - getReq *request - sz int64 - firstErr error + getReq *request + sz int64 + firstErr error + mediaType string ) for _, host := range r.hosts { - getReq, sz, err = r.createGetReq(ctx, host, "blobs", dgst.String()) + getReq, sz, err = r.createGetReq(ctx, host, mediaType, "blobs", dgst.String()) if err == nil { break } @@ -208,8 +214,15 @@ func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest) (i if getReq == nil { // Fall back to the "manifests" endpoint + // TODO: this change should be upstreamed to containerd. + mediaType = strings.Join([]string{ + images.MediaTypeDockerSchema2Manifest, + images.MediaTypeDockerSchema2ManifestList, + ocispec.MediaTypeImageManifest, + ocispec.MediaTypeImageIndex, + }, ", ") for _, host := range r.hosts { - getReq, sz, err = r.createGetReq(ctx, host, "manifests", dgst.String()) + getReq, sz, err = r.createGetReq(ctx, host, mediaType, "manifests", dgst.String()) if err == nil { break } @@ -231,7 +244,7 @@ func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest) (i } seeker, err := newHTTPReadSeeker(sz, func(offset int64) (rc io.ReadCloser, err error) { - rc, _, err = r.open(ctx, getReq, "", offset) + rc, _, err = r.open(ctx, getReq, mediaType, offset) return }) if err != nil { diff --git a/pkg/utils/parser/parser.go b/pkg/utils/parser/parser.go new file mode 100644 index 0000000000..14c7b04803 --- /dev/null +++ b/pkg/utils/parser/parser.go @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +package parser + +import ( + "regexp" + "strconv" + "sync" + + "github.com/pkg/errors" +) + +var ( + unitMultipliers map[string]int64 + unitMultipliersOnce sync.Once +) + +func InitUnitMultipliers() { + unitMultipliers = make(map[string]int64, 10) + + unitMultipliers["KiB"] = 1024 + unitMultipliers["MiB"] = unitMultipliers["KiB"] * 1024 + unitMultipliers["GiB"] = unitMultipliers["MiB"] * 1024 + unitMultipliers["TiB"] = unitMultipliers["GiB"] * 1024 + unitMultipliers["PiB"] = unitMultipliers["TiB"] * 1024 + + unitMultipliers["Ki"] = 1024 + unitMultipliers["Mi"] = unitMultipliers["Ki"] * 1024 + unitMultipliers["Gi"] = unitMultipliers["Mi"] * 1024 + unitMultipliers["Ti"] = unitMultipliers["Gi"] * 1024 + unitMultipliers["Pi"] = unitMultipliers["Ti"] * 1024 +} + +func MemoryConfigToBytes(data string, totalMemoryBytes int) (int64, error) { + if data == "" { + return -1, nil + } + + // Memory value without unit. + value, err := strconv.ParseFloat(data, 64) + if err == nil { + return int64(value), nil + } + + re := regexp.MustCompile(`(\d*\.?\d+)([a-zA-Z\%]+)`) + matches := re.FindStringSubmatch(data) + if len(matches) != 3 { + return 0, errors.Errorf("Failed to convert data to bytes: Unknown unit in %s", data) + } + + // Parse memory value and unit. + valueString, unit := matches[1], matches[2] + value, err = strconv.ParseFloat(valueString, 64) + if err != nil { + return 0, errors.Wrap(err, "Failed to parse memory limit") + } + + // Return if the unit is byte. + if unit == "B" { + return int64(value), nil + } + + // Calculate value if the unit is "%". 
+ if unit == "%" { + limitMemory := float64(totalMemoryBytes) * value / 100 + return int64(limitMemory + 0.5), nil + } + + unitMultipliersOnce.Do(InitUnitMultipliers) + + multiplier := unitMultipliers[unit] + return int64(value * float64(multiplier)), nil +} diff --git a/pkg/utils/parser/parser_test.go b/pkg/utils/parser/parser_test.go new file mode 100644 index 0000000000..15846d8783 --- /dev/null +++ b/pkg/utils/parser/parser_test.go @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package parser + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMemoryLimitToBytes(t *testing.T) { + totalMemoryBytes := 10000 + + for desc, test := range map[string]struct { + MemoryLimit string + expected int64 + }{ + "memory limit is zero": { + MemoryLimit: "", + expected: -1, + }, + "memory limit is a percentage": { + MemoryLimit: "20%", + expected: 2000, + }, + "memory limit is a float percentage": { + MemoryLimit: "0.2%", + expected: 20, + }, + "memory limit is a value without unit": { + MemoryLimit: "10240", + expected: 10240, + }, + "memory limit is a value with Byte unit": { + MemoryLimit: "10240B", + expected: 10240, + }, + "memory limit is a value with KiB unit": { + MemoryLimit: "30KiB", + expected: 30 * 1024, + }, + "memory limit is a value with MiB unit": { + MemoryLimit: "30MiB", + expected: 30 * 1024 * 1024, + }, + "memory limit is a value with GiB unit": { + MemoryLimit: "30GiB", + expected: 30 * 1024 * 1024 * 1024, + }, + "memory limit is a value with TiB unit": { + MemoryLimit: "30TiB", + expected: 30 * 1024 * 1024 * 1024 * 1024, + }, + "memory limit is a value with PiB unit": { + MemoryLimit: "30PiB", + expected: 30 * 1024 * 1024 * 1024 * 1024 * 1024, + }, + "memory limit is a value with Ki unit": { + MemoryLimit: "30Ki", + expected: 30 * 1024, + }, + "memory limit is a value with Mi unit": { + MemoryLimit: "30Mi", + expected: 30 * 1024 * 1024, + }, 
+ "memory limit is a value with Gi unit": { + MemoryLimit: "30Gi", + expected: 30 * 1024 * 1024 * 1024, + }, + "memory limit is a value with Ti unit": { + MemoryLimit: "30Ti", + expected: 30 * 1024 * 1024 * 1024 * 1024, + }, + "memory limit is a value with Pi unit": { + MemoryLimit: "30Pi", + expected: 30 * 1024 * 1024 * 1024 * 1024 * 1024, + }, + } { + t.Logf("TestCase %q", desc) + + memoryLimitInBytes, err := MemoryConfigToBytes(test.MemoryLimit, totalMemoryBytes) + assert.NoError(t, err) + assert.Equal(t, memoryLimitInBytes, test.expected) + } +} diff --git a/pkg/utils/sysinfo/sysinfo.go b/pkg/utils/sysinfo/sysinfo.go new file mode 100644 index 0000000000..cf8562c1dd --- /dev/null +++ b/pkg/utils/sysinfo/sysinfo.go @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package sysinfo + +import ( + "sync" + "syscall" +) + +var ( + sysinfo *syscall.Sysinfo_t + sysinfoOnce sync.Once + sysinfoErr error +) + +func GetSysinfo() { + var info syscall.Sysinfo_t + err := syscall.Sysinfo(&info) + if err != nil { + sysinfoErr = err + return + } + sysinfo = &info + sysinfoErr = nil +} + +func GetTotalMemoryBytes() (int, error) { + sysinfoOnce.Do(GetSysinfo) + if sysinfo == nil { + return 0, sysinfoErr + } + + return int(sysinfo.Totalram), nil +} diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index c5e66635e5..007ec95755 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -29,6 +29,8 @@ import ( "github.com/containerd/nydus-snapshotter/config/daemonconfig" "github.com/containerd/nydus-snapshotter/pkg/cache" + "github.com/containerd/nydus-snapshotter/pkg/cgroup" + v2 "github.com/containerd/nydus-snapshotter/pkg/cgroup/v2" "github.com/containerd/nydus-snapshotter/pkg/daemon" "github.com/containerd/nydus-snapshotter/pkg/errdefs" "github.com/containerd/nydus-snapshotter/pkg/layout" @@ -82,6 +84,23 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) 
(snapsho return nil, errors.Wrap(err, "parse recover policy") } + var cgroupMgr *cgroup.Manager + if cfg.CgroupConfig.Enable { + cgroupConfig, err := config.ParseCgroupConfig(cfg.CgroupConfig) + if err != nil { + return nil, errors.Wrap(err, "parse cgroup configuration") + } + log.L.Infof("parsed cgroup config: %#v", cgroupConfig) + + cgroupMgr, err = cgroup.NewManager(cgroup.Opt{ + Name: "nydusd", + Config: cgroupConfig, + }) + if err != nil && (err != cgroup.ErrCgroupNotSupported && err != v2.ErrRootMemorySubtreeControllerDisabled) { + return nil, errors.Wrap(err, "create cgroup manager") + } + } + manager, err := mgr.NewManager(mgr.Opt{ NydusdBinaryPath: cfg.DaemonConfig.NydusdPath, Database: db, @@ -90,6 +109,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho RecoverPolicy: rp, FsDriver: config.GetFsDriver(), DaemonConfig: daemonConfig, + CgroupMgr: cgroupMgr, }) if err != nil { return nil, errors.Wrap(err, "create daemons manager") @@ -97,7 +117,6 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho metricServer, err := metrics.NewServer( ctx, - metrics.WithRootDir(cfg.Root), metrics.WithProcessManager(manager), ) if err != nil { @@ -562,6 +581,12 @@ func (o *snapshotter) Close() error { o.fs.TryStopSharedDaemon() + if o.manager.CgroupMgr != nil { + if err := o.manager.CgroupMgr.Delete(); err != nil { + log.L.Errorf("failed to destroy cgroup, err %v", err) + } + } + return o.ms.Close() } @@ -750,7 +775,10 @@ func (o *snapshotter) remoteMountWithExtraOptions(ctx context.Context, s storage } instance := daemon.RafsSet.Get(id) - daemon := o.fs.Manager.GetByDaemonID(instance.DaemonID) + daemon, err := o.fs.GetDaemonByID(instance.DaemonID) + if err != nil { + return nil, errors.Wrapf(err, "get daemon with ID %s", instance.DaemonID) + } var c daemonconfig.DaemonConfig if daemon.IsSharedDaemon() {