feat: support block volume mode

Co-authored-by: Jiri 'Ghormoon' Novak <novak.jiri@poda.cz>
Co-authored-by: Meinhard Zhou <zhouenhua@bytedance.com>

Signed-off-by: Meinhard Zhou <zhouenhua@bytedance.com>
This commit is contained in:
Meinhard Zhou
2025-09-29 17:10:37 +08:00
parent 25eda920c8
commit 4b7bb305b9
14 changed files with 276 additions and 137 deletions

View File

@@ -73,6 +73,9 @@ spec:
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
- name: plugins-mount-dir
mountPath: /var/lib/kubelet/plugins
mountPropagation: "Bidirectional"
- name: run-nvmf-dir
mountPath: /run/nvmf
- name: host-dev
@@ -96,6 +99,10 @@ spec:
hostPath:
path: /var/lib/kubelet/pods
type: Directory
- name: plugins-mount-dir
hostPath:
path: /var/lib/kubelet/
type: Directory
- name: run-nvmf-dir
hostPath:
path: /run/nvmf

View File

@@ -0,0 +1,27 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-block-test
labels:
app: nginx
spec:
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx
ports:
- containerPort: 80
volumeDevices:
- devicePath: /dev/nvmf
name: nvmf-volume
volumes:
- name: nvmf-volume
persistentVolumeClaim:
claimName: csi-nvmf-pvc-block

View File

@@ -0,0 +1,33 @@
apiVersion: v1
kind: PersistentVolume
metadata:
name: csi-nvmf-pv-block
spec:
storageClassName: csi-nvmf-sc-block
accessModes:
- ReadWriteOnce
volumeMode: Block
capacity:
storage: 20Gi
csi:
driver: csi.nvmf.com
volumeHandle: nvmf-data-id
volumeAttributes:
targetTrAddr: "192.168.122.18"
targetTrPort: "49153"
targetTrType: "tcp"
nqn: "nqn.2022-08.org.test-nvmf.example"
# you can use any format of ID here, uuid., eui., whatever your storage pusts into ID_WWN or ID_SERIAL, as udev puts that in /dev/disk/by-id
deviceID: "uuid.58668891-c3e4-45d0-b90e-824525c16080"
#deviceID: "INTEL_SSDPF2KX038T9N_PHAB2261050P3P9EGN"
#deviceID: "Pure_Storage_FlashArray_AA00000000000AAA_42"
#
# for uuid, eui and nguid formats. there's an alias
#deviceUUID: "58668891-c3e4-45d0-b90e-824525c16080"
# is the same as
#deviceID: "uuid.58668891-c3e4-45d0-b90e-824525c16080"
#
# both EUI and NGUID will appear as eui.something
#deviceEUI: "00000000000000000042aa42aa42aa42"
# is the same as
#deviceID: "eui.00000000000000000042aa42aa42aa42"

View File

@@ -0,0 +1,12 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: csi-nvmf-pvc-block
spec:
accessModes:
- ReadWriteOnce
volumeMode: Block
storageClassName: csi-nvmf-sc-block
resources:
requests:
storage: 20Gi

View File

@@ -0,0 +1,7 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: csi-nvmf-sc-block
provisioner: csi.nvmf.com
reclaimPolicy: Delete
allowVolumeExpansion: true

View File

@@ -1,7 +1,7 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-block-test1
name: nginx-fs-test
labels:
app: nginx
spec:
@@ -24,4 +24,4 @@ spec:
volumes:
- name: nvmf-volume
persistentVolumeClaim:
claimName: csi-nvmf-pvc
claimName: csi-nvmf-pvc-fs

View File

@@ -1,9 +1,9 @@
apiVersion: v1
kind: PersistentVolume
metadata:
name: csi-nvmf-pv
name: csi-nvmf-pv-fs
spec:
storageClassName: csi-nvmf-sc
storageClassName: csi-nvmf-sc-fs
accessModes:
- ReadWriteOnce
capacity:

View File

@@ -1,11 +1,11 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: csi-nvmf-pvc
name: csi-nvmf-pvc-fs
spec:
accessModes:
- ReadWriteOnce
storageClassName: csi-nvmf-sc
storageClassName: csi-nvmf-sc-fs
resources:
requests:
storage: 20Gi

View File

@@ -1,7 +1,7 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: csi-nvmf-sc
name: csi-nvmf-sc-fs
provisioner: csi.nvmf.com
reclaimPolicy: Delete
allowVolumeExpansion: true

View File

@@ -32,7 +32,7 @@ const (
DefaultDriverServicePort = "12230"
DefaultDriverVersion = "v1.0.0"
DefaultVolumeMapPath = "/var/lib/nvmf/volumes"
DefaultVolumeMapPath = "/var/lib/kubelet/plugins/csi.nvmf.com/volumes"
)
type GlobalConfig struct {

View File

@@ -259,7 +259,7 @@ func (c *Connector) Connect() (string, error) {
}
if c.RetryCount < 0 || c.CheckInterval < 0 {
return "", fmt.Errorf("Invalid RetryCount and CheckInterval combinaitons "+
return "", fmt.Errorf("invalid RetryCount and CheckInterval combinaitons "+
"RetryCount: %d, CheckInterval: %d ", c.RetryCount, c.CheckInterval)
}
@@ -351,12 +351,12 @@ func persistConnectorFile(c *Connector, filePath string) error {
}
func removeConnectorFile(targetPath string) {
func removeConnectorFile(filePath string) {
// todo: here maybe be attack for os.Remove can operate any file, fix?
if err := os.Remove(targetPath + ".json"); err != nil {
klog.Errorf("DetachDisk: Can't remove connector file: %s", targetPath)
if err := os.Remove(filePath); err != nil {
klog.Errorf("DetachDisk: Can't remove connector file: %s", filePath)
}
if err := os.RemoveAll(targetPath); err != nil {
if err := os.RemoveAll(filePath); err != nil {
klog.Errorf("DetachDisk: failed to remove mount path Error: %v", err)
}
}

View File

@@ -18,6 +18,7 @@ package nvmf
import (
"os"
"path"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-nvmf/pkg/utils"
@@ -53,7 +54,7 @@ func (n *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCa
}
func (n *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// 1. check parameters
// Pre-check
if req.GetVolumeCapability() == nil {
return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume missing Volume Capability in req.")
}
@@ -66,18 +67,48 @@ func (n *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume missing TargetPath in req.")
}
// 2. attachdisk
klog.Infof("VolumeID %s publish to targetPath %s.", req.GetVolumeId(), req.GetTargetPath())
// Connect remote disk
nvmfInfo, err := getNVMfDiskInfo(req)
if err != nil {
return nil, status.Errorf(codes.Internal, "NodePublishVolume: get NVMf disk info from req err: %v", err)
}
diskMounter := getNVMfDiskMounter(nvmfInfo, req)
// attachDisk realize connect NVMf disk and mount to docker path
_, err = AttachDisk(req, *diskMounter)
connector := getNvmfConnector(nvmfInfo)
devicePath, err := connector.Connect()
connectorFilePath := path.Join(DefaultVolumeMapPath, req.GetVolumeId()+".json")
if err != nil {
klog.Errorf("NodePublishVolume: Attach volume %s with error: %s", req.VolumeId, err.Error())
return nil, err
klog.Errorf("VolumeID %s failed to connect, Error: %v", req.VolumeId, err)
return nil, status.Errorf(codes.Internal, "VolumeID %s failed to connect, Error: %v", req.VolumeId, err)
}
if devicePath == "" {
klog.Errorf("VolumeID %s connected, but return nil devicePath", req.VolumeId)
return nil, status.Errorf(codes.Internal, "VolumeID %s connected, but return nil devicePath", req.VolumeId)
}
klog.Infof("Volume %s successful connected, Device%s", req.VolumeId, devicePath)
defer Rollback(err, func() {
connector.Disconnect()
})
err = persistConnectorFile(connector, connectorFilePath)
if err != nil {
klog.Errorf("failed to persist connection info: %v", err)
return nil, status.Errorf(codes.Internal, "VolumeID %s persist connection info error: %v", req.VolumeId, err)
}
// Attach disk to container path
if req.GetVolumeCapability().GetBlock() != nil && req.GetVolumeCapability().GetMount() != nil {
return nil, status.Errorf(codes.InvalidArgument, "cannot have both block and mount access type")
}
defer Rollback(err, func() {
removeConnectorFile(connectorFilePath)
})
err = AttachDisk(req, devicePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "VolumeID %s attach error: %v", req.VolumeId, err)
}
return &csi.NodePublishVolumeResponse{}, nil
@@ -86,19 +117,36 @@ func (n *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
func (n *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
klog.Infof("NodeUnpublishVolume: Starting unpublish volume, %s, %v", req.VolumeId, req)
// Pre-check
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume VolumeID must be provided")
}
if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Staging TargetPath must be provided")
}
// Detach disk
targetPath := req.GetTargetPath()
err := DetachDisk(req.VolumeId, getNVMfDiskUnMounter(req), targetPath)
err := DetachDisk(targetPath)
if err != nil {
klog.Errorf("NodeUnpublishVolume: VolumeID: %s detachDisk err: %v", req.VolumeId, err)
klog.Errorf("VolumeID: %s detachDisk err: %v", req.VolumeId, err)
return nil, err
}
// Disconnect remote disk
connectorFilePath := path.Join(DefaultVolumeMapPath, req.GetVolumeId()+".json")
connector, err := GetConnectorFromFile(connectorFilePath)
if err != nil {
klog.Errorf("failed to get connector from path %s Error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "failed to get connector from path %s Error: %v", targetPath, err)
}
err = connector.Disconnect()
if err != nil {
klog.Errorf("VolumeID: %s failed to disconnect, Error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "failed to get connector from path %s Error: %v", targetPath, err)
}
removeConnectorFile(connectorFilePath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}

View File

@@ -19,6 +19,7 @@ package nvmf
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -38,22 +39,6 @@ type nvmfDiskInfo struct {
HostNqn string
}
type nvmfDiskMounter struct {
*nvmfDiskInfo
readOnly bool
fsType string
mountOptions []string
mounter *mount.SafeFormatAndMount
exec exec.Interface
targetPath string
connector *Connector
}
type nvmfDiskUnMounter struct {
mounter mount.Interface
exec exec.Interface
}
func getNVMfDiskInfo(req *csi.NodePublishVolumeRequest) (*nvmfDiskInfo, error) {
volName := req.GetVolumeId()
@@ -94,108 +79,116 @@ func getNVMfDiskInfo(req *csi.NodePublishVolumeRequest) (*nvmfDiskInfo, error) {
}, nil
}
func getNVMfDiskMounter(nvmfInfo *nvmfDiskInfo, req *csi.NodePublishVolumeRequest) *nvmfDiskMounter {
readOnly := req.GetReadonly()
fsType := req.GetVolumeCapability().GetMount().GetFsType()
mountOptions := req.GetVolumeCapability().GetMount().GetMountFlags()
func AttachDisk(req *csi.NodePublishVolumeRequest, devicePath string) error {
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: exec.New()}
return &nvmfDiskMounter{
nvmfDiskInfo: nvmfInfo,
readOnly: readOnly,
fsType: fsType,
mountOptions: mountOptions,
mounter: &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: exec.New()},
exec: exec.New(),
targetPath: req.GetTargetPath(),
connector: getNvmfConnector(nvmfInfo),
}
}
targetPath := req.GetTargetPath()
if req.GetVolumeCapability().GetBlock() != nil {
dir := filepath.Dir(targetPath)
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create dir of target block file: %s, err: %v", targetPath, err.Error())
}
}
func getNVMfDiskUnMounter(req *csi.NodeUnpublishVolumeRequest) *nvmfDiskUnMounter {
return &nvmfDiskUnMounter{
mounter: mount.New(""),
exec: exec.New(),
}
}
// bind mount
file, err := os.Stat(targetPath)
if err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("failed to stat target block file exist %s, err: %v", targetPath, err.Error())
}
func AttachDisk(req *csi.NodePublishVolumeRequest, nm nvmfDiskMounter) (string, error) {
if nm.connector == nil {
return "", fmt.Errorf("connector is nil")
newFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0750)
if err != nil {
return fmt.Errorf("failed to open target block file: %s, err: %v", targetPath, err.Error())
}
if err := newFile.Close(); err != nil {
return fmt.Errorf("failed to close target block file: %s, err: %v", targetPath, err.Error())
}
} else {
if file.Mode()&os.ModeDevice == os.ModeDevice {
klog.Warning("AttachDisk Warning: Map skipped because bind mount already exist on the path: %v", targetPath)
return nil
}
}
if err := mounter.MountSensitive(devicePath, targetPath, "", []string{"bind"}, nil); err != nil {
klog.Errorf("AttachDisk: failed to mount Device %s to %s, err: %v", devicePath, targetPath, err.Error())
return fmt.Errorf("failed to mount Device %s to %s, err: %v", devicePath, targetPath, err.Error())
}
// symlink
// if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
// return fmt.Errorf("failed to remove file %s: %v", targetPath, err)
// }
// if err := os.Symlink(devicePath, targetPath); err != nil {
// klog.Errorf("AttachDisk: failed to link Device %s to %s, err: %v", devicePath, targetPath, err.Error())
// return fmt.Errorf("failed to link Device %s to %s, err: %v", devicePath, targetPath, err.Error())
// }
// if _, err := os.Lstat(targetPath); err != nil {
// klog.Errorf("Failed to verify symlink creation: %v", err)
// return err
// }
// klog.Infof("Successfully created symlink from %s to %s", devicePath, targetPath)
} else if req.GetVolumeCapability().GetMount() != nil {
notMounted, err := mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
klog.Errorf("AttachDisk: VolumeID: %s, Path %s is not exist, so create one.", req.GetVolumeId(), req.GetTargetPath())
if err = os.MkdirAll(targetPath, 0750); err != nil {
return fmt.Errorf("create target path: %v", err)
}
notMounted = true
} else {
return fmt.Errorf("check target path %v", err)
}
}
if !notMounted {
klog.Infof("AttachDisk: VolumeID: %s, Path: %s is already mounted.", req.GetVolumeId(), req.GetTargetPath())
return nil
}
fsType := req.GetVolumeCapability().GetMount().GetFsType()
readonly := req.GetReadonly()
mountOptions := req.GetVolumeCapability().GetMount().GetMountFlags()
options := []string{""}
if readonly {
options = append(options, "ro")
} else {
options = append(options, "rw")
}
options = append(options, mountOptions...)
if err = mounter.FormatAndMount(devicePath, targetPath, fsType, options); err != nil {
klog.Errorf("AttachDisk: failed to mount Device %s to %s with options: %v", devicePath, targetPath, options)
return fmt.Errorf("failed to mount Device %s to %s with options: %v", devicePath, targetPath, options)
}
}
klog.Infof("AttachDisk: Successfully Attach Device %s to %s", devicePath, targetPath)
return nil
}
func DetachDisk(targetPath string) (err error) {
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: exec.New()}
if notMnt, err := mount.IsNotMountPoint(mounter, targetPath); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("check target path error: %w", err)
}
} else if !notMnt {
if err = mounter.Unmount(targetPath); err != nil {
klog.Errorf("nvmf detach disk: failed to unmount: %s\nError: %v", targetPath, err)
return err
}
// Delete the mount point
if err = os.RemoveAll(targetPath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove target path: %w", err)
}
}
// connect nvmf target disk
devicePath, err := nm.connector.Connect()
if err != nil {
klog.Errorf("AttachDisk: VolumeID %s failed to connect, Error: %v", req.VolumeId, err)
return "", err
}
if devicePath == "" {
klog.Errorf("AttachDisk: VolumeId %s return nil devicePath", req.VolumeId)
return "", fmt.Errorf("VolumeId %s return nil devicePath", req.VolumeId)
}
klog.Infof("AttachDisk: Volume %s successful connected, Device%s", req.VolumeId, devicePath)
mntPath := nm.targetPath
klog.Infof("AttachDisk: MntPath: %s", mntPath)
notMounted, err := nm.mounter.IsLikelyNotMountPoint(mntPath)
if err != nil && !os.IsNotExist(err) {
return "", fmt.Errorf("Heuristic determination of mount point failed:%v", err)
}
if !notMounted {
klog.Infof("AttachDisk: VolumeID: %s, Path: %s is already mounted, device: %s", req.VolumeId, nm.targetPath, nm.DeviceID)
return "", nil
}
// pre to mount
if err := os.MkdirAll(mntPath, 0750); err != nil {
klog.Errorf("AttachDisk: failed to mkdir %s, error", mntPath)
return "", err
}
err = persistConnectorFile(nm.connector, mntPath+".json")
if err != nil {
klog.Errorf("AttachDisk: failed to persist connection info: %v", err)
klog.Errorf("AttachDisk: disconnecting volume and failing the publish request because persistence file are required for unpublish volume")
return "", fmt.Errorf("unable to create persistence file for connection")
}
// Tips: use k8s mounter to mount fs and only support "ext4"
var options []string
options = append(options, nm.mountOptions...)
err = nm.mounter.FormatAndMount(devicePath, mntPath, nm.fsType, options)
if err != nil {
klog.Errorf("AttachDisk: failed to mount Device %s to %s with options: %v", devicePath, mntPath, options)
nm.connector.Disconnect()
removeConnectorFile(mntPath)
return "", fmt.Errorf("failed to mount Device %s to %s with options: %v", devicePath, mntPath, options)
}
klog.Infof("AttachDisk: Successfully Mount Device %s to %s with options: %v", devicePath, mntPath, options)
return devicePath, nil
}
func DetachDisk(volumeID string, num *nvmfDiskUnMounter, targetPath string) error {
if pathExists, pathErr := mount.PathExists(targetPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
klog.Warningf("Warning: Unmount skipped because path does not exist: %v", targetPath)
return nil
}
if err := num.mounter.Unmount(targetPath); err != nil {
klog.Errorf("iscsi detach disk: failed to unmount: %s\nError: %v", targetPath, err)
return err
}
connector, err := GetConnectorFromFile(targetPath + ".json")
if err != nil {
klog.Errorf("DetachDisk: failed to get connector from path %s Error: %v", targetPath, err)
return err
}
err = connector.Disconnect()
if err != nil {
klog.Errorf("DetachDisk: VolumeID: %s failed to disconnect, Error: %v", volumeID, err)
return err
}
removeConnectorFile(targetPath)
return nil
}

View File

@@ -21,6 +21,8 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"time"
@@ -41,7 +43,7 @@ func waitForPathToExist(devicePath string, maxRetries, intervalSeconds int, devi
}
time.Sleep(time.Second * time.Duration(intervalSeconds))
}
return false, fmt.Errorf("not found devicePath %s", devicePath)
return false, fmt.Errorf("not found devicePath %s and transport %s", devicePath, deviceTransport)
}
func GetDeviceNameByVolumeID(volumeID string) (deviceName string, err error) {
@@ -91,3 +93,13 @@ func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h
}
return resp, err
}
func Rollback(err error, fc func()) {
if err != nil {
if fc != nil {
klog.Infof("Executing rollback func:%s for error: %v", runtime.FuncForPC(reflect.ValueOf(fc).Pointer()).Name(), err)
}
fc()
}
}