diff --git a/deploy/kubernetes/csi-nvmf-node.yaml b/deploy/kubernetes/csi-nvmf-node.yaml index 56c8ada..a0841a7 100644 --- a/deploy/kubernetes/csi-nvmf-node.yaml +++ b/deploy/kubernetes/csi-nvmf-node.yaml @@ -70,6 +70,9 @@ spec: volumeMounts: - name: socket-dir mountPath: /var/lib/kubelet/plugins/csi.nvmf.com + - name: csi-dir + mountPath: /var/lib/kubelet/plugins/kubernetes.io + mountPropagation: "Bidirectional" - name: pods-mount-dir mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" @@ -92,6 +95,10 @@ spec: hostPath: path: /var/lib/kubelet/plugins_registry type: DirectoryOrCreate + - name: csi-dir + hostPath: + path: /var/lib/kubelet/plugins/kubernetes.io + type: DirectoryOrCreate - name: pods-mount-dir hostPath: path: /var/lib/kubelet/pods diff --git a/examples/kubernetes/example/pvc.yaml b/examples/kubernetes/example/pvc.yaml index b506531..e8133ad 100644 --- a/examples/kubernetes/example/pvc.yaml +++ b/examples/kubernetes/example/pvc.yaml @@ -6,7 +6,8 @@ spec: accessModes: - ReadWriteOnce storageClassName: csi-nvmf-sc + volumeMode: Block resources: requests: storage: 20Gi - \ No newline at end of file + \ No newline at end of file diff --git a/pkg/nvmf/nodeserver.go b/pkg/nvmf/nodeserver.go index 10c91e2..c95a253 100644 --- a/pkg/nvmf/nodeserver.go +++ b/pkg/nvmf/nodeserver.go @@ -18,6 +18,7 @@ package nvmf import ( "os" + "sync" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-driver-nvmf/pkg/utils" @@ -29,6 +30,7 @@ import ( type NodeServer struct { Driver *driver + mtx sync.Mutex // protect volumes map } func NewNodeServer(d *driver) *NodeServer { @@ -73,22 +75,31 @@ func (n *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume missing TargetPath in req.") } - // 2. attachdisk + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") + } + n.mtx.Lock() + defer n.mtx.Unlock() + + klog.V(4).Infof("NodePublishVolume called for volume %s", req.VolumeId) + + // 2. mountdisk // Create mounter for the volume to be published parameter := req.GetVolumeContext() volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() + stagingPath := req.GetStagingTargetPath() + "/" + volumeID nvmfInfo, err := getNVMfDiskInfo(volumeID, parameter) if err != nil { return nil, status.Errorf(codes.Internal, "NodePublishVolume: get NVMf disk info from req err: %v", err) } diskMounter := getNVMfDiskMounter(nvmfInfo, targetPath, req.GetVolumeCapability()) - // attachDisk realize connect NVMf disk and mount to docker path - _, err = AttachDisk(req, *diskMounter) + // Mount to the docker path from the staging path + err = MountVolume(stagingPath, diskMounter) if err != nil { - klog.Errorf("NodePublishVolume: Attach volume %s with error: %s", req.VolumeId, err.Error()) - return nil, err + klog.Errorf("NodePublishVolume: failed to mount volume %s at %s with error: %s", req.VolumeId, targetPath, err.Error()) + return nil, status.Errorf(codes.Unavailable, "NodePublishVolume: failed to mount volume: %v", err) } return &csi.NodePublishVolumeResponse{}, nil @@ -131,8 +142,57 @@ func (n *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Staging target path is required") } + n.mtx.Lock() + defer n.mtx.Unlock() + klog.V(4).Infof("NodeStageVolume called for volume %s", volumeID) + deviceName, err := GetDeviceNameByVolumeID(volumeID) + if err == nil && deviceName != "" { + klog.V(4).Infof("NodeStageVolume: Device %s already exists", deviceName) + return &csi.NodeStageVolumeResponse{}, nil + } + + // Create Connector and mounter for the volume to be staged + nvmfInfo, err := getNVMfDiskInfo(volumeID, req.GetVolumeContext()) + if err != nil { + klog.Errorf("NodeStageVolume: failed to get NVMf disk info: %v", err) + return nil, status.Errorf(codes.Internal, "failed to get NVMf disk info: %v", err) + } + + // stagingPath is appended with volumeID to avoid conflicts + // This is necessary to properly handle different volume modes: + // - In filesystem mode: need a dedicated directory for mounting + // - In block mode: need a specific path for the block device file + stagingPath := req.GetStagingTargetPath() + "/" + volumeID + diskMounter := getNVMfDiskMounter(nvmfInfo, stagingPath, req.GetVolumeCapability()) + + // Attach the NVMe disk + devicePath, err := AttachDisk(volumeID, diskMounter.connector) + if err != nil { + klog.Errorf("NodeStageVolume: failed to attach volume %s: %v", volumeID, err) + return nil, status.Errorf(codes.Unavailable, "failed to attach volume %s: %v", volumeID, err) + } + + // Mount the volume + klog.V(4).Infof("NodeStageVolume: mounting device %s at %s", devicePath, stagingPath) + err = MountVolume(devicePath, diskMounter) + if err != nil { + klog.Errorf("NodeStageVolume: failed to mount volume %s: %v", volumeID, err) + diskMounter.connector.Disconnect() + return nil, status.Errorf(codes.Unavailable, "failed to mount volume: %v", err) + } + + // Persist connector information for detachment + err = persistConnectorFile(diskMounter.connector, stagingPath+".json") + if err != nil { + klog.Errorf("NodeStageVolume: failed to persist connection info: %v", err) + klog.Errorf("NodeStageVolume: disconnecting volume because persistence file is required for unstage") + UnmountVolume(stagingPath, getNVMfDiskUnMounter()) + diskMounter.connector.Disconnect() + return nil, status.Errorf(codes.Unavailable, "failed to persist connection info: %v", err) + } + return &csi.NodeStageVolumeResponse{}, nil } diff --git a/pkg/nvmf/nvmf.go b/pkg/nvmf/nvmf.go index 5dd40ab..550d404 100644 --- a/pkg/nvmf/nvmf.go +++ b/pkg/nvmf/nvmf.go @@ -19,6 +19,7 @@ package nvmf import ( "fmt" "os" + "path/filepath" "github.com/container-storage-interface/spec/lib/go/csi" "k8s.io/klog/v2" @@ -106,59 +107,25 @@ func getNVMfDiskUnMounter() *nvmfDiskUnMounter { } } -func AttachDisk(req *csi.NodePublishVolumeRequest, nm nvmfDiskMounter) (string, error) { - if nm.connector == nil { +// AttachDisk connects to an NVMe-oF disk and returns the device path +func AttachDisk(volumeID string, connector *Connector) (string, error) { + if connector == nil { return "", fmt.Errorf("connector is nil") } // connect nvmf target disk - devicePath, err := nm.connector.Connect() + devicePath, err := connector.Connect() if err != nil { - klog.Errorf("AttachDisk: VolumeID %s failed to connect, Error: %v", req.VolumeId, err) + klog.Errorf("AttachDisk: VolumeID %s failed to connect, Error: %v", volumeID, err) return "", err } if devicePath == "" { - klog.Errorf("AttachDisk: VolumeId %s return nil devicePath", req.VolumeId) - return "", fmt.Errorf("VolumeId %s return nil devicePath", req.VolumeId) - } - klog.Infof("AttachDisk: Volume %s successful connected, Device:%s", req.VolumeId, devicePath) - - mntPath := nm.targetPath - klog.Infof("AttachDisk: MntPath: %s", mntPath) - notMounted, err := nm.mounter.IsLikelyNotMountPoint(mntPath) - if err != nil && !os.IsNotExist(err) { - return "", fmt.Errorf("Heuristic determination of mount point failed:%v", err) - } - if !notMounted { - klog.Infof("AttachDisk: VolumeID: %s, Path: %s is already mounted, device: %s", req.VolumeId, nm.targetPath, nm.DeviceUUID) - return "", nil + klog.Errorf("AttachDisk: VolumeId %s return nil devicePath", volumeID) + return "", fmt.Errorf("VolumeId %s return nil devicePath", volumeID) } - // pre to mount - if err := os.MkdirAll(mntPath, 0750); err != nil { - klog.Errorf("AttachDisk: failed to mkdir %s, error", mntPath) - return "", err - } + klog.Infof("AttachDisk: Volume %s successfully connected, Device: %s", volumeID, devicePath) - err = persistConnectorFile(nm.connector, mntPath+".json") - if err != nil { - klog.Errorf("AttachDisk: failed to persist connection info: %v", err) - klog.Errorf("AttachDisk: disconnecting volume and failing the publish request because persistence file are required for unpublish volume") - return "", fmt.Errorf("unable to create persistence file for connection") - } - - // Tips: use k8s mounter to mount fs and only support "ext4" - var options []string - options = append(options, nm.mountOptions...) - err = nm.mounter.FormatAndMount(devicePath, mntPath, nm.fsType, options) - if err != nil { - klog.Errorf("AttachDisk: failed to mount Device %s to %s with options: %v", devicePath, mntPath, options) - nm.connector.Disconnect() - removeConnectorFile(mntPath) - return "", fmt.Errorf("failed to mount Device %s to %s with options: %v", devicePath, mntPath, options) - } - - klog.Infof("AttachDisk: Successfully Mount Device %s to %s with options: %v", devicePath, mntPath, options) return devicePath, nil } @@ -187,3 +154,111 @@ func DetachDisk(volumeID string, num *nvmfDiskUnMounter, targetPath string) erro removeConnectorFile(targetPath) return nil } + +// mountVolume handles both regular and block device mounts +func MountVolume(sourcePath string, nm *nvmfDiskMounter) error { + klog.Infof("MountVolume: MntPath: %s, SrcPath: ", nm.targetPath, sourcePath) + + // Check if already mounted + notMounted, err := nm.mounter.IsLikelyNotMountPoint(nm.targetPath) + if err != nil && !os.IsNotExist(err) { + klog.Errorf("MountVolume: failed to check mount point %s: %v", nm.targetPath, err) + return fmt.Errorf("heuristic determination of mount point failed:%v", err) + } + if !notMounted { + klog.Infof("mountVolume: %s is already mounted", nm.targetPath) + return nil + } + + if nm.isBlock { + // Handle block device mount + return mountBlockDevice(sourcePath, nm) + } else { + // Handle regular filesystem mount + return mountFilesystem(sourcePath, nm) + } +} + +// UnmountVolume safely unmounts a volume +func UnmountVolume(targetPath string, unmounter *nvmfDiskUnMounter) error { + // Check if already unmounted + notMnt, err := unmounter.mounter.IsLikelyNotMountPoint(targetPath) + if err != nil { + if os.IsNotExist(err) { + klog.Warningf("UnmountVolume: target path %s does not exist, skipping", targetPath) + return nil + } + return fmt.Errorf("failed to check if %s is a mount point: %v", targetPath, err) + } + + if notMnt { + klog.V(4).Infof("UnmountVolume: %s is not a mount point, skipping", targetPath) + return nil + } + + // Unmount the volume + klog.Infof("UnmountVolume: unmounting %s", targetPath) + if err := unmounter.mounter.Unmount(targetPath); err != nil { + klog.Errorf("UnmountVolume: failed to unmount %s: %v", targetPath, err) + return fmt.Errorf("failed to unmount volume: %v", err) + } + + return nil +} + +// mountFilesystem handles mounting a formatted filesystem +func mountFilesystem(devicePath string, nm *nvmfDiskMounter) error { + // Create mount point directory + if err := os.MkdirAll(nm.targetPath, 0750); err != nil { + klog.Errorf("mountFilesystem: failed to mkdir %s: %v", nm.targetPath, err) + return err + } + + // Mount the filesystem + // Tips: use k8s mounter to mount fs and only support "ext4" + var options []string + options = append(options, nm.mountOptions...) + klog.Infof("mountFilesystem: mounting %s at %s with fstype %s and options: %v", devicePath, nm.targetPath, nm.fsType, options) + if err := nm.mounter.FormatAndMount(devicePath, nm.targetPath, nm.fsType, options); err != nil { + klog.Errorf("mountFilesystem: failed to format and mount %s at %s: %v", devicePath, nm.targetPath, err) + return fmt.Errorf("failed to format and mount device: %v", err) + } + + return nil +} + +// mountBlockDevice handles mounting a block device directly (without formatting) +func mountBlockDevice(devicePath string, nm *nvmfDiskMounter) error { + // Create parent directory + parentDir := filepath.Dir(nm.targetPath) + if err := os.MkdirAll(parentDir, 0750); err != nil { + klog.Errorf("mountBlockDevice: failed to create parent dir %s: %v", parentDir, err) + return fmt.Errorf("failed to create parent directory: %v", err) + } + + // Verify parent directory exists + if exists, _ := mount.PathExists(parentDir); !exists { + return fmt.Errorf("parent directory %s still does not exist after creation", parentDir) + } + + // Create block device file + pathFile, err := os.OpenFile(nm.targetPath, os.O_CREATE|os.O_RDWR, 0600) // #nosec G304 + if err != nil { + klog.Errorf("mountBlockDevice: failed to create file %s: %v", nm.targetPath, err) + return fmt.Errorf("failed to create block device file: %v", err) + } + if err = pathFile.Close(); err != nil { + klog.Errorf("mountBlockDevice: failed to close file %s: %v", nm.targetPath, err) + return fmt.Errorf("failed to close block device file: %v", err) + } + + // Mount the block device with bind option + options := append(nm.mountOptions, "bind") + klog.Infof("mountBlockDevice: mounting %s at %s with options: %v", devicePath, nm.targetPath, options) + if err := nm.mounter.Mount(devicePath, nm.targetPath, nm.fsType, options); err != nil { + klog.Errorf("mountBlockDevice: failed to mount %s at %s: %v", devicePath, nm.targetPath, err) + return fmt.Errorf("failed to mount block device: %v", err) + } + + return nil +}