feat: implement NodeStageVolume() and NodePublishVolume()

- Implement `NodeStageVolume()` to handle device attachment to the node
- Move device attachment logic from `NodePublishVolume()` to `NodeStageVolume()`
- Refactor `NodePublishVolume()` to only handle mounting from staging path to target path

Signed-off-by: cheolho.kang <cheolho.kang@samsung.com>
This commit is contained in:
cheolho.kang 2025-03-20 16:50:46 +09:00
parent f3d036bb0f
commit 7f4930d717
4 changed files with 191 additions and 48 deletions

View File

@ -70,6 +70,9 @@ spec:
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/csi.nvmf.com
- name: csi-dir
mountPath: /var/lib/kubelet/plugins/kubernetes.io
mountPropagation: "Bidirectional"
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
@ -92,6 +95,10 @@ spec:
hostPath:
path: /var/lib/kubelet/plugins_registry
type: DirectoryOrCreate
- name: csi-dir
hostPath:
path: /var/lib/kubelet/plugins/kubernetes.io
type: DirectoryOrCreate
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet/pods

View File

@ -6,7 +6,8 @@ spec:
accessModes:
- ReadWriteOnce
storageClassName: csi-nvmf-sc
volumeMode: Block
resources:
requests:
storage: 20Gi

View File

@ -18,6 +18,7 @@ package nvmf
import (
"os"
"sync"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-nvmf/pkg/utils"
@ -29,6 +30,7 @@ import (
type NodeServer struct {
Driver *driver
mtx sync.Mutex // protect volumes map
}
func NewNodeServer(d *driver) *NodeServer {
@ -73,22 +75,31 @@ func (n *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume missing TargetPath in req.")
}
// 2. attachdisk
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
}
n.mtx.Lock()
defer n.mtx.Unlock()
klog.V(4).Infof("NodePublishVolume called for volume %s", req.VolumeId)
// 2. mountdisk
// Create mounter for the volume to be published
parameter := req.GetVolumeContext()
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingPath := req.GetStagingTargetPath() + "/" + volumeID
nvmfInfo, err := getNVMfDiskInfo(volumeID, parameter)
if err != nil {
return nil, status.Errorf(codes.Internal, "NodePublishVolume: get NVMf disk info from req err: %v", err)
}
diskMounter := getNVMfDiskMounter(nvmfInfo, targetPath, req.GetVolumeCapability())
// attachDisk realize connect NVMf disk and mount to docker path
_, err = AttachDisk(req, *diskMounter)
// Mount to the docker path from the staging path
err = MountVolume(stagingPath, diskMounter)
if err != nil {
klog.Errorf("NodePublishVolume: Attach volume %s with error: %s", req.VolumeId, err.Error())
return nil, err
klog.Errorf("NodePublishVolume: failed to mount volume %s at %s with error: %s", req.VolumeId, targetPath, err.Error())
return nil, status.Errorf(codes.Unavailable, "NodePublishVolume: failed to mount volume: %v", err)
}
return &csi.NodePublishVolumeResponse{}, nil
@ -131,8 +142,57 @@ func (n *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Staging target path is required")
}
n.mtx.Lock()
defer n.mtx.Unlock()
klog.V(4).Infof("NodeStageVolume called for volume %s", volumeID)
deviceName, err := GetDeviceNameByVolumeID(volumeID)
if err == nil && deviceName != "" {
klog.V(4).Infof("NodeStageVolume: Device %s already exists", deviceName)
return &csi.NodeStageVolumeResponse{}, nil
}
// Create Connector and mounter for the volume to be staged
nvmfInfo, err := getNVMfDiskInfo(volumeID, req.GetVolumeContext())
if err != nil {
klog.Errorf("NodeStageVolume: failed to get NVMf disk info: %v", err)
return nil, status.Errorf(codes.Internal, "failed to get NVMf disk info: %v", err)
}
// stagingPath is appended with volumeID to avoid conflicts
// This is necessary to properly handle different volume modes:
// - In filesystem mode: need a dedicated directory for mounting
// - In block mode: need a specific path for the block device file
stagingPath := req.GetStagingTargetPath() + "/" + volumeID
diskMounter := getNVMfDiskMounter(nvmfInfo, stagingPath, req.GetVolumeCapability())
// Attach the NVMe disk
devicePath, err := AttachDisk(volumeID, diskMounter.connector)
if err != nil {
klog.Errorf("NodeStageVolume: failed to attach volume %s: %v", volumeID, err)
return nil, status.Errorf(codes.Unavailable, "failed to attach volume %s: %v", volumeID, err)
}
// Mount the volume
klog.V(4).Infof("NodeStageVolume: mounting device %s at %s", devicePath, stagingPath)
err = MountVolume(devicePath, diskMounter)
if err != nil {
klog.Errorf("NodeStageVolume: failed to mount volume %s: %v", volumeID, err)
diskMounter.connector.Disconnect()
return nil, status.Errorf(codes.Unavailable, "failed to mount volume: %v", err)
}
// Persist connector information for detachment
err = persistConnectorFile(diskMounter.connector, stagingPath+".json")
if err != nil {
klog.Errorf("NodeStageVolume: failed to persist connection info: %v", err)
klog.Errorf("NodeStageVolume: disconnecting volume because persistence file is required for unstage")
UnmountVolume(stagingPath, getNVMfDiskUnMounter())
diskMounter.connector.Disconnect()
return nil, status.Errorf(codes.Unavailable, "failed to persist connection info: %v", err)
}
return &csi.NodeStageVolumeResponse{}, nil
}

View File

@ -19,6 +19,7 @@ package nvmf
import (
"fmt"
"os"
"path/filepath"
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/klog/v2"
@ -106,59 +107,25 @@ func getNVMfDiskUnMounter() *nvmfDiskUnMounter {
}
}
func AttachDisk(req *csi.NodePublishVolumeRequest, nm nvmfDiskMounter) (string, error) {
if nm.connector == nil {
// AttachDisk connects to an NVMe-oF disk and returns the device path
func AttachDisk(volumeID string, connector *Connector) (string, error) {
if connector == nil {
return "", fmt.Errorf("connector is nil")
}
// connect nvmf target disk
devicePath, err := nm.connector.Connect()
devicePath, err := connector.Connect()
if err != nil {
klog.Errorf("AttachDisk: VolumeID %s failed to connect, Error: %v", req.VolumeId, err)
klog.Errorf("AttachDisk: VolumeID %s failed to connect, Error: %v", volumeID, err)
return "", err
}
if devicePath == "" {
klog.Errorf("AttachDisk: VolumeId %s return nil devicePath", req.VolumeId)
return "", fmt.Errorf("VolumeId %s return nil devicePath", req.VolumeId)
}
klog.Infof("AttachDisk: Volume %s successful connected, Device%s", req.VolumeId, devicePath)
mntPath := nm.targetPath
klog.Infof("AttachDisk: MntPath: %s", mntPath)
notMounted, err := nm.mounter.IsLikelyNotMountPoint(mntPath)
if err != nil && !os.IsNotExist(err) {
return "", fmt.Errorf("Heuristic determination of mount point failed:%v", err)
}
if !notMounted {
klog.Infof("AttachDisk: VolumeID: %s, Path: %s is already mounted, device: %s", req.VolumeId, nm.targetPath, nm.DeviceUUID)
return "", nil
klog.Errorf("AttachDisk: VolumeId %s return nil devicePath", volumeID)
return "", fmt.Errorf("VolumeId %s return nil devicePath", volumeID)
}
// pre to mount
if err := os.MkdirAll(mntPath, 0750); err != nil {
klog.Errorf("AttachDisk: failed to mkdir %s, error", mntPath)
return "", err
}
klog.Infof("AttachDisk: Volume %s successfully connected, Device: %s", volumeID, devicePath)
err = persistConnectorFile(nm.connector, mntPath+".json")
if err != nil {
klog.Errorf("AttachDisk: failed to persist connection info: %v", err)
klog.Errorf("AttachDisk: disconnecting volume and failing the publish request because persistence file are required for unpublish volume")
return "", fmt.Errorf("unable to create persistence file for connection")
}
// Tips: use k8s mounter to mount fs and only support "ext4"
var options []string
options = append(options, nm.mountOptions...)
err = nm.mounter.FormatAndMount(devicePath, mntPath, nm.fsType, options)
if err != nil {
klog.Errorf("AttachDisk: failed to mount Device %s to %s with options: %v", devicePath, mntPath, options)
nm.connector.Disconnect()
removeConnectorFile(mntPath)
return "", fmt.Errorf("failed to mount Device %s to %s with options: %v", devicePath, mntPath, options)
}
klog.Infof("AttachDisk: Successfully Mount Device %s to %s with options: %v", devicePath, mntPath, options)
return devicePath, nil
}
@ -187,3 +154,111 @@ func DetachDisk(volumeID string, num *nvmfDiskUnMounter, targetPath string) erro
removeConnectorFile(targetPath)
return nil
}
// mountVolume handles both regular and block device mounts
func MountVolume(sourcePath string, nm *nvmfDiskMounter) error {
klog.Infof("MountVolume: MntPath: %s, SrcPath: ", nm.targetPath, sourcePath)
// Check if already mounted
notMounted, err := nm.mounter.IsLikelyNotMountPoint(nm.targetPath)
if err != nil && !os.IsNotExist(err) {
klog.Errorf("MountVolume: failed to check mount point %s: %v", nm.targetPath, err)
return fmt.Errorf("heuristic determination of mount point failed:%v", err)
}
if !notMounted {
klog.Infof("mountVolume: %s is already mounted", nm.targetPath)
return nil
}
if nm.isBlock {
// Handle block device mount
return mountBlockDevice(sourcePath, nm)
} else {
// Handle regular filesystem mount
return mountFilesystem(sourcePath, nm)
}
}
// UnmountVolume safely unmounts a volume
func UnmountVolume(targetPath string, unmounter *nvmfDiskUnMounter) error {
// Check if already unmounted
notMnt, err := unmounter.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
klog.Warningf("UnmountVolume: target path %s does not exist, skipping", targetPath)
return nil
}
return fmt.Errorf("failed to check if %s is a mount point: %v", targetPath, err)
}
if notMnt {
klog.V(4).Infof("UnmountVolume: %s is not a mount point, skipping", targetPath)
return nil
}
// Unmount the volume
klog.Infof("UnmountVolume: unmounting %s", targetPath)
if err := unmounter.mounter.Unmount(targetPath); err != nil {
klog.Errorf("UnmountVolume: failed to unmount %s: %v", targetPath, err)
return fmt.Errorf("failed to unmount volume: %v", err)
}
return nil
}
// mountFilesystem handles mounting a formatted filesystem
func mountFilesystem(devicePath string, nm *nvmfDiskMounter) error {
// Create mount point directory
if err := os.MkdirAll(nm.targetPath, 0750); err != nil {
klog.Errorf("mountFilesystem: failed to mkdir %s: %v", nm.targetPath, err)
return err
}
// Mount the filesystem
// Tips: use k8s mounter to mount fs and only support "ext4"
var options []string
options = append(options, nm.mountOptions...)
klog.Infof("mountFilesystem: mounting %s at %s with fstype %s and options: %v", devicePath, nm.targetPath, nm.fsType, options)
if err := nm.mounter.FormatAndMount(devicePath, nm.targetPath, nm.fsType, options); err != nil {
klog.Errorf("mountFilesystem: failed to format and mount %s at %s: %v", devicePath, nm.targetPath, err)
return fmt.Errorf("failed to format and mount device: %v", err)
}
return nil
}
// mountBlockDevice handles mounting a block device directly (without formatting)
func mountBlockDevice(devicePath string, nm *nvmfDiskMounter) error {
// Create parent directory
parentDir := filepath.Dir(nm.targetPath)
if err := os.MkdirAll(parentDir, 0750); err != nil {
klog.Errorf("mountBlockDevice: failed to create parent dir %s: %v", parentDir, err)
return fmt.Errorf("failed to create parent directory: %v", err)
}
// Verify parent directory exists
if exists, _ := mount.PathExists(parentDir); !exists {
return fmt.Errorf("parent directory %s still does not exist after creation", parentDir)
}
// Create block device file
pathFile, err := os.OpenFile(nm.targetPath, os.O_CREATE|os.O_RDWR, 0600) // #nosec G304
if err != nil {
klog.Errorf("mountBlockDevice: failed to create file %s: %v", nm.targetPath, err)
return fmt.Errorf("failed to create block device file: %v", err)
}
if err = pathFile.Close(); err != nil {
klog.Errorf("mountBlockDevice: failed to close file %s: %v", nm.targetPath, err)
return fmt.Errorf("failed to close block device file: %v", err)
}
// Mount the block device with bind option
options := append(nm.mountOptions, "bind")
klog.Infof("mountBlockDevice: mounting %s at %s with options: %v", devicePath, nm.targetPath, options)
if err := nm.mounter.Mount(devicePath, nm.targetPath, nm.fsType, options); err != nil {
klog.Errorf("mountBlockDevice: failed to mount %s at %s: %v", devicePath, nm.targetPath, err)
return fmt.Errorf("failed to mount block device: %v", err)
}
return nil
}