diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml new file mode 100644 index 0000000..f15a35c --- /dev/null +++ b/.github/workflows/e2e.yaml @@ -0,0 +1,20 @@ +name: "E2E test with kind" + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + e2e-with-kind: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v1 + - name: Expose GitHub Runtime + uses: crazy-max/ghaction-github-runtime@v2 + # buildkit requires $ACTIONS_CACHE_URL and $ACTIONS_RUNTIME_TOKEN for GHA build cache + - name: e2e-test + run: | + make test-e2e diff --git a/.github/workflows/publish-image.yaml b/.github/workflows/publish-image.yaml new file mode 100644 index 0000000..006c153 --- /dev/null +++ b/.github/workflows/publish-image.yaml @@ -0,0 +1,29 @@ +name: "Publish Container Image" + +on: + push: + tags: + - 'v*' + +jobs: + publish-image: + runs-on: ubuntu-latest + env: + STAGINGVERSION: ${{ github.ref_name }} + PUBLISH_IMAGE: true + steps: + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v1 + - name: Login to GitHub Container Registry + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Expose GitHub Runtime + uses: crazy-max/ghaction-github-runtime@v2 + # buildkit requires $ACTIONS_CACHE_URL and $ACTIONS_RUNTIME_TOKEN for GHA build cache + - name: publish image + run: | + make build-driver + make build-examples diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5e56e04 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/LICENSE b/LICENSE new file mode 100755 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100755 index 0000000..6881065 --- /dev/null +++ b/Makefile @@ -0,0 +1,139 @@ +# Copyright 2018 The Kubernetes Authors. +# Copyright 2022 Google LLC +# Copyright 2023 Preferred Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +export STAGINGVERSION ?= $(shell git describe --long --tags --match='v*' --dirty 2>/dev/null || git rev-list -n1 HEAD) +export BUILD_DATE ?= $(shell date --iso-8601=minutes) +BINDIR ?= bin +LDFLAGS ?= -s -w -X main.version=${STAGINGVERSION} -X main.builddate=${BUILD_DATE} -extldflags '-static' + +DRIVER_BINARY = meta-fuse-csi-plugin +STARTER_BINARY = fuse-starter +FUSERMOUNT3PROXY_BINARY = fusermount3-proxy + +REGISTRY ?= ghcr.io/pfnet-research/meta-fuse-csi-plugin +DRIVER_IMAGE = ${REGISTRY}/${DRIVER_BINARY} +STARTER_IMAGE = ${REGISTRY}/${STARTER_BINARY} +EXAMPLE_IMAGE = ${REGISTRY}/mfcp-example + +DOCKER_BUILD_ARGS ?= --load --build-arg STAGINGVERSION=${STAGINGVERSION} +ifneq ("$(shell docker buildx build --help | grep 'provenance')", "") +DOCKER_BUILD_ARGS += --provenance=false +endif + +LOAD_TO_KIND ?= false +PUBLISH_IMAGE ?= false + +$(info STAGINGVERSION is ${STAGINGVERSION}) +$(info DRIVER_IMAGE is ${DRIVER_IMAGE}) +$(info STARTER_IMAGE is ${STARTER_IMAGE}) + +.PHONY: all build-image-linux-amd64 + +all: build-driver build-examples + +driver: + mkdir -p ${BINDIR} + CGO_ENABLED=0 GOOS=linux GOARCH=$(shell dpkg --print-architecture) go build -ldflags "${LDFLAGS}" -o ${BINDIR}/${DRIVER_BINARY} cmd/csi_driver/main.go + +fuse-starter: + mkdir -p ${BINDIR} + CGO_ENABLED=0 GOOS=linux GOARCH=$(shell dpkg --print-architecture) go build -ldflags "${LDFLAGS}" -o ${BINDIR}/${STARTER_BINARY} cmd/fuse_starter/main.go + +fusermount3-proxy: + mkdir -p ${BINDIR} + CGO_ENABLED=0 GOOS=linux GOARCH=$(shell dpkg --print-architecture) go build -ldflags "${LDFLAGS}" -o ${BINDIR}/${FUSERMOUNT3PROXY_BINARY} cmd/fusermount3-proxy/main.go + +build-driver: + $(eval IMAGE_NAME := ${DRIVER_IMAGE}:${STAGINGVERSION}) + docker buildx build ${DOCKER_BUILD_ARGS} ${DOCKER_CACHE_ARGS} \ + --file ./cmd/csi_driver/Dockerfile \ + --tag ${IMAGE_NAME} \ + --platform linux/amd64 . 
+ if [ "${PUBLISH_IMAGE}" = "true" ]; then \ + docker push ${IMAGE_NAME}; \ + docker tag ${IMAGE_NAME} ${DRIVER_IMAGE}:latest; \ + docker push ${DRIVER_IMAGE}:latest; \ + fi + if [ "${LOAD_TO_KIND}" = "true" ]; then \ + kind load docker-image ${IMAGE_NAME};\ + fi + +define build-example-template +ifneq ("$(EXAMPLES)", "") +EXAMPLES += build-example-$(1)-$(2) +else +EXAMPLES := build-example-$(1)-$(2) +endif + +.PHONY: build-example-$1-$2 +build-example-$(1)-$(2): + $(eval IMAGE_NAME := ${EXAMPLE_IMAGE}-$1-$2:${STAGINGVERSION}) + docker buildx build ${DOCKER_BUILD_ARGS} ${DOCKER_CACHE_ARGS} \ + --file ./examples/$1/$2/Dockerfile \ + --tag ${IMAGE_NAME} \ + --platform linux/amd64 . + if [ "${PUBLISH_IMAGE}" = "true" ]; then \ + docker push ${IMAGE_NAME}; \ + docker tag ${IMAGE_NAME} ${EXAMPLE_IMAGE}-$1-$2:latest; \ + docker push ${EXAMPLE_IMAGE}-$1-$2:latest; \ + fi + if [ "${LOAD_TO_KIND}" = "true" ]; then \ + kind load docker-image ${IMAGE_NAME};\ + fi +endef + +$(eval $(call build-example-template,proxy,mountpoint-s3)) +$(eval $(call build-example-template,proxy,goofys)) +$(eval $(call build-example-template,proxy,s3fs)) +$(eval $(call build-example-template,proxy,ros3fs)) +$(eval $(call build-example-template,proxy,gcsfuse)) +$(eval $(call build-example-template,proxy,sshfs)) +$(eval $(call build-example-template,starter,ros3fs)) +$(eval $(call build-example-template,starter,sshfs)) + +$(info $(EXAMPLES)) + +.PHONY: build-examples +build-examples: $(EXAMPLES) + +define test-example-template +ifneq ("$(EXAMPLE_TESTS)", "") +EXAMPLE_TESTS += test-example-$(1)-$(2) +else +EXAMPLE_TESTS := test-example-$(1)-$(2) +endif + +.PHONY: test-example-$1-$2 +test-example-$(1)-$(2): + ./examples/check.sh ./$1/$2 mfcp-example-$1-$2 $3 $4 $5 $6 +endef + +$(eval $(call test-example-template,proxy,mountpoint-s3,starter,/test.txt,busybox,/data/test.txt)) +$(eval $(call test-example-template,proxy,goofys,starter,/test.txt,busybox,/data/test.txt)) +$(eval $(call
test-example-template,proxy,s3fs,starter,/test.txt,busybox,/data/test.txt)) +$(eval $(call test-example-template,proxy,ros3fs,starter,/test.txt,busybox,/data/test.txt)) +$(eval $(call test-example-template,proxy,sshfs,starter,/root/sshfs-example/test.txt,busybox,/data/test.txt)) +$(eval $(call test-example-template,starter,ros3fs,starter,/test.txt,busybox,/data/test.txt)) +$(eval $(call test-example-template,starter,sshfs,starter,/root/sshfs-example/test.txt,busybox,/data/test.txt)) + +.PHONY: test-examples +test-examples: $(EXAMPLE_TESTS) + +.PHONY: test-e2e +test-e2e: + - kind delete cluster + kind create cluster + ./test_e2e.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..331e837 --- /dev/null +++ b/README.md @@ -0,0 +1,190 @@ +# meta-fuse-csi-plugin: A CSI Driver for All FUSE Implementations +[![E2E test with kind](https://github.com/pfnet-research/meta-fuse-csi-plugin/actions/workflows/e2e.yaml/badge.svg)](https://github.com/pfnet-research/meta-fuse-csi-plugin/actions/workflows/e2e.yaml) + +CSI Plugin to run and mount any FUSE implementations (e.g. [mountpoint-s3](https://github.com/awslabs/mountpoint-s3)) in Kubernetes pods without `privileged:true`. + +Mounting FUSE implementations requires `CAP_SYS_ADMIN`. +However, assigning `CAP_SYS_ADMIN` to normal pods is not recommended in terms of security. +meta-fuse-csi-plugin enables pods to run and mount FUSE implementations without `CAP_SYS_ADMIN`. +This brings us better security and more usability with object storages. + +

+ +

+ +For more details, please refer to our blog (English, Japanese) + +## Current support status +Currently, meta-fuse-csi-plugin supports the FUSE implementations below. +`examples` contains examples for [mountpoint-s3](https://github.com/awslabs/mountpoint-s3), [goofys](https://github.com/kahing/goofys), [s3fs](https://github.com/s3fs-fuse/s3fs-fuse), [ros3fs](https://github.com/akawashiro/ros3fs), [gcsfuse](https://github.com/GoogleCloudPlatform/gcsfuse) and [sshfs](https://github.com/libfuse/sshfs). +Except for gcsfuse, you can run the examples in a local kind cluster. + +

+ +

+ +## Running an example in local kind cluster +You can try this plugin with local kind cluster + +### Dependencies +- [docker](https://docs.docker.com/engine/install/ubuntu/) +- [kubectl](https://kubernetes.io/docs/tasks/tools/) +- [kind](https://kind.sigs.k8s.io/) + +### Create cluster and build images +`build-for-kind.sh` builds plugin and example images and load them to the kind cluster. +```console +$ kind create cluster +$ ./build-for-kind.sh +``` + +### Deploy plugin +`deploy/csi-driver.yaml` and `deploy/csi-driver-daemonset.yaml` are manifests for plugin. + +```console +$ kubectl apply -f ./deploy/csi-driver.yaml +namespace/mfcp-system created +csidriver.storage.k8s.io/meta-fuse-csi-plugin.csi.storage.pfn.io created +$ kubectl apply -f ./deploy/csi-driver-daemonset.yaml +daemonset.apps/meta-fuse-csi-plugin created +``` + +Please confirm the plugin is successfully deployed. + +```console +$ kubectl get ds -n mfcp-system +NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE +meta-fuse-csi-plugin 1 1 1 1 1 kubernetes.io/os=linux 28m +``` + +### Deploy mountpoint-s3 example +`examples/proxy/mountpoint-s3/deploy.yaml` provides a pod with mountpoint-s3 and MinIO. +Bucket `test-bucket` is mounted at `/data` in `busybox` container. + +As for other examples, `examples/proxy/goofys/deploy.yaml` (for goofys), `examples/proxy/s3fs/deploy.yaml` (for s3fs) and `examples/proxy/sshfs/deploy.yaml` (for sshfs) exist. + +```console +$ kubectl apply -f ./examples/proxy/mountpoint-s3/deploy.yaml +$ kubectl get pods mfcp-example-proxy-mountpoint-s3 +NAME READY STATUS RESTARTS AGE +mfcp-example-proxy-mountpoint-s3 3/3 Running 0 14s +$ kubectl exec -it mfcp-example-proxy-mountpoint-s3 -c busybox -- /bin/ash +/ # +/ # cd /data +/data # ls -l +total 1 +-rw-r--r-- 1 root root 30 Oct 27 02:45 test.txt +/data # cat test.txt +This is a test file for minio +``` + +`starter` container contains `/usr/bin/mc` to operate MinIO's bucket. 
You can upload a file to the bucket and read it via mountpoint-s3. +```console +$ kubectl exec -it mfcp-example-proxy-mountpoint-s3 -c starter -- /bin/bash +root@mfcp-example-proxy-mountpoint-s3:/# echo "Hello, World!" > hello.txt +root@mfcp-example-proxy-mountpoint-s3:/# mc cp hello.txt k8s-minio-dev/test-bucket +/hello.txt: 14 B / 14 B ━━━━━━━━━━━━━ 1.15 KiB/s 0s +root@mfcp-example-proxy-mountpoint-s3:/# exit +$ kubectl exec -it mfcp-example-proxy-mountpoint-s3 -c busybox -- cat /data/hello.txt +Hello, World! +``` + +After the trial, delete the pod. +```console +$ kubectl delete -f ./examples/proxy/mountpoint-s3/deploy.yaml +pod "mfcp-example-proxy-mountpoint-s3" deleted +``` + +## NOTICE +meta-fuse-csi-plugin mounts FUSE implementations after the container has started. +Some applications may read the directory before the mount completes. +To avoid this race condition, please wait until the FUSE implementation is mounted. +`examples/proxy/mountpoint-s3/deploy.yaml` and `examples/check.sh` implement such a wait. +```yaml + - image: busybox + name: busybox + command: ["/bin/ash"] + args: ["-c", "while [[ ! \"$(/bin/mount | grep fuse)\" ]]; do echo \"waiting for mount\" && sleep 1; done; sleep infinity"] +``` +or +```bash +function wait_for_fuse_mounted() { + while [[ ! $(kubectl exec $1 -c $2 -- /bin/mount | grep fuse) ]]; do echo "waiting for mount" && sleep 1; done +} +``` + +## Running E2E tests +### Tested Environment +- Ubuntu 23.04 (Kernel 6.2.0-35-generic) +- Docker (version 24.0.7) +- kubectl (v1.28.2) +- kind (v0.20.0) +- Kubernetes (v1.27.3, running with kind) + +You can run E2E tests with kind. + +```console +$ make test-e2e +``` + +## How it works? +meta-fuse-csi-plugin involves two kinds of pods: the CSI driver Pod, which has `CAP_SYS_ADMIN`, and the User Pod. +CSI driver Pods are deployed by cluster operators on each node as a DaemonSet. +They process privileged operations (open("/dev/fuse") and mount("fuse", ...)) on behalf of FUSE implementations. +User Pods are deployed by users. 
+Users can use any FUSE implementations and deploy them without `CAP_SYS_ADMIN` as they like. + +meta-fuse-csi-plugin provides two approaches `fuse-starter` and `fusermount3-proxy` to run and mount FUSE implementations. + +### fuse-starter: Direct fd passing approach +This approach derives from gcs-fuse-csi-driver. +Some FUSE implementations support to receive file descriptor (fd) for "/dev/fuse" as an argument. +They use the received fd to communicate FUSE operations with Linux kernel. +As for [libfuse3](https://github.com/libfuse/libfuse), a FUSE user library, [when "/dev/fd/X" is specified as the mount point, libfuse3 will interpret X as the file descriptor for "/dev/fuse"](https://github.com/libfuse/libfuse/blob/f44214be6a2abb4c98f61790cae565c06bdb431c/lib/fuse_lowlevel.c#L3158), and perform FUSE operations. Similarly, [jacobsa/fuse, a FUSE library used by gcsfuse, provides equivalent functionality](https://github.com/jacobsa/fuse/blob/ab21db1af8364aa6fd51032127f9ff852006afe9/mount_linux.go#L128). + +fuse-starter communicates with CSI driver Pod via Unix Domain Socket (UDS), and CSI driver Pod performs `open("/dev/fuse", ...)` and `mount("fuse")` with acquired fd. +Then, fuse-starter receives the fd from CSI driver Pod and passes the fd to the FUSE implementation when fuse-starter executes it. + +

+ +

+ +### fusermount3-proxy: Modified fusermount3 approach +fusermount3-proxy exploits libfuse3's fusermount3 mount approach. + +fusermount3 is a setuid executable binary. +It performs privileged operations (open and mount) on behalf of libfuse3. +When libfuse3 tries to mount a FUSE implementation but mount(2) fails due to lack of permissions, +libfuse3 executes fusermount3, and fusermount3 performs `open("/dev/fuse", ...)` and `mount("fuse")`. +Then, fusermount3 passes the fd for "/dev/fuse" to libfuse3, and libfuse3 continues to process FUSE operations. + +fusermount3-proxy behaves like fusermount3, but passes the mount operations through to the CSI driver Pod. + +

+ +

+ +For more details, please refer to our blog (English, Japanese) + +## Acknowledgement +The driver implementation is forked from [gcs-fuse-csi-driver](https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver). +gcs-fuse-csi-driver is licensed under Apache 2.0 as described below. + +## LICENSE +``` +# Copyright 2018 The Kubernetes Authors. +# Copyright 2022 Google LLC +# Copyright 2023 Preferred Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +``` diff --git a/assets/demo.gif b/assets/demo.gif new file mode 100644 index 0000000..c3987af Binary files /dev/null and b/assets/demo.gif differ diff --git a/assets/inside-fuse-starter.png b/assets/inside-fuse-starter.png new file mode 100644 index 0000000..1e2ddf2 Binary files /dev/null and b/assets/inside-fuse-starter.png differ diff --git a/assets/inside-fusermount3-proxy.png b/assets/inside-fusermount3-proxy.png new file mode 100644 index 0000000..c6b6916 Binary files /dev/null and b/assets/inside-fusermount3-proxy.png differ diff --git a/assets/support-status.png b/assets/support-status.png new file mode 100644 index 0000000..7fac4f5 Binary files /dev/null and b/assets/support-status.png differ diff --git a/build-for-kind.sh b/build-for-kind.sh new file mode 100755 index 0000000..f006a74 --- /dev/null +++ b/build-for-kind.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +STAGINGVERSION=latest LOAD_TO_KIND=true make all diff --git a/cmd/csi_driver/Dockerfile b/cmd/csi_driver/Dockerfile new file mode 100755 index 
0000000..eb80bd6 --- /dev/null +++ b/cmd/csi_driver/Dockerfile @@ -0,0 +1,68 @@ +# Copyright 2018 The Kubernetes Authors. +# Copyright 2022 Google LLC +# Copyright 2023 Preferred Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build driver go binary +FROM golang:1.20.7 as driver-builder + +ARG STAGINGVERSION + +WORKDIR /meta-fuse-csi-plugin +ADD . . +RUN make driver BINDIR=/bin + +# Start from Kubernetes Debian base. +FROM gke.gcr.io/debian-base:bullseye-v1.4.3-gke.5 as debian +# Install necessary dependencies +RUN clean-install mount bash + +# go/gke-releasing-policies#base-images +# We use `gcr.io/distroless/base` because it includes glibc. +FROM gcr.io/distroless/base-debian11 as distroless-base + +# The distroless amd64 image has a target triplet of x86_64 +FROM distroless-base AS distroless-amd64 +ENV LIB_DIR_PREFIX x86_64 + +FROM distroless-$TARGETARCH as output-image + +# Copy the mount/umount binaries +COPY --from=debian /bin/mount /bin/mount +COPY --from=debian /bin/umount /bin/umount + +# Copy shared libraries into distroless base. 
+COPY --from=debian /lib/${LIB_DIR_PREFIX}-linux-gnu/libselinux.so.1 /lib/${LIB_DIR_PREFIX}-linux-gnu/ + +COPY --from=debian /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libblkid.so.1 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libmount.so.1 \ + /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/libpcre2-8.so.0 /usr/lib/${LIB_DIR_PREFIX}-linux-gnu/ + +# Build stage used for validation of the output-image +FROM output-image as validation-image +COPY --from=debian /lib/${LIB_DIR_PREFIX}-linux-gnu/libtinfo.so.6 \ + /lib/${LIB_DIR_PREFIX}-linux-gnu/libpcre.so.3 /lib/${LIB_DIR_PREFIX}-linux-gnu/ +COPY --from=debian /bin/bash /bin/bash +COPY --from=debian /bin/grep /bin/grep +COPY --from=debian /usr/bin/ldd /usr/bin/ldd +SHELL ["/bin/bash", "-c"] +RUN if ldd /bin/mount | grep "not found"; then echo "!!! Missing deps for mount command !!!" && exit 1; fi +RUN if ldd /bin/umount | grep "not found"; then echo "!!! Missing deps for umount command !!!" && exit 1; fi + +# Final build stage, create the real Docker image with ENTRYPOINT +FROM output-image + +COPY --from=driver-builder /bin/meta-fuse-csi-plugin /meta-fuse-csi-plugin + +ENTRYPOINT ["/meta-fuse-csi-plugin"] diff --git a/cmd/csi_driver/main.go b/cmd/csi_driver/main.go new file mode 100644 index 0000000..902df1c --- /dev/null +++ b/cmd/csi_driver/main.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "flag" + "os" + + driver "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/csi_driver" + csimounter "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/csi_mounter" + "k8s.io/klog/v2" + "k8s.io/mount-utils" +) + +var ( + endpoint = flag.String("endpoint", "unix:/tmp/csi.sock", "CSI endpoint") + nodeID = flag.String("nodeid", "", "node id") + + // These are set at compile time via the linker flags -X main.version and -X main.builddate (see LDFLAGS in the Makefile); they stay "unknown" otherwise. + version = "unknown" + builddate = "unknown" +) + +func main() { + klog.InitFlags(nil) + flag.Parse() + + var err error + var mounter mount.Interface + if *nodeID == "" { + klog.Fatalf("NodeID cannot be empty for node service") + } + + mounter, err = csimounter.New("") + if err != nil { + klog.Fatalf("Failed to prepare CSI mounter: %v", err) + } + + config := &driver.DriverConfig{ + Name: driver.DefaultName, + Version: version, + NodeID: *nodeID, + Mounter: mounter, + } + + d, err := driver.NewDriver(config) + if err != nil { + klog.Fatalf("Failed to initialize meta-fuse-csi-plugin: %v", err) + } + + klog.Infof("Running meta-fuse-csi-plugin version %v (BuildDate %v)", version, builddate) + d.Run(*endpoint) + + os.Exit(0) +} diff --git a/cmd/fuse_starter/Dockerfile b/cmd/fuse_starter/Dockerfile new file mode 100755 index 0000000..549253f --- /dev/null +++ b/cmd/fuse_starter/Dockerfile @@ -0,0 +1,33 @@ +# Copyright 2018 The Kubernetes Authors. +# Copyright 2022 Google LLC +# Copyright 2023 Preferred Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Build fuse-starter go binary +FROM golang:1.20.7 as fuse-starter-builder + +ARG STAGINGVERSION + +WORKDIR /meta-fuse-csi-plugin +ADD . . +RUN make fuse-starter BINDIR=/bin + +# go/gke-releasing-policies#base-images +# We use `gcr.io/distroless/base` because it includes glibc. +FROM gcr.io/distroless/base-debian11 + +# Copy the binaries +COPY --from=fuse-starter-builder /bin/fuse-starter /fuse-starter + +ENTRYPOINT ["/meta-fuse-csi-plugin-fuse-starter"] diff --git a/cmd/fuse_starter/main.go b/cmd/fuse_starter/main.go new file mode 100644 index 0000000..e75aa42 --- /dev/null +++ b/cmd/fuse_starter/main.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "os" + "os/signal" + "sync" + "syscall" + + starter "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/fuse_starter" + "k8s.io/klog/v2" +) + +var ( + fdPassingSocketPath = flag.String("fd-passing-socket-path", "", "unix domain socket path for FUSE fd passing") + // This is set at compile time. 
+ version = "unknown" + builddate = "unknown" +) + +func main() { + klog.InitFlags(nil) + flag.Parse() + + klog.Infof("Running meta-fuse-csi-plugin fuse-starter version %v (BuildDate %v)", version, builddate) + klog.Infof("fd-passing-socket-path: %q", *fdPassingSocketPath) + + // parsing command args after "--" + mounterArgsIdx := 0 + for ; mounterArgsIdx < len(os.Args); mounterArgsIdx += 1 { + if os.Args[mounterArgsIdx] == "--" { + mounterArgsIdx += 1 + break + } + } + + if len(os.Args) == mounterArgsIdx { + klog.Error("mounter does not specified") + return + } + + mounterPath := os.Args[mounterArgsIdx] + mounterArgs := os.Args[mounterArgsIdx+1:] + klog.Infof("mounter(%s) args are %v", mounterPath, mounterArgs) + + if *fdPassingSocketPath == "" { + klog.Error("fd-passing-socket-path does not specified") + return + } + + mounter := starter.New(mounterPath, mounterArgs) + var wg sync.WaitGroup + + mc, err := starter.PrepareMountConfig(*fdPassingSocketPath) + if err != nil { + klog.Errorf("failed prepare mount config: socket path %q: %v\n", *fdPassingSocketPath, err) + return + } + + c := make(chan os.Signal, 1) + + wg.Add(1) + go func(mc *starter.MountConfig) { + defer wg.Done() + cmd, err := mounter.Mount(mc) + if err != nil { + klog.Errorf("failed to mount volume %q: %v\n", mc.VolumeName, err) + return + } + + if err = cmd.Start(); err != nil { + klog.Errorf("failed to start mounter with error: %v\n", err) + return + } + + // Since the mounter has taken over the file descriptor, + // closing the file descriptor to avoid other process forking it. + syscall.Close(mc.FileDescriptor) + if err = cmd.Wait(); err != nil { + klog.Errorf("mounter exited with error: %v\n", err) + } else { + klog.Infof("[%v] mounter exited normally.", mc.VolumeName) + } + + // Process may exit early. 
+ c <- syscall.SIGTERM + }(mc) + + signal.Notify(c, syscall.SIGTERM) + klog.Info("waiting for SIGTERM signal...") + + <-c // blocking the process + + klog.Info("received SIGTERM signal, waiting for all the mounter processes exit...") + + // TODO: send SIGKILL to kill hang mounter process + err = mounter.Cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + klog.Warning("failed to send SIGTERM signal to mounter process") + } + wg.Wait() + + klog.Info("exiting fuse-starter...") +} diff --git a/cmd/fusermount3-proxy/main.go b/cmd/fusermount3-proxy/main.go new file mode 100644 index 0000000..f4149f2 --- /dev/null +++ b/cmd/fusermount3-proxy/main.go @@ -0,0 +1,141 @@ +/* +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "fmt" + "os" + "strconv" + "syscall" + + starter "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/fuse_starter" + "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/util" + flag "github.com/spf13/pflag" + + "k8s.io/klog/v2" +) + +var ( + optUnmount = flag.BoolP("unmount", "u", false, "unmount (NOT SUPPORTED)") + optAutoUnmount = flag.BoolP("auto-unmount", "U", false, "auto-unmount (NOT SUPPORTED)") + optLazy = flag.BoolP("lazy", "z", false, "lazy umount (NOT SUPPORTED)") + optQuiet = flag.BoolP("quiet", "q", false, "quiet (NOT SUPPORTED)") + optHelp = flag.BoolP("help", "h", false, "print help") + optVersion = flag.BoolP("version", "V", false, "print version") + optOptions = flag.StringP("options", "o", "", "mount options") + // This is set at compile time. + version = "unknown" + builddate = "unknown" +) + +var ignoredOptions = map[string]*bool{ + "unmount": optUnmount, + "auto-unmount": optAutoUnmount, + "lazy": optLazy, + "optQuiet": optQuiet, +} + +const ( + ENV_FUSE_COMMFD = "_FUSE_COMMFD" + ENV_FUSERMOUNT3PROXY_FDPASSING_SOCKPATH = "FUSERMOUNT3PROXY_FDPASSING_SOCKPATH" +) + +func main() { + klog.InitFlags(nil) + flag.Parse() + + if *optHelp { + flag.PrintDefaults() + os.Exit(0) + } + + if *optVersion { + fmt.Printf("fusermount3-dummy version %v (BuildDate %v)\n", version, builddate) + os.Exit(0) + } + + klog.Infof("Running meta-fuse-csi-plugin fusermount3-dummy version %v (BuildDate %v)", version, builddate) + + if *optUnmount { + klog.Warning("'unmount' is not supported.") + os.Exit(0) + } + + if len(flag.Args()) == 0 { + klog.Error("mountpoint is not specified.") + os.Exit(1) + } + + if *optOptions == "" { + klog.Error("options is not specified.") + os.Exit(1) + } + + // fd-passing socket between fusermount3-dummy and csi-driver is passed as env var + fdPassingSocketPath := os.Getenv(ENV_FUSERMOUNT3PROXY_FDPASSING_SOCKPATH) + if fdPassingSocketPath == "" { + klog.Errorf("environment variable %q is not specified.", 
ENV_FUSERMOUNT3PROXY_FDPASSING_SOCKPATH) + os.Exit(1) + } + klog.Infof("fd-passing socket path is %q", fdPassingSocketPath) + + mntPoint := flag.Args()[0] + klog.Infof("mountpoint is %q, but ignored.", mntPoint) + + for k, v := range ignoredOptions { + if *v { + klog.Warningf("opiton %q is true, but ignored.", k) + } + } + + // TODO: send options to csi-driver and use them? + klog.Infof("options=%q", *optOptions) + + // get unix domain socket from caller + commFdStr := os.Getenv(ENV_FUSE_COMMFD) + commFd, err := strconv.Atoi(commFdStr) + if err != nil { + klog.Errorf("failed to get commFd _FUSE_COMMFD=%q", commFdStr) + os.Exit(1) + } + klog.Infof("commFd from %q is %d", ENV_FUSE_COMMFD, commFd) + + commConn, err := util.GetNetConnFromRawUnixSocketFd(commFd) + if err != nil { + klog.Errorf("failed to convert commFd to net.Conn: %w", err) + os.Exit(1) + } + klog.Infof("net.Conn is acquired from fd %d", commFd) + + // get fd for /dev/fuse from csi-driver + mc, err := starter.PrepareMountConfig(fdPassingSocketPath) + if err != nil { + klog.Errorf("failed to prepare mount config: socket path %q: %w", fdPassingSocketPath, err) + os.Exit(1) + } + defer syscall.Close(mc.FileDescriptor) + klog.Infof("received fd for /dev/fuse from csi-driver via socket %q", fdPassingSocketPath) + + // now already FUSE-fs mounted and fd is ready. 
+ err = util.SendMsg(commConn, mc.FileDescriptor, []byte{0}) + if err != nil { + klog.Errorf("failed to send fd via commFd: %w", err) + os.Exit(1) + } + klog.Infof("sent fd for /dev/fuse via commFd %d", commFd) + klog.Info("exiting fusermount3-dummy...") +} diff --git a/deploy/csi-driver-daemonset.yaml b/deploy/csi-driver-daemonset.yaml new file mode 100644 index 0000000..849dc08 --- /dev/null +++ b/deploy/csi-driver-daemonset.yaml @@ -0,0 +1,97 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: meta-fuse-csi-plugin + namespace: mfcp-system +spec: + selector: + matchLabels: + k8s-app: meta-fuse-csi-plugin + template: + metadata: + annotations: + seccomp.security.alpha.kubernetes.io/pod: runtime/default + labels: + k8s-app: meta-fuse-csi-plugin + spec: + containers: + - args: + - --v=5 + - --endpoint=unix:/csi/csi.sock + - --nodeid=$(KUBE_NODE_NAME) + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + image: ghcr.io/pfnet-research/meta-fuse-csi-plugin/meta-fuse-csi-plugin:latest + imagePullPolicy: IfNotPresent + name: meta-fuse-csi-plugin + resources: + limits: + cpu: 200m + memory: 200Mi + requests: + cpu: 5m + memory: 10Mi + securityContext: + privileged: true + readOnlyRootFilesystem: true + volumeMounts: + - mountPath: /var/lib/kubelet/pods + mountPropagation: Bidirectional + name: kubelet-dir + - mountPath: /csi + name: socket-dir + - args: + - --v=5 + - --csi-address=/csi/csi.sock + - --kubelet-registration-path=$(DRIVER_REG_SOCK_PATH) + env: + - name: DRIVER_REG_SOCK_PATH + value: /var/lib/kubelet/plugins/meta-fuse-csi-plugin.csi.storage.pfn.io/csi.sock + image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.8.0 + imagePullPolicy: Always + name: csi-driver-registrar + resources: + limits: + cpu: 50m + memory: 100Mi + requests: + cpu: 10m + memory: 10Mi + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - all + readOnlyRootFilesystem: true + volumeMounts: + - mountPath: /csi + 
name: socket-dir + - mountPath: /registration + name: registration-dir + nodeSelector: + kubernetes.io/os: linux + securityContext: + seccompProfile: + type: RuntimeDefault + tolerations: + - operator: Exists + volumes: + - hostPath: + path: /var/lib/kubelet/plugins_registry/ + type: Directory + name: registration-dir + - hostPath: + path: /var/lib/kubelet/pods/ + type: Directory + name: kubelet-dir + - hostPath: + path: /var/lib/kubelet/plugins/meta-fuse-csi-plugin.csi.storage.pfn.io/ + type: DirectoryOrCreate + name: socket-dir + updateStrategy: + rollingUpdate: + maxUnavailable: 10% + type: RollingUpdate diff --git a/deploy/csi-driver.yaml b/deploy/csi-driver.yaml new file mode 100644 index 0000000..e9d9036 --- /dev/null +++ b/deploy/csi-driver.yaml @@ -0,0 +1,21 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: mfcp-system +--- +apiVersion: storage.k8s.io/v1 +kind: CSIDriver +metadata: + labels: + addonmanager.kubernetes.io/mode: Reconcile + k8s-app: meta-fuse-csi-plugin + name: meta-fuse-csi-plugin.csi.storage.pfn.io +spec: + attachRequired: false + fsGroupPolicy: ReadWriteOnceWithFSType + podInfoOnMount: true + requiresRepublish: true + storageCapacity: false + volumeLifecycleModes: + - Ephemeral + diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..dcaadf4 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1 @@ +aws-secret.yaml \ No newline at end of file diff --git a/examples/check.sh b/examples/check.sh new file mode 100755 index 0000000..4f2c4e5 --- /dev/null +++ b/examples/check.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -eu + +cd $(dirname $0) + +function wait_for_pod_becom_ready() { + while [[ $(kubectl get pods $1 -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}') != "True" ]]; do echo "waiting for pod" && sleep 1; done +} + +function wait_for_fuse_mounted() { + while [[ ! 
$(kubectl exec $1 -c $2 -- /bin/mount | grep fuse) ]]; do echo "waiting for mount" && sleep 1; done +} + +MANIFEST_DIR=$1 # path to example manifest +POD_NAME=$2 +PROVIDER_CONTAINER=$3 +PROVIDED_FILENAME=$4 +MOUNTED_CONTAINER=$5 +MOUNTED_FILENAME=$6 + +clean_up () { + ARG=$? + kubectl delete -f ./deploy.yaml + exit $ARG +} +trap clean_up EXIT + +cd $MANIFEST_DIR + +# Start to check the pod +echo "Checking Pod \"$POD_NAME\"..." +kubectl apply -f ./deploy.yaml + +# Waiting pod becomes ready +wait_for_pod_becom_ready $POD_NAME +echo "Pod is ready." + +# Waiting FUSE is mounted to the target container +wait_for_fuse_mounted $POD_NAME $MOUNTED_CONTAINER +echo "FUSE is mounted." + +# Validating content. +BASE_CONTENT=$(kubectl exec $POD_NAME -c $PROVIDER_CONTAINER -- cat $PROVIDED_FILENAME) +MOUNTED_CONTENT=$(kubectl exec $POD_NAME -c $MOUNTED_CONTAINER -- cat $MOUNTED_FILENAME) + +if [ "$BASE_CONTENT" != "$MOUNTED_CONTENT" ]; then + echo "Content unmatched!! expected=\"${BASE_CONTENT}\" actual=\"${MOUNTED_CONTENT}\"" + exit 1 +fi + +echo "OK." + +exit 0 diff --git a/examples/proxy/gcsfuse/Dockerfile b/examples/proxy/gcsfuse/Dockerfile new file mode 100644 index 0000000..38276c1 --- /dev/null +++ b/examples/proxy/gcsfuse/Dockerfile @@ -0,0 +1,31 @@ +FROM golang:1.20.7 as fusermount3-proxy-builder + +WORKDIR /meta-fuse-csi-plugin +ADD . . 
+RUN make fusermount3-proxy BINDIR=/bin + +FROM ubuntu:22.04 + +RUN apt update && apt upgrade -y +RUN apt install -y ca-certificates wget libfuse2 fuse3 + +# prepare for MinIO +RUN wget https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/bin/mc && chmod +x /usr/bin/mc + +COPY <> /etc/profile +RUN ssh-keygen -f /root/.ssh/example -P "" +RUN cat /root/.ssh/example.pub > /root/.ssh/authorized_keys + +COPY < /root/sshfs-example/test.txt + +COPY <> /etc/profile +RUN ssh-keygen -f /root/.ssh/example -P "" +RUN cat /root/.ssh/example.pub > /root/.ssh/authorized_keys + +COPY < /root/sshfs-example/test.txt + +COPY < 0 { + allMountOptions.Insert(mountOption) + } + } + + for _, mountOption := range systemOptions { + allMountOptions.Insert(mountOption) + } + + return allMountOptions.List() +} + +// removeChilds remove all childs in the directory +func removeChilds(dir string) error { + d, err := os.Open(dir) + if err != nil { + return err + } + defer d.Close() + names, err := d.Readdirnames(-1) + if err != nil { + return err + } + for _, name := range names { + err = os.RemoveAll(filepath.Join(dir, name)) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/csi_driver/node_unimpl.go b/pkg/csi_driver/node_unimpl.go new file mode 100755 index 0000000..4acc522 --- /dev/null +++ b/pkg/csi_driver/node_unimpl.go @@ -0,0 +1,42 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + csi "github.com/container-storage-interface/spec/lib/go/csi" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *nodeServer) NodeStageVolume(_ context.Context, _ *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "NodeStageVolumeResponse unsupported") +} + +func (s *nodeServer) NodeUnstageVolume(_ context.Context, _ *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "NodeUnstageVolumeResponse unsupported") +} + +func (s *nodeServer) NodeGetVolumeStats(_ context.Context, _ *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats unsupported") +} + +func (s *nodeServer) NodeExpandVolume(_ context.Context, _ *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "NodeUnStageVolume unsupported") +} diff --git a/pkg/csi_driver/server.go b/pkg/csi_driver/server.go new file mode 100644 index 0000000..901b9a8 --- /dev/null +++ b/pkg/csi_driver/server.go @@ -0,0 +1,102 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "net" + "sync" + + csi "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/util" + "google.golang.org/grpc" + "k8s.io/klog/v2" +) + +// Defines Non blocking GRPC server interfaces. +type NonBlockingGRPCServer interface { + // Start services at the endpoint + Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) + // Waits for the service to stop + Wait() + // Stops the service gracefully + Stop() + // Stops the service forcefully + ForceStop() +} + +func NewNonBlockingGRPCServer() NonBlockingGRPCServer { + return &nonBlockingGRPCServer{} +} + +// NonBlocking server. +type nonBlockingGRPCServer struct { + wg sync.WaitGroup + server *grpc.Server +} + +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + s.wg.Add(1) + + go s.serve(endpoint, ids, ns) +} + +func (s *nonBlockingGRPCServer) Wait() { + s.wg.Wait() +} + +func (s *nonBlockingGRPCServer) Stop() { + s.server.GracefulStop() +} + +func (s *nonBlockingGRPCServer) ForceStop() { + s.server.Stop() +} + +func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, ns csi.NodeServer) { + scheme, addr, err := util.ParseEndpoint(endpoint, true) + if err != nil { + klog.Fatalf("failed to parse endpoint %v", err) + } + + klog.Infof("Start listening with scheme %v, addr %v", scheme, addr) + listener, err := net.Listen(scheme, addr) + if err != nil { + klog.Fatalf("Failed to listen: %v", err) + } + + opts := []grpc.ServerOption{ + grpc.UnaryInterceptor(logGRPC), + } + server := grpc.NewServer(opts...) 
+ s.server = server + + if ids != nil { + csi.RegisterIdentityServer(server, ids) + } + if ns != nil { + csi.RegisterNodeServer(server, ns) + } + + klog.Infof("Listening for connections on address: %#v", listener.Addr()) + + err = server.Serve(listener) + if err != nil { + klog.Fatal(err.Error()) + } +} diff --git a/pkg/csi_driver/utils.go b/pkg/csi_driver/utils.go new file mode 100755 index 0000000..8ef3090 --- /dev/null +++ b/pkg/csi_driver/utils.go @@ -0,0 +1,87 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package driver + +import ( + "fmt" + + csi "github.com/container-storage-interface/spec/lib/go/csi" + pbSanitizer "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "golang.org/x/net/context" + "google.golang.org/grpc" + "k8s.io/klog/v2" +) + +const ( + CreateVolumeCSIFullMethod = "/csi.v1.Controller/CreateVolume" + DeleteVolumeCSIFullMethod = "/csi.v1.Controller/DeleteVolume" + NodePublishVolumeCSIFullMethod = "/csi.v1.Node/NodePublishVolume" +) + +func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode { + return &csi.VolumeCapability_AccessMode{Mode: mode} +} + +func NewNodeServiceCapability(c csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability { + return &csi.NodeServiceCapability{ + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: c, + }, + }, + } +} + +func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + var strippedReq string + switch info.FullMethod { + case CreateVolumeCSIFullMethod: + strippedReq = pbSanitizer.StripSecrets(req).String() + case DeleteVolumeCSIFullMethod: + strippedReq = pbSanitizer.StripSecrets(req).String() + case NodePublishVolumeCSIFullMethod: + if nodePublishReq, ok := req.(*csi.NodePublishVolumeRequest); ok { + if token, ok := nodePublishReq.VolumeContext[VolumeContextKeyServiceAccountToken]; ok { + nodePublishReq.VolumeContext[VolumeContextKeyServiceAccountToken] = "***stripped***" + strippedReq = fmt.Sprintf("%+v", nodePublishReq) + nodePublishReq.VolumeContext[VolumeContextKeyServiceAccountToken] = token + } else { + strippedReq = fmt.Sprintf("%+v", req) + } + } else { + klog.Errorf("failed to case req to *csi.NodePublishVolumeRequest") + } + default: + strippedReq = fmt.Sprintf("%+v", req) + } + + klog.V(4).Infof("%s called with request: %v", info.FullMethod, strippedReq) + resp, err := handler(ctx, req) + if err != nil { + klog.Errorf("%s 
failed with error: %v", info.FullMethod, err) + } else { + if fmt.Sprintf("%v", resp) == "" { + klog.V(4).Infof("%s succeeded.", info.FullMethod) + } else { + klog.V(4).Infof("%s succeeded with response: %s", info.FullMethod, resp) + } + } + + return resp, err +} diff --git a/pkg/csi_mounter/csi_mounter.go b/pkg/csi_mounter/csi_mounter.go new file mode 100644 index 0000000..68f7cb9 --- /dev/null +++ b/pkg/csi_mounter/csi_mounter.go @@ -0,0 +1,341 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csimounter + +import ( + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "strings" + "sync" + "syscall" + + starter "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/fuse_starter" + "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/util" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "k8s.io/mount-utils" +) + +const ( + // See the nonroot user discussion: https://github.com/GoogleContainerTools/distroless/issues/443 + NobodyUID = 65534 + NobodyGID = 65534 +) + +// Mounter provides the meta-fuse-csi-plugin implementation of mount.Interface +// for the linux platform. +type Mounter struct { + mount.MounterForceUnmounter + chdirMu sync.Mutex + FdPassingSockets *FdPassingSockets +} + +// New returns a mount.MounterForceUnmounter for the current system. +// It provides options to override the default mounter behavior. 
+// mounterPath allows using an alternative to `/bin/mount` for mounting. +func New(mounterPath string) (mount.Interface, error) { + m, ok := mount.New(mounterPath).(mount.MounterForceUnmounter) + if !ok { + return nil, fmt.Errorf("failed to cast mounter to MounterForceUnmounter") + } + + return &Mounter{ + m, + sync.Mutex{}, + newFdPassingSockets(), + }, nil +} + +func (m *Mounter) Mount(source string, target string, fstype string, options []string) error { + if len(options) == 1 { + options = append(options, "") + } + + fdPassingSocketPath := options[0] + fdPassingSocketDir, fdPassingSocketName := filepath.Split(fdPassingSocketPath) + klog.V(4).Infof("start to mount (fdPassingSocketDir=%s fdPassingSocketName=%s)", fdPassingSocketDir, fdPassingSocketName) + + options = options[1:] + + csiMountOptions, _ := prepareMountOptions(options[1:]) + + klog.V(4).Info("passing the descriptor") + + err := m.createAndRegisterFdPassingSocket(target, fdPassingSocketDir, fdPassingSocketName) + if err != nil { + return fmt.Errorf("failed to create fd-passing socket: %w", err) + } + + // Prepare sidecar mounter MountConfig + mc := starter.MountConfig{ + VolumeName: source, + } + mcb, err := json.Marshal(mc) + if err != nil { + return fmt.Errorf("failed to marshal sidecar mounter MountConfig %v: %w", mc, err) + } + + // Asynchronously waiting for the sidecar container to connect to the listener + go func(mounter *Mounter, target, fstype string, csiMountOptions []string, msg []byte) { + defer func() { + err = mounter.FdPassingSockets.CloseAndUnregister(target, false) + if err != nil { + klog.Errorf("failed to close and unregister fd-passing socket for %q: %w", target, err) + } + }() + + podID, volumeName, _ := util.ParsePodIDVolumeFromTargetpath(target) + logPrefix := fmt.Sprintf("[Pod %v, VolumeName %v]", podID, volumeName) + + klog.V(4).Infof("%v start to accept connections to the listener.", logPrefix) + a, err := mounter.FdPassingSockets.accept(target) + if err != nil { + 
klog.Errorf("%v failed to accept connections to the listener: %v", logPrefix, err) + return + } + defer a.Close() + + klog.V(4).Info("opening the device /dev/fuse") + fuseFd, err := syscall.Open("/dev/fuse", syscall.O_RDWR, 0o644) + if err != nil { + klog.Errorf("failed to open the device /dev/fuse: %w", err) + return + } + defer syscall.Close(fuseFd) + csiMountOptions = append(csiMountOptions, fmt.Sprintf("fd=%v", fuseFd)) + + // fuse-impl expects fuse is mounted. + klog.V(4).Info("mounting the fuse filesystem") + err = mounter.MountSensitiveWithoutSystemdWithMountFlags(volumeName, target, fstype, csiMountOptions, nil, []string{"--internal-only"}) + if err != nil { + klog.Errorf("failed to mount the fuse filesystem: %w", err) + return + } + + klog.V(4).Infof("%v start to send file descriptor and mount options", logPrefix) + if err = util.SendMsg(a, fuseFd, msg); err != nil { + klog.Errorf("%v failed to send file descriptor and mount options: %v", logPrefix, err) + return + } + + klog.V(4).Infof("%v exiting the goroutine.", logPrefix) + }(m, target, fstype, csiMountOptions, mcb) + + return nil +} + +func (m *Mounter) createAndRegisterFdPassingSocket(target, sockDir, sockName string) error { + m.chdirMu.Lock() + defer m.chdirMu.Unlock() + + // Need to change the current working directory to the temp volume base path, + // because the socket absolute path is longer than 104 characters, + // which will cause "bind: invalid argument" errors. 
+ exPwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get the current directory to %w", err) + } + if err = os.Chdir(sockDir); err != nil { + return fmt.Errorf("failed to change directory to %q: %w", sockDir, err) + } + + klog.V(4).Infof("creating a listener for the socket at %q", sockDir) + l, err := net.Listen("unix", sockName) + if err != nil { + return fmt.Errorf("failed to create the listener for the socket: %w", err) + } + + unixListner := l.(*net.UnixListener) + + // Change the socket ownership + err = os.Chown(sockDir, NobodyUID, NobodyGID) + if err != nil { + return fmt.Errorf("failed to change ownership on emptyDirBasePath: %w", err) + } + err = os.Chown(sockName, NobodyUID, NobodyGID) + if err != nil { + return fmt.Errorf("failed to change ownership on socket: %w", err) + } + + if err = os.Chdir(exPwd); err != nil { + return fmt.Errorf("failed to change directory to %q: %w", exPwd, err) + } + + sockPath := filepath.Join(sockDir, sockName) + if err = m.FdPassingSockets.register(target, sockPath, unixListner); err != nil { + return fmt.Errorf("failed to register socket at %q: %w", sockPath, err) + } + + return nil +} + +func prepareMountOptions(options []string) ([]string, []string) { + allowedOptions := map[string]bool{ + "exec": true, + "noexec": true, + "atime": true, + "noatime": true, + "sync": true, + "async": true, + "dirsync": true, + } + + csiMountOptions := []string{ + "nodev", + "nosuid", + "allow_other", + "default_permissions", + "rootmode=40000", + fmt.Sprintf("user_id=%d", os.Getuid()), + fmt.Sprintf("group_id=%d", os.Getgid()), + } + + // users may pass options that should be used by Linux mount(8), + // filter out these options and not pass to the sidecar mounter. + validMountOptions := []string{"rw", "ro"} + optionSet := sets.NewString(options...) 
+ for _, o := range validMountOptions { + if optionSet.Has(o) { + csiMountOptions = append(csiMountOptions, o) + optionSet.Delete(o) + } + } + + for _, o := range optionSet.List() { + if strings.HasPrefix(o, "o=") { + v := o[2:] + if allowedOptions[v] { + csiMountOptions = append(csiMountOptions, v) + } else { + klog.Warningf("got invalid mount option %q. Will discard invalid options and continue to mount.", v) + } + optionSet.Delete(o) + } + } + + return csiMountOptions, optionSet.List() +} + +type FdPassingSockets struct { + // key is target path + sockets map[string]*FdPassingSocket + socketsMutex sync.Mutex +} + +type FdPassingSocket struct { + socketPath string + listener *net.UnixListener + exitChan chan bool + closed bool +} + +func newFdPassingSockets() *FdPassingSockets { + return &FdPassingSockets{ + map[string]*FdPassingSocket{}, + sync.Mutex{}, + } +} + +func (fds *FdPassingSockets) register(targetPath string, sockPath string, listener *net.UnixListener) error { + fds.socketsMutex.Lock() + defer fds.socketsMutex.Unlock() + + // if the socket is already registered, return error + if _, ok := fds.sockets[targetPath]; ok { + return fmt.Errorf("fd-passing socket for %q is already registered", sockPath) + } + + fdSock := &FdPassingSocket{ + socketPath: sockPath, + listener: listener, + exitChan: make(chan bool, 5), + closed: false, + } + + fds.sockets[targetPath] = fdSock + + return nil +} + +func (fds *FdPassingSockets) get(targetPath string) *FdPassingSocket { + fds.socketsMutex.Lock() + defer fds.socketsMutex.Unlock() + + return fds.sockets[targetPath] +} + +func (fds *FdPassingSockets) accept(targetPath string) (net.Conn, error) { + sock := fds.get(targetPath) + if sock == nil { + return nil, fmt.Errorf("") + } + + return sock.listener.Accept() +} + +func (fds *FdPassingSockets) Exist(targetPath string) bool { + sock := fds.get(targetPath) + return sock != nil +} + +func (fds *FdPassingSockets) WaitForExit(targetPath string) { + sock := 
fds.get(targetPath) + if sock == nil { + return + } + + // wait for exit signal + <-sock.exitChan +} + +func (fds *FdPassingSockets) CloseAndUnregister(targetPath string, onlyClose bool) error { + fds.socketsMutex.Lock() + defer fds.socketsMutex.Unlock() + + sock, ok := fds.sockets[targetPath] + if !ok { + // fd-passing socket is already unregistered + return nil + } + + if !sock.closed { + sock.listener.Close() + sock.closed = true + } + + if onlyClose { + return nil + } + + // if unix domain socket exists, remove it. + if _, err := os.Stat(sock.socketPath); err == nil { + syscall.Unlink(sock.socketPath) + } + + // notify that listener has been closed via channel + sock.exitChan <- true + + // remove registered socket + delete(fds.sockets, targetPath) + + return nil +} diff --git a/pkg/csi_mounter/csi_mounter_test.go b/pkg/csi_mounter/csi_mounter_test.go new file mode 100644 index 0000000..fad475e --- /dev/null +++ b/pkg/csi_mounter/csi_mounter_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package csimounter + +import ( + "fmt" + "os" + "reflect" + "testing" +) + +var defaultCsiMountOptions = []string{ + "nodev", + "nosuid", + "allow_other", + "default_permissions", + "rootmode=40000", + fmt.Sprintf("user_id=%d", os.Getuid()), + fmt.Sprintf("group_id=%d", os.Getgid()), +} + +func TestPrepareMountArgs(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + inputMountOptions []string + expecteCsiMountOptions []string + expecteSidecarMountOptions []string + }{ + { + name: "should return valid options correctly with empty input", + inputMountOptions: []string{}, + expecteCsiMountOptions: defaultCsiMountOptions, + expecteSidecarMountOptions: []string{}, + }, + { + name: "should return valid options correctly with CSI mount options only", + inputMountOptions: []string{"ro", "o=noexec", "o=noatime", "o=invalid"}, + expecteCsiMountOptions: append(defaultCsiMountOptions, "ro", "noexec", "noatime"), + expecteSidecarMountOptions: []string{}, + }, + { + name: "should return valid options correctly with sidecar mount options only", + inputMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"}, + expecteCsiMountOptions: defaultCsiMountOptions, + expecteSidecarMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"}, + }, + { + name: "should return valid options correctly with CSI and sidecar mount options", + inputMountOptions: []string{"ro", "implicit-dirs", "max-conns-per-host=10", "o=noexec", "o=noatime", "o=invalid"}, + expecteCsiMountOptions: append(defaultCsiMountOptions, "ro", "noexec", "noatime"), + expecteSidecarMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"}, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + + c, s := prepareMountOptions(tc.inputMountOptions) + if !reflect.DeepEqual(countOptionOccurrence(c), countOptionOccurrence(tc.expecteCsiMountOptions)) { + t.Errorf("Got options %v, but expected %v", c, tc.expecteCsiMountOptions) + } + + if 
!reflect.DeepEqual(countOptionOccurrence(s), countOptionOccurrence(tc.expecteSidecarMountOptions)) { + t.Errorf("Got options %v, but expected %v", s, tc.expecteSidecarMountOptions) + } + } +} + +func countOptionOccurrence(options []string) map[string]int { + dict := make(map[string]int) + for _, o := range options { + dict[o]++ + } + + return dict +} diff --git a/pkg/fuse_starter/fuse_starter.go b/pkg/fuse_starter/fuse_starter.go new file mode 100644 index 0000000..beb0118 --- /dev/null +++ b/pkg/fuse_starter/fuse_starter.go @@ -0,0 +1,105 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fusestarter + +import ( + "encoding/json" + "fmt" + "net" + "os" + "os/exec" + "syscall" + + "github.com/pfnet-research/meta-fuse-csi-plugin/pkg/util" + "k8s.io/klog/v2" +) + +// FuseStarter will be used in the sidecar container to invoke fuse impl. +type FuseStarter struct { + mounterPath string + mounterArgs []string + Cmd *exec.Cmd +} + +// New returns a FuseStarter for the current system. +// It provides an option to specify the path to fuse binary. 
+func New(mounterPath string, mounterArgs []string) *FuseStarter { + return &FuseStarter{ + mounterPath: mounterPath, + mounterArgs: mounterArgs, + Cmd: nil, + } +} + +type MountConfig struct { + FileDescriptor int `json:"-"` + VolumeName string `json:"volumeName,omitempty"` +} + +func (m *FuseStarter) Mount(mc *MountConfig) (*exec.Cmd, error) { + klog.Infof("start to invoke fuse impl for volume %q", mc.VolumeName) + + klog.Infof("%s mounting with args %v...", m.mounterPath, m.mounterArgs) + cmd := exec.Cmd{ + Path: m.mounterPath, + Args: append([]string{m.mounterPath}, m.mounterArgs...), + ExtraFiles: []*os.File{os.NewFile(uintptr(mc.FileDescriptor), "/dev/fuse")}, + Stdout: os.Stdout, + Stderr: os.Stderr, + } + + m.Cmd = &cmd + + return &cmd, nil +} + +// Fetch the following information from a given socket path: +// 1. Pod volume name +// 2. The file descriptor +// 3. Mount options passing to mounter (passed by the csi mounter). +func PrepareMountConfig(sp string) (*MountConfig, error) { + mc := MountConfig{} + + klog.Infof("connecting to socket %q", sp) + c, err := net.Dial("unix", sp) + if err != nil { + return nil, fmt.Errorf("failed to connect to the socket %q: %w", sp, err) + } + defer func() { + // as we got all the information from the socket, closing the connection and deleting the socket + c.Close() + if err = syscall.Unlink(sp); err != nil { + // csi driver may already removed the socket. 
+ klog.Warningf("failed to close socket %q: %v", sp, err) + } + }() + + fd, msg, err := util.RecvMsg(c) + if err != nil { + return nil, fmt.Errorf("failed to receive mount options from the socket %q: %w", sp, err) + } + + mc.FileDescriptor = fd + + if err := json.Unmarshal(msg, &mc); err != nil { + return nil, fmt.Errorf("failed to unmarshal the mount config: %w", err) + } + + return &mc, nil +} diff --git a/pkg/util/fdchannel.go b/pkg/util/fdchannel.go new file mode 100644 index 0000000..721349a --- /dev/null +++ b/pkg/util/fdchannel.go @@ -0,0 +1,84 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "fmt" + "net" + "syscall" + + "k8s.io/klog/v2" +) + +func SendMsg(via net.Conn, fd int, msg []byte) error { + klog.V(4).Info("get the underlying socket") + conn, ok := via.(*net.UnixConn) + if !ok { + return fmt.Errorf("failed to cast via to *net.UnixConn") + } + connf, err := conn.File() + if err != nil { + return err + } + socket := int(connf.Fd()) + defer connf.Close() + + klog.V(4).Infof("calling sendmsg...") + rights := syscall.UnixRights(fd) + + return syscall.Sendmsg(socket, msg, rights, nil, 0) +} + +func RecvMsg(via net.Conn) (int, []byte, error) { + klog.V(4).Info("get the underlying socket") + conn, ok := via.(*net.UnixConn) + if !ok { + return 0, nil, fmt.Errorf("failed to cast via to *net.UnixConn") + } + connf, err := conn.File() + if err != nil { + return 0, nil, err + } + socket := int(connf.Fd()) + defer connf.Close() + + klog.V(4).Info("calling recvmsg...") + buf := make([]byte, syscall.CmsgSpace(4)) + b := make([]byte, 500) + //nolint:dogsled + n, _, _, _, err := syscall.Recvmsg(socket, b, buf, 0) + if err != nil { + return 0, nil, err + } + + klog.V(4).Info("parsing SCM...") + var msgs []syscall.SocketControlMessage + msgs, err = syscall.ParseSocketControlMessage(buf) + if err != nil { + return 0, nil, err + } + + klog.V(4).Info("parsing SCM_RIGHTS...") + fds, err := syscall.ParseUnixRights(&msgs[0]) + if err != nil { + return 0, nil, err + } + + return fds[0], b[:n], err +} diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 0000000..7c418f7 --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,149 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "net" + "net/url" + "os" + "regexp" + "strings" + + "k8s.io/klog/v2" +) + +const ( + Mb = 1024 * 1024 +) + +// ConvertLabelsStringToMap converts the labels from string to map +// example: "key1=value1,key2=value2" gets converted into {"key1": "value1", "key2": "value2"} +func ConvertLabelsStringToMap(labels string) (map[string]string, error) { + const labelsDelimiter = "," + const labelsKeyValueDelimiter = "=" + + labelsMap := make(map[string]string) + if labels == "" { + return labelsMap, nil + } + + // Following rules enforced for label keys + // 1. Keys have a minimum length of 1 character and a maximum length of 63 characters, and cannot be empty. + // 2. Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes. + // 3. Keys must start with a lowercase letter. + regexKey := regexp.MustCompile(`^\p{Ll}[\p{Ll}0-9_-]{0,62}$`) + checkLabelKeyFn := func(key string) error { + if !regexKey.MatchString(key) { + return fmt.Errorf("label value %q is invalid (should start with lowercase letter / lowercase letter, digit, _ and - chars are allowed / 1-63 characters", key) + } + + return nil + } + + // Values can be empty, and have a maximum length of 63 characters. 
+ regexValue := regexp.MustCompile(`^[\p{Ll}0-9_-]{0,63}$`) + checkLabelValueFn := func(value string) error { + if !regexValue.MatchString(value) { + return fmt.Errorf("label value %q is invalid (lowercase letter, digit, _ and - chars are allowed / 0-63 characters", value) + } + + return nil + } + + keyValueStrings := strings.Split(labels, labelsDelimiter) + for _, keyValue := range keyValueStrings { + keyValue := strings.Split(keyValue, labelsKeyValueDelimiter) + + if len(keyValue) != 2 { + return nil, fmt.Errorf("labels %q are invalid, correct format: 'key1=value1,key2=value2'", labels) + } + + key := strings.TrimSpace(keyValue[0]) + if err := checkLabelKeyFn(key); err != nil { + return nil, err + } + + value := strings.TrimSpace(keyValue[1]) + if err := checkLabelValueFn(value); err != nil { + return nil, err + } + + labelsMap[key] = value + } + + const maxNumberOfLabels = 64 + if len(labelsMap) > maxNumberOfLabels { + return nil, fmt.Errorf("more than %d labels is not allowed, given: %d", maxNumberOfLabels, len(labelsMap)) + } + + return labelsMap, nil +} + +func ParseEndpoint(endpoint string, cleanupSocket bool) (string, string, error) { + u, err := url.Parse(endpoint) + if err != nil { + klog.Fatal(err.Error()) + } + + var addr string + switch u.Scheme { + case "unix": + addr = u.Path + if cleanupSocket { + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + klog.Fatalf("Failed to remove %s, error: %s", addr, err) + } + } + case "tcp": + addr = u.Host + default: + klog.Fatalf("%v endpoint scheme not supported", u.Scheme) + } + + return u.Scheme, addr, nil +} + +func ParsePodIDVolumeFromTargetpath(targetPath string) (string, string, error) { + r := regexp.MustCompile(`/var/lib/kubelet/pods/(.*)/volumes/kubernetes\.io~csi/(.*)/mount`) + matched := r.FindStringSubmatch(targetPath) + if len(matched) < 3 { + return "", "", fmt.Errorf("targetPath %v does not contain Pod ID or volume information", targetPath) + } + podID := matched[1] + volume := 
matched[2] + + return podID, volume, nil +} + +func GetEmptyDirPath(podId, emptyDirName string) string { + return fmt.Sprintf("/var/lib/kubelet/pods/%s/volumes/kubernetes.io~empty-dir/%s", podId, emptyDirName) +} + +func GetNetConnFromRawUnixSocketFd(fd int) (net.Conn, error) { + f := os.NewFile(uintptr(fd), "unix_socket") + defer f.Close() + + c, err := net.FileConn(f) + if err != nil { + return nil, err + } + + return c, err +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go new file mode 100644 index 0000000..759cdda --- /dev/null +++ b/pkg/util/util_test.go @@ -0,0 +1,306 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "reflect" + "testing" +) + +func TestConvertLabelsStringToMap(t *testing.T) { + t.Parallel() + t.Run("parsing labels string into map", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + labels string + expectedOutput map[string]string + expectedError bool + }{ + // Success test cases + { + name: "should return empty map when labels string is empty", + labels: "", + expectedOutput: map[string]string{}, + expectedError: false, + }, + { + name: "single label string", + labels: "key=value", + expectedOutput: map[string]string{ + "key": "value", + }, + expectedError: false, + }, + { + name: "multiple label string", + labels: "key1=value1,key2=value2", + expectedOutput: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + expectedError: false, + }, + { + name: "multiple labels string with whitespaces gets trimmed", + labels: "key1=value1, key2=value2", + expectedOutput: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + expectedError: false, + }, + // Failure test cases + { + name: "malformed labels string (no keys and values)", + labels: ",,", + expectedOutput: nil, + expectedError: true, + }, + { + name: "malformed labels string (incorrect format)", + labels: "foo,bar", + expectedOutput: nil, + expectedError: true, + }, + { + name: "malformed labels string (missing key)", + labels: "key1=value1,=bar", + expectedOutput: nil, + expectedError: true, + }, + { + name: "malformed labels string (missing key and value)", + labels: "key1=value1,=bar,=", + expectedOutput: nil, + expectedError: true, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + output, err := ConvertLabelsStringToMap(tc.labels) + if tc.expectedError && err == nil { + t.Errorf("Expected error but got none") + } + if err != nil { + if !tc.expectedError { + t.Errorf("Did not expect error but got: %v", err) + } + + continue + } + + if !reflect.DeepEqual(output, tc.expectedOutput) { + 
t.Errorf("Got labels %v, but expected %v", output, tc.expectedOutput) + } + } + }) + + t.Run("checking google requirements", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + labels string + expectedError bool + }{ + { + name: "64 labels at most", + labels: `k1=v,k2=v,k3=v,k4=v,k5=v,k6=v,k7=v,k8=v,k9=v,k10=v,k11=v,k12=v,k13=v,k14=v,k15=v,k16=v,k17=v,k18=v,k19=v,k20=v, + k21=v,k22=v,k23=v,k24=v,k25=v,k26=v,k27=v,k28=v,k29=v,k30=v,k31=v,k32=v,k33=v,k34=v,k35=v,k36=v,k37=v,k38=v,k39=v,k40=v, + k41=v,k42=v,k43=v,k44=v,k45=v,k46=v,k47=v,k48=v,k49=v,k50=v,k51=v,k52=v,k53=v,k54=v,k55=v,k56=v,k57=v,k58=v,k59=v,k60=v, + k61=v,k62=v,k63=v,k64=v,k65=v`, + expectedError: true, + }, + { + name: "label key must have atleast 1 char", + labels: "=v", + expectedError: true, + }, + { + name: "label key can only contain lowercase chars, digits, _ and -)", + labels: "k*=v", + expectedError: true, + }, + { + name: "label key can only contain lowercase chars)", + labels: "K=v", + expectedError: true, + }, + { + name: "label key may not have over 63 characters", + labels: "abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghij1234=v", + expectedError: true, + }, + { + name: "label value can only contain lowercase chars, digits, _ and -)", + labels: "k1=###", + expectedError: true, + }, + { + name: "label value can only contain lowercase chars)", + labels: "k1=V", + expectedError: true, + }, + { + name: "label key cannot contain . and /", + labels: "kubernetes.io/created-for/pvc/namespace=v", + expectedError: true, + }, + { + name: "label value cannot contain . 
and /", + labels: "kubernetes_io_created-for_pvc_namespace=v./", + expectedError: true, + }, + { + name: "label value may not have over 63 chars", + labels: "v=abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghij1234", + expectedError: true, + }, + { + name: "label key can have up to 63 chars", + labels: "abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghij123=v", + expectedError: false, + }, + { + name: "label value can have up to 63 chars", + labels: "k=abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghij123", + expectedError: false, + }, + { + name: "label key can contain - and _", + labels: "abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghij-_=v", + expectedError: false, + }, + { + name: "label value can contain - and _", + labels: "k=abcdefghijabcdefghijabcdefghijabcdefghijabcdefghijabcdefghij-_", + expectedError: false, + }, + { + name: "label value can have 0 chars", + labels: "kubernetes_io_created-for_pvc_namespace=", + expectedError: false, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + _, err := ConvertLabelsStringToMap(tc.labels) + + if tc.expectedError && err == nil { + t.Errorf("Expected error but got none") + } + + if !tc.expectedError && err != nil { + t.Errorf("Did not expect error but got: %v", err) + } + } + }) +} + +func TestParseEndpoint(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + endpoint string + expectedScheme string + expectedAddress string + expectedError bool + }{ + { + name: "should parse unix endpoint correctly", + endpoint: "unix:/csi/csi.sock", + expectedScheme: "unix", + expectedAddress: "/csi/csi.sock", + expectedError: false, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + scheme, address, err := ParseEndpoint(tc.endpoint, false) + if tc.expectedError && err == nil { + t.Errorf("Expected error but got none") + } + if err != nil { + if !tc.expectedError { + t.Errorf("Did not expect error but got: %v", 
err) + } + + continue + } + + if !reflect.DeepEqual(scheme, tc.expectedScheme) { + t.Errorf("Got scheme %v, but expected %v", scheme, tc.expectedScheme) + } + + if !reflect.DeepEqual(address, tc.expectedAddress) { + t.Errorf("Got address %v, but expected %v", address, tc.expectedAddress) + } + } +} + +func TestParsePodIDVolumeFromTargetpath(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + targetPath string + expectedPodID string + expectedVolume string + expectedError bool + }{ + { + name: "should parse Pod ID correctly", + targetPath: "/var/lib/kubelet/pods/d2013878-3d56-45f9-89ec-0826612c89b6/volumes/kubernetes.io~csi/test-volume/mount", + expectedPodID: "d2013878-3d56-45f9-89ec-0826612c89b6", + expectedVolume: "test-volume", + expectedError: false, + }, + { + name: "should return error", + targetPath: "/foo/bar/volumes", + expectedPodID: "", + expectedVolume: "", + expectedError: true, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + podID, volume, err := ParsePodIDVolumeFromTargetpath(tc.targetPath) + if tc.expectedError && err == nil { + t.Errorf("Expected error but got none") + } + if err != nil { + if !tc.expectedError { + t.Errorf("Did not expect error but got: %v", err) + } + + continue + } + + if !reflect.DeepEqual(podID, tc.expectedPodID) { + t.Errorf("Got pod ID %v, but expected %v", podID, tc.expectedPodID) + } + if !reflect.DeepEqual(volume, tc.expectedVolume) { + t.Errorf("Got volume %v, but expected %v", volume, tc.expectedVolume) + } + } +} diff --git a/pkg/util/volume_lock.go b/pkg/util/volume_lock.go new file mode 100755 index 0000000..c64aa7d --- /dev/null +++ b/pkg/util/volume_lock.go @@ -0,0 +1,61 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC +Copyright 2023 Preferred Networks, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" + + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + VolumeOperationAlreadyExistsFmt = "An operation with the given volume key %s already exists" +) + +// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs +// with an ongoing operation. +type VolumeLocks struct { + locks sets.Set[string] + mux sync.Mutex +} + +func NewVolumeLocks() *VolumeLocks { + return &VolumeLocks{ + locks: sets.Set[string]{}, + } +} + +// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful. +// If another operation is already using volumeID, returns false. 
+func (vl *VolumeLocks) TryAcquire(volumeID string) bool { + vl.mux.Lock() + defer vl.mux.Unlock() + if vl.locks.Has(volumeID) { + return false + } + vl.locks.Insert(volumeID) + + return true +} + +func (vl *VolumeLocks) Release(volumeID string) { + vl.mux.Lock() + defer vl.mux.Unlock() + vl.locks.Delete(volumeID) +} diff --git a/test_e2e.sh b/test_e2e.sh new file mode 100755 index 0000000..323816d --- /dev/null +++ b/test_e2e.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +set -e + +cd $(dirname $0) + +echo $GITHUB_ACTION +if [ -z $GITHUB_ACTION ]; then + STAGINGVERSION=latest LOAD_TO_KIND=true make all +else + STAGINGVERSION=latest LOAD_TO_KIND=true DOCKER_CACHE_ARGS="--cache-from=type=gha --cache-to=type=gha,mode=max" make all +fi +kubectl apply -f deploy/csi-driver.yaml +kubectl apply -f deploy/csi-driver-daemonset.yaml + +while [[ $(kubectl get ds -n mfcp-system meta-fuse-csi-plugin -o 'jsonpath={..status.numberReady}') != "1" ]]; do echo "waiting for csi-plugin" && sleep 1; done + +make test-examples