Skip to content

Commit

Permalink
staged csi: use per-instance staging directory
Browse files Browse the repository at this point in the history
We had all staged disk on the node to be in shared staging directory,
but now we have all necessary information in stage record to get
instanceId in NodeUnstage so we can group disks in instance directory.

Old path looked like:
/...NBS.../sockets/${POD_ID}/${VOLUME_ID}/[nbs,nfs].sock

Intermediate path looked like:
/...NBS.../sockets/staging/${VOLUME_ID}/[nbs,nfs].sock

New path will look like:
/...NBS.../sockets/${INSTANCE_ID}/${VOLUME_ID}/[nbs,nfs].sock

Signed-off-by: Anton Kuchin <[email protected]>
  • Loading branch information
aikuchin committed Sep 23, 2024
1 parent f036792 commit 2267fd2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
20 changes: 10 additions & 10 deletions cloud/blockstore/tools/csi_driver/internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const topologyNodeKey = "topology.nbs.csi/node"

const nbsSocketName = "nbs.sock"
const nfsSocketName = "nfs.sock"
const stagingDirName = "staging"

const vhostIpc = nbsapi.EClientIpcType_IPC_VHOST
const nbdIpc = nbsapi.EClientIpcType_IPC_NBD
Expand Down Expand Up @@ -161,17 +160,15 @@ func (s *nodeService) NodeStageVolume(
var err error
if instanceID := req.VolumeContext[instanceIDKey]; instanceID != "" {
stageRecordPath := filepath.Join(req.StagingTargetPath, req.VolumeId+".json")

// Backend can be empty for old disks, in this case we use NBS
backend := "nbs"
if nfsBackend {
backend = "nfs"
}

if err = s.writeStageData(stageRecordPath, &StageData{
Backend: backend,
InstanceId: instanceID,
RealStagePath: s.getEndpointDir(stagingDirName, req.VolumeId),
RealStagePath: s.getEndpointDir(instanceID, req.VolumeId),
}); err != nil {
return nil, s.statusErrorf(codes.Internal,
"Failed to wriete stage record: %v", err)
Expand Down Expand Up @@ -284,7 +281,7 @@ func (s *nodeService) NodePublishVolume(
case *csi.VolumeCapability_Mount:
if s.vmMode {
if instanceID != "" {
err = s.nodePublishStagedVhostSocket(req)
err = s.nodePublishStagedVhostSocket(req, instanceID)
} else {
if nfsBackend {
err = s.nodePublishFileStoreAsVhostSocket(ctx, req)
Expand Down Expand Up @@ -472,7 +469,7 @@ func (s *nodeService) nodeStageDiskAsVhostSocket(

log.Printf("csi.nodeStageDiskAsVhostSocket: %s %s %+v", instanceId, volumeId, volumeContext)

endpointDir := s.getEndpointDir(stagingDirName, volumeId)
endpointDir := s.getEndpointDir(instanceId, volumeId)
if err := os.MkdirAll(endpointDir, os.FileMode(0755)); err != nil {
return err
}
Expand Down Expand Up @@ -671,7 +668,7 @@ func (s *nodeService) nodeStageFileStoreAsVhostSocket(

log.Printf("csi.nodeStageFileStoreAsVhostSocket: %s %s", instanceID, volumeID)

endpointDir := s.getEndpointDir(stagingDirName, volumeID)
endpointDir := s.getEndpointDir(instanceID, volumeID)
if err := os.MkdirAll(endpointDir, os.FileMode(0755)); err != nil {
return err
}
Expand All @@ -693,11 +690,14 @@ func (s *nodeService) nodeStageFileStoreAsVhostSocket(
return fmt.Errorf("failed to start NFS endpoint: %w", err)
}

log.Printf("csi.nodeStageFileStoreAsVhostSocket: started endpoint %q",
filepath.Join(endpointDir, nfsSocketName))

return s.createDummyImgFile(endpointDir)
}

func (s *nodeService) nodePublishStagedVhostSocket(req *csi.NodePublishVolumeRequest) error {
endpointDir := s.getEndpointDir(stagingDirName, req.VolumeId)
func (s *nodeService) nodePublishStagedVhostSocket(req *csi.NodePublishVolumeRequest, instanceId string) error {
endpointDir := s.getEndpointDir(instanceId, req.VolumeId)
return s.mountSocketDir(endpointDir, req)
}

Expand Down Expand Up @@ -812,7 +812,7 @@ func (s *nodeService) nodeUnstageVhostSocket(
}

// remove staging folder if it's empty
ignoreError(os.Remove(s.getEndpointDir(stagingDirName, "")))
ignoreError(os.Remove(s.getEndpointDir(stageData.InstanceId, "")))
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func doTestStagedPublishUnpublishVolumeForKubevirt(t *testing.T, backend string,
}
stagingTargetPath := filepath.Join(tempDir, "testStagingTargetPath")
socketsDir := filepath.Join(tempDir, "sockets")
sourcePath := filepath.Join(socketsDir, stagingDirName, diskID)
sourcePath := filepath.Join(socketsDir, instanceID, diskID)
targetPath := filepath.Join(tempDir, "pods", podID, "volumes", diskID, "mount")
targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount")
nbsSocketPath := filepath.Join(sourcePath, "nbs.sock")
Expand Down

0 comments on commit 2267fd2

Please sign in to comment.