Skip to content

Commit

Permalink
NBSNEBIUS-317: use the same sockets dir for pod and node (#1295)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanek325 authored Jun 14, 2024
1 parent 4bc730b commit 1881b1a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 55 deletions.
11 changes: 1 addition & 10 deletions cloud/blockstore/tools/csi_driver/cmd/nbs-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,7 @@ func main() {
flag.StringVar(&cfg.NfsServerSocket, "nfs-server-socket", "", "NFS server unix socket path")
flag.UintVar(&cfg.NfsVhostPort, "nfs-vhost-port", 9022, "NFS vhost port")
flag.StringVar(&cfg.NfsVhostSocket, "nfs-vhost-socket", "", "NFS vhost unix socket path")
flag.StringVar(&cfg.NbsSocketsDir,
"nbs-sockets-dir",
"/run/nbsd/sockets",
"Path to folder with disk sockets on the node",
)
flag.StringVar(&cfg.PodSocketsDir,
"pod-sockets-dir",
"/nbsd-sockets",
"Path to folder with disk sockets on the pod",
)
flag.StringVar(&cfg.SocketsDir, "sockets-dir", "/run/nbsd/sockets", "Path to folder with disk sockets")

flag.Parse()

Expand Down
6 changes: 2 additions & 4 deletions cloud/blockstore/tools/csi_driver/internal/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type Config struct {
NfsServerSocket string
NfsVhostPort uint
NfsVhostSocket string
NbsSocketsDir string
PodSocketsDir string
SocketsDir string
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -147,8 +146,7 @@ func NewDriver(cfg Config) (*Driver, error) {
cfg.NodeID,
nbsClientID,
cfg.VMMode,
cfg.NbsSocketsDir,
cfg.PodSocketsDir,
cfg.SocketsDir,
NodeFsTargetPathPattern,
NodeBlkTargetPathPattern,
nbsClient,
Expand Down
32 changes: 13 additions & 19 deletions cloud/blockstore/tools/csi_driver/internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ type nodeService struct {
nodeID string
clientID string
vmMode bool
nbsSocketsDir string
podSocketsDir string
socketsDir string
targetFsPathRegexp *regexp.Regexp
targetBlkPathRegexp *regexp.Regexp

Expand All @@ -71,8 +70,7 @@ func newNodeService(
nodeID string,
clientID string,
vmMode bool,
nbsSocketsDir string,
podSocketsDir string,
socketsDir string,
targetFsPathPattern string,
targetBlkPathPattern string,
nbsClient nbsclient.ClientIface,
Expand All @@ -83,8 +81,7 @@ func newNodeService(
nodeID: nodeID,
clientID: clientID,
vmMode: vmMode,
nbsSocketsDir: nbsSocketsDir,
podSocketsDir: podSocketsDir,
socketsDir: socketsDir,
nbsClient: nbsClient,
nfsClient: nfsClient,
mounter: mounter,
Expand Down Expand Up @@ -364,15 +361,14 @@ func (s *nodeService) startNbsEndpoint(
volumeId string,
ipcType nbsapi.EClientIpcType) (*nbsapi.TStartEndpointResponse, error) {

endpointDir := filepath.Join(s.podSocketsDir, podId, volumeId)
endpointDir := filepath.Join(s.socketsDir, podId, volumeId)
if err := os.MkdirAll(endpointDir, os.FileMode(0755)); err != nil {
return nil, err
}

socketPath := filepath.Join(s.nbsSocketsDir, podId, volumeId, nbsSocketName)
hostType := nbsapi.EHostType_HOST_TYPE_DEFAULT
return s.nbsClient.StartEndpoint(ctx, &nbsapi.TStartEndpointRequest{
UnixSocketPath: socketPath,
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
DiskId: volumeId,
ClientId: s.clientID,
DeviceName: volumeId,
Expand All @@ -394,7 +390,7 @@ func (s *nodeService) nodePublishFileStoreAsVhostSocket(
ctx context.Context,
req *csi.NodePublishVolumeRequest) error {

endpointDir := filepath.Join(s.podSocketsDir, s.getPodId(req), req.VolumeId)
endpointDir := filepath.Join(s.socketsDir, s.getPodId(req), req.VolumeId)
if err := os.MkdirAll(endpointDir, os.FileMode(0755)); err != nil {
return err
}
Expand All @@ -403,10 +399,9 @@ func (s *nodeService) nodePublishFileStoreAsVhostSocket(
return fmt.Errorf("NFS client wasn't created")
}

socketPath := filepath.Join(s.nbsSocketsDir, s.getPodId(req), req.VolumeId, nfsSocketName)
_, err := s.nfsClient.StartEndpoint(ctx, &nfsapi.TStartEndpointRequest{
Endpoint: &nfsapi.TEndpointConfig{
SocketPath: socketPath,
SocketPath: filepath.Join(endpointDir, nfsSocketName),
FileSystemId: req.VolumeId,
ClientId: fmt.Sprintf("%s-%s", s.clientID, s.getPodId(req)),
VhostQueuesCount: 8,
Expand Down Expand Up @@ -437,16 +432,15 @@ func (s *nodeService) nodeUnpublishVolume(
}
}

podSocketDir := filepath.Join(s.podSocketsDir, podId, req.VolumeId)
nodeSocketDir := filepath.Join(s.nbsSocketsDir, podId, req.VolumeId)
socketDir := filepath.Join(s.socketsDir, podId, req.VolumeId)

// Trying to stop both NBS and NFS endpoints,
// because the endpoint's backend service is unknown here.
// When we miss we get S_FALSE/S_ALREADY code (err == nil).

if s.nbsClient != nil {
_, err := s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(nodeSocketDir, nbsSocketName),
UnixSocketPath: filepath.Join(socketDir, nbsSocketName),
})
if err != nil {
return fmt.Errorf("failed to stop nbs endpoint: %w", err)
Expand All @@ -455,25 +449,25 @@ func (s *nodeService) nodeUnpublishVolume(

if s.nfsClient != nil {
_, err := s.nfsClient.StopEndpoint(ctx, &nfsapi.TStopEndpointRequest{
SocketPath: filepath.Join(nodeSocketDir, nfsSocketName),
SocketPath: filepath.Join(socketDir, nfsSocketName),
})
if err != nil {
return fmt.Errorf("failed to stop nfs endpoint: %w", err)
}
}

if err := os.RemoveAll(podSocketDir); err != nil {
if err := os.RemoveAll(socketDir); err != nil {
return err
}

// remove pod's folder if it's empty
os.Remove(filepath.Join(s.podSocketsDir, podId))
os.Remove(filepath.Join(s.socketsDir, podId))
return nil
}

func (s *nodeService) mountSocketDir(req *csi.NodePublishVolumeRequest) error {

endpointDir := filepath.Join(s.podSocketsDir, s.getPodId(req), req.VolumeId)
endpointDir := filepath.Join(s.socketsDir, s.getPodId(req), req.VolumeId)

// https://kubevirt.io/user-guide/virtual_machines/disks_and_volumes/#persistentvolumeclaim
// "If the disk.img image file has not been created manually before starting a VM
Expand Down
38 changes: 16 additions & 22 deletions cloud/blockstore/tools/csi_driver/internal/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ func doTestPublishUnpublishVolumeForKubevirt(t *testing.T, backend string) {
podID := "test-pod-id-13"
nfsClientId := "testClientId-test-pod-id-13"
diskID := "test-disk-id-42"
podSocketsDir := filepath.Join(tempDir, "sockets")
nbsSocketsDir := "/test/sockets/folder"
sourcePath := filepath.Join(podSocketsDir, podID, diskID)
socketsDir := filepath.Join(tempDir, "sockets")
sourcePath := filepath.Join(socketsDir, podID, diskID)
targetPath := filepath.Join(tempDir, "pods", podID, "volumes", diskID, "mount")
targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount")
nbsSocketPath := filepath.Join(nbsSocketsDir, podID, diskID, "nbs.sock")
nfsSocketPath := filepath.Join(nbsSocketsDir, podID, diskID, "nfs.sock")
nbsSocketPath := filepath.Join(sourcePath, "nbs.sock")
nfsSocketPath := filepath.Join(sourcePath, "nfs.sock")

nodeService := newNodeService(
"testNodeId",
clientID,
true, // vmMode
nbsSocketsDir,
podSocketsDir,
socketsDir,
targetFsPathPattern,
"", // targetBlkPathPattern
nbsClient,
Expand Down Expand Up @@ -145,7 +143,7 @@ func doTestPublishUnpublishVolumeForKubevirt(t *testing.T, backend string) {
})
require.NoError(t, err)

_, err = os.Stat(filepath.Join(podSocketsDir, podID))
_, err = os.Stat(filepath.Join(socketsDir, podID))
assert.True(t, os.IsNotExist(err))

_, err = nodeService.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
Expand Down Expand Up @@ -192,17 +190,15 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) {
diskID := "test-disk-id-42"
targetPath := filepath.Join(tempDir, "pods", podID, "volumes", diskID, "mount")
targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount")
podSocketsDir := filepath.Join(tempDir, "sockets")
nbsSocketsDir := "/test/sockets/folder"
sourcePath := filepath.Join(podSocketsDir, podID, diskID)
socketPath := filepath.Join(nbsSocketsDir, podID, diskID, "nbs.sock")
socketsDir := filepath.Join(tempDir, "sockets")
sourcePath := filepath.Join(socketsDir, podID, diskID)
socketPath := filepath.Join(socketsDir, podID, diskID, "nbs.sock")

nodeService := newNodeService(
"testNodeId",
clientID,
false, // vmMode
nbsSocketsDir,
podSocketsDir,
socketsDir,
targetFsPathPattern,
"", // targetBlkPathPattern
nbsClient,
Expand Down Expand Up @@ -289,7 +285,7 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) {
})
require.NoError(t, err)

_, err = os.Stat(filepath.Join(podSocketsDir, podID))
_, err = os.Stat(filepath.Join(socketsDir, podID))
assert.True(t, os.IsNotExist(err))

_, err = nodeService.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
Expand All @@ -316,17 +312,15 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) {
diskID := "test-disk-id-42"
targetPath := filepath.Join(tempDir, "volumeDevices", "publish", diskID, podID)
targetBlkPathPattern := filepath.Join(tempDir, "volumeDevices/publish/([a-z0-9-]+)/([a-z0-9-]+)")
podSocketsDir := filepath.Join(tempDir, "sockets")
nbsSocketsDir := "/test/sockets/folder"
sourcePath := filepath.Join(podSocketsDir, podID, diskID)
socketPath := filepath.Join(nbsSocketsDir, podID, diskID, "nbs.sock")
socketsDir := filepath.Join(tempDir, "sockets")
sourcePath := filepath.Join(socketsDir, podID, diskID)
socketPath := filepath.Join(sourcePath, "nbs.sock")

nodeService := newNodeService(
"testNodeId",
clientID,
false, // vmMode
nbsSocketsDir,
podSocketsDir,
socketsDir,
"", // targetFsPathPattern
targetBlkPathPattern,
nbsClient,
Expand Down Expand Up @@ -406,7 +400,7 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) {
})
require.NoError(t, err)

_, err = os.Stat(filepath.Join(podSocketsDir, podID))
_, err = os.Stat(filepath.Join(socketsDir, podID))
assert.True(t, os.IsNotExist(err))

_, err = nodeService.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
Expand Down

0 comments on commit 1881b1a

Please sign in to comment.