feat: Enable NVMe multipath support

- Modified VolumeContext in `CreateVolume` to support an array of target endpoints, allowing multiple paths to be defined for a single volume.
- Updated the connection establishment process to iterate through the available endpoints when attaching a volume.

Signed-off-by: cheolho.kang <cheolho.kang@samsung.com>
This commit is contained in:
cheolho.kang 2025-05-18 23:02:45 +09:00
parent 879ba7b890
commit 1af62b5cc7
6 changed files with 133 additions and 60 deletions

View File

@ -3,8 +3,8 @@ kind: StorageClass
metadata:
name: csi-nvmf-sc
parameters:
targetTrAddr: "192.168.122.18"
targetTrPort: "49153"
targetTrAddr: "192.168.122.18,192.168.122.19"
targetTrPort: "49153,49154"
targetTrType: "tcp"
provisioner: csi.nvmf.com
reclaimPolicy: Delete

View File

@ -18,6 +18,7 @@ package nvmf
import (
"context"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
@ -104,11 +105,22 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return nil, status.Errorf(codes.ResourceExhausted, "no suitable device available: %v", err)
}
volumeContext := map[string]string{
paramType: allocatedDevice.Transport,
}
if len(allocatedDevice.Endpoints) > 1 {
endpointPairs := []string{}
endpointPairs = append(endpointPairs, allocatedDevice.Endpoints...)
volumeContext[paramEndpoint] = strings.Join(endpointPairs, ",")
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: allocatedDevice.Nqn,
CapacityBytes: UseActualDeviceCapacity, // PV will use the actual capacity
VolumeContext: parameters,
VolumeContext: volumeContext,
ContentSource: req.GetVolumeContentSource(),
},
}, nil

View File

@ -120,8 +120,7 @@ func (r *DeviceRegistry) SyncFromPV(ctx context.Context) error {
VolName: pv.Name,
Nqn: nqn,
Transport: pv.Spec.CSI.VolumeAttributes[paramType],
Addr: pv.Spec.CSI.VolumeAttributes["targetTrAddr"],
Port: pv.Spec.CSI.VolumeAttributes["targetTrPort"],
Endpoints: strings.Split(pv.Spec.CSI.VolumeAttributes["targetTrEndpoint"], ","),
},
IsAllocated: true,
}
@ -164,7 +163,7 @@ func (r *DeviceRegistry) DiscoverDevices(params map[string]string) error {
klog.V(4).Infof("Discovered %d NVMe targets", len(r.devices))
for _, device := range r.devices {
klog.V(4).Infof("- NQN: %s, Endpoints: %v:%v", device.Nqn, device.Addr, device.Port)
klog.V(4).Infof("- NQN: %s, isAllocated: %t, Endpoints: %v", device.Nqn, device.IsAllocated, device.Endpoints)
}
return nil
@ -207,7 +206,7 @@ func (r *DeviceRegistry) AllocateDevice(volumeName string) (*VolumeInfo, error)
device.VolName = volumeName
device.IsAllocated = true
klog.V(4).Infof("[%d/%d] Allocated volume %s (NQN %s)", len(r.devices) - len(r.availableNQNs), len(r.devices), volumeName, nqn)
klog.V(4).Infof("[%d/%d] Allocated volume %s (NQN %s)", len(r.devices)-len(r.availableNQNs), len(r.devices), volumeName, nqn)
return device, nil
}
@ -217,7 +216,6 @@ func (r *DeviceRegistry) ReleaseDevice(nqn string) {
r.mutex.Lock()
defer r.mutex.Unlock()
device, exists := r.devices[nqn]
if !exists {
// CSI spec requires idempotency: return success even if volume doesn't exist
@ -232,7 +230,7 @@ func (r *DeviceRegistry) ReleaseDevice(nqn string) {
r.availableNQNs[nqn] = struct{}{}
device.VolName = ""
klog.V(4).Infof("[%d/%d] Released volume %s", len(r.devices) - len(r.availableNQNs), len(r.devices), nqn)
klog.V(4).Infof("[%d/%d] Released volume %s", len(r.devices)-len(r.availableNQNs), len(r.devices), nqn)
}
// discoverNVMeDevices runs NVMe discovery and returns available targets
@ -255,21 +253,53 @@ func discoverNVMeDevices(params map[string]string) (map[string]*nvmfDiskInfo, er
klog.V(4).Infof("Discovering NVMe targets at %s:%s using %s", targetAddr, targetPort, targetType)
// Discover devices on each port
ips := strings.Split(targetAddr, ",")
ports := strings.Split(targetPort, ",")
// collect devices by NQN with endpoints as a list
deviceMap := make(map[string]*nvmfDiskInfo)
cmd := exec.Command("nvme", "discover", "-a", targetAddr, "-s", targetPort, "-t", targetType, "-o", "json")
var out bytes.Buffer
cmd.Stdout = &out
for _, ip := range ips {
ip = strings.TrimSpace(ip) // Trim spaces in case there are spaces after commas
for _, port := range ports {
port = strings.TrimSpace(port) // Trim spaces in case there are spaces after commas
if port == "" {
continue
}
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("nvme discover command failed: %v", err)
klog.V(4).Infof("Running discovery on %s://%s:%s", targetType, ip, port)
cmd := exec.Command("nvme", "discover", "-a", ip, "-s", port, "-t", targetType, "-o", "json")
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
klog.Warningf("nvme discover command failed for port %s: %v", port, err)
continue // Continue with next port instead of failing completely
}
// Parse JSON output and organize by NQN
devices := parseNvmeDiscoveryOutput(out.String(), targetType)
for _, device := range devices {
if existingDevice, exists := deviceMap[device.Nqn]; exists {
// NQN already exists, just add the new endpoint if it's not already in the list
endpoint := device.Endpoints[0]
endpointExists := false
for _, existingEndpoint := range existingDevice.Endpoints {
if existingEndpoint == endpoint {
endpointExists = true
break
}
}
if !endpointExists {
existingDevice.Endpoints = append(existingDevice.Endpoints, endpoint)
}
} else {
// New NQN, add the device to the map
deviceMap[device.Nqn] = device
}
}
}
}
// Parse JSON output and organize by NQN
devices := parseNvmeDiscoveryOutput(out.String(), targetType)
for _, device := range devices {
deviceMap[device.Nqn] = device
}
return deviceMap, nil
}
@ -296,10 +326,17 @@ func parseNvmeDiscoveryOutput(output string, targetType string) []*nvmfDiskInfo
continue
}
// Set endpoint address if both Addr and Port are provided
// These are required for the noder server to connect to the target with multipath
if record.Addr != "" && record.Port != "" {
record.Endpoints = []string{record.Addr + ":" + record.Port}
} else {
klog.Warningf("Skipping record with invalid Addr or Port: Addr=%s, Port=%s", record.Addr, record.Port)
}
// Append to targets list
recordCopy := record // Create a copy because 'record' is reused in each loop iteration
targets = append(targets, &recordCopy)
}
return targets
}

View File

@ -29,26 +29,24 @@ import (
)
type Connector struct {
VolumeID string
TargetNqn string
TargetAddr string
TargetPort string
Transport string
HostNqn string
RetryCount int32
CheckInterval int32
VolumeID string
TargetNqn string
TargetEndpoints []string
Transport string
HostNqn string
RetryCount int32
CheckInterval int32
}
func getNvmfConnector(nvmfInfo *nvmfDiskInfo, hostnqn string) *Connector {
return &Connector{
VolumeID: nvmfInfo.VolName,
TargetNqn: nvmfInfo.Nqn,
TargetAddr: nvmfInfo.Addr,
TargetPort: nvmfInfo.Port,
Transport: nvmfInfo.Transport,
HostNqn: hostnqn,
RetryCount: 10, // Default retry count
CheckInterval: 1, // Default check interval in seconds
VolumeID: nvmfInfo.VolName,
TargetNqn: nvmfInfo.Nqn,
TargetEndpoints: nvmfInfo.Endpoints,
Transport: nvmfInfo.Transport,
HostNqn: hostnqn,
RetryCount: 10, // Default retry count
CheckInterval: 1, // Default check interval in seconds
}
}
@ -237,14 +235,31 @@ func (c *Connector) Connect() (string, error) {
return "", fmt.Errorf("csi transport only support tcp/rdma ")
}
baseString := fmt.Sprintf("nqn=%s,transport=%s,traddr=%s,trsvcid=%s,hostnqn=%s", c.TargetNqn, c.Transport, c.TargetAddr, c.TargetPort, c.HostNqn)
// TargetEndpoints is assumed to be populated (via CreateVolume) with multiple "IP:Port" entries
// Attempt to connect to all endpoints to support multi-path configurations
for _, endpoint := range c.TargetEndpoints {
// Split the endpoint into IP and port
parts := strings.Split(endpoint, ":")
if len(parts) != 2 {
return "", fmt.Errorf("invalid endpoint format: %s", endpoint)
}
// connect to nvmf disk
err := _connect(baseString)
if err != nil {
return "", err
ip, port := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
if ip == "" || port == "" {
return "", fmt.Errorf("empty IP or port in endpoint: %s", endpoint)
}
baseString := fmt.Sprintf("nqn=%s,transport=%s,traddr=%s,trsvcid=%s,hostnqn=%s", c.TargetNqn, c.Transport, ip, port, c.HostNqn)
klog.V(4).Infof("Running connect on %s://%s:%s", c.Transport, ip, port)
// connect to nvmf disk
err := _connect(baseString)
if err != nil {
klog.Errorf("Connect: failed to connect to endpoint %s, error: %v", endpoint, err)
return "", err
}
}
klog.Infof("Connect Volume %s success nqn: %s, hostnqn: %s", c.VolumeID, c.TargetNqn, c.HostNqn)
klog.V(4).Infof("Connect Volume %s success nqn: %s, hostnqn: %s", c.VolumeID, c.TargetNqn, c.HostNqn)
// Wait for device to be ready (find UUID and check path)
devicePath, err := findPathWithRetry(c.TargetNqn, c.RetryCount, c.CheckInterval)

View File

@ -230,7 +230,11 @@ func (n *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstage
}
// Detach the volume
err = DetachDisk(volumeID, stagingPath)
// NQN is used as the volume ID
// This assumption is valid because in CreateVolume, we assigned the device's NQN
// as the volumeID when returning the CreateVolumeResponse.
targetNqn := volumeID
err = DetachDisk(targetNqn, stagingPath)
if err != nil {
klog.Errorf("NodeUnstageVolume: failed to detach volume %s: %v", volumeID, err)
return nil, status.Errorf(codes.Internal, "failed to detach volume: %v", err)

View File

@ -20,6 +20,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/klog/v2"
@ -29,9 +30,10 @@ import (
// NVMe-oF parameter keys
const (
paramAddr = "targetTrAddr" // Target address parameter
paramPort = "targetTrPort" // Target port parameter
paramType = "targetTrType" // Transport type parameter
paramAddr = "targetTrAddr" // Target address parameter
paramPort = "targetTrPort" // Target port parameter
paramType = "targetTrType" // Transport type parameter
paramEndpoint = "targetTrEndpoint" // Target endpoints parameter
)
type nvmfDiskInfo struct {
@ -40,6 +42,7 @@ type nvmfDiskInfo struct {
Addr string `json:"traddr"`
Port string `json:"trsvcid"`
Transport string `json:"trtype"`
Endpoints []string
}
type nvmfDiskMounter struct {
@ -64,19 +67,22 @@ func getNVMfDiskInfo(volID string, params map[string]string) (*nvmfDiskInfo, err
return nil, fmt.Errorf("discovery parameters are nil")
}
targetTrAddr := params[paramAddr]
targetTrPort := params[paramPort]
targetTrType := params[paramType]
targetTrEndpoints := params[paramEndpoint]
nqn := volID
if targetTrAddr == "" || nqn == "" || targetTrPort == "" || targetTrType == "" {
return nil, fmt.Errorf("some nvme target info is missing, volID: %s ", volID)
if nqn == "" || targetTrType == "" || targetTrEndpoints == "" {
return nil, fmt.Errorf("some nvme target info is missing, nqn: %s, type: %s, eindpoints: %s ", nqn, targetTrType, targetTrEndpoints)
}
endpoints := strings.Split(targetTrEndpoints, ",")
if len(endpoints) == 0 {
return nil, fmt.Errorf("no endpoints found in %s", volID)
}
return &nvmfDiskInfo{
VolName: volID,
Addr: targetTrAddr,
Port: targetTrPort,
Endpoints: endpoints,
Nqn: nqn,
Transport: targetTrType,
}, nil
@ -127,15 +133,14 @@ func AttachDisk(volumeID string, connector *Connector) (string, error) {
}
// DetachDisk disconnects an NVMe-oF disk
func DetachDisk(volumeID string, targetPath string) error {
connector, err := GetConnectorFromFile(targetPath + ".json")
if err != nil {
klog.Errorf("DetachDisk: failed to get connector from path %s Error: %v", targetPath, err)
return err
func DetachDisk(targetNqn, targetPath string) error {
connector := Connector{
TargetNqn: targetNqn,
HostNqn: targetPath,
}
err = connector.Disconnect()
err := connector.Disconnect()
if err != nil {
klog.Errorf("DetachDisk: VolumeID: %s failed to disconnect, Error: %v", volumeID, err)
klog.Errorf("DetachDisk: failed to disconnect, Error: %v", err)
return err
}