From 1af62b5cc74eeb0daba66c5e1b26665bddaee41c Mon Sep 17 00:00:00 2001 From: "cheolho.kang" Date: Sun, 18 May 2025 23:02:45 +0900 Subject: [PATCH] feat: Enable NVMe multipath support - Modified VolumeContext in `CreateVolume` to support an array of target endpoints, allowing multiple paths to be defined for a single volume. - Updated the connection establishment process to iterate through the available endpoints when attaching a volume. Signed-off-by: cheolho.kang --- examples/kubernetes/example/storageclass.yaml | 4 +- pkg/nvmf/controllerserver.go | 14 +++- pkg/nvmf/device_registry.go | 73 ++++++++++++++----- pkg/nvmf/fabrics.go | 59 +++++++++------ pkg/nvmf/nodeserver.go | 6 +- pkg/nvmf/nvmf.go | 37 ++++++---- 6 files changed, 133 insertions(+), 60 deletions(-) diff --git a/examples/kubernetes/example/storageclass.yaml b/examples/kubernetes/example/storageclass.yaml index 5afe9a7..7bacb10 100644 --- a/examples/kubernetes/example/storageclass.yaml +++ b/examples/kubernetes/example/storageclass.yaml @@ -3,8 +3,8 @@ kind: StorageClass metadata: name: csi-nvmf-sc parameters: - targetTrAddr: "192.168.122.18" - targetTrPort: "49153" + targetTrAddr: "192.168.122.18,192.168.122.19" + targetTrPort: "49153,49154" targetTrType: "tcp" provisioner: csi.nvmf.com reclaimPolicy: Delete diff --git a/pkg/nvmf/controllerserver.go b/pkg/nvmf/controllerserver.go index ee8e56d..8324093 100644 --- a/pkg/nvmf/controllerserver.go +++ b/pkg/nvmf/controllerserver.go @@ -18,6 +18,7 @@ package nvmf import ( "context" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -104,11 +105,22 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu return nil, status.Errorf(codes.ResourceExhausted, "no suitable device available: %v", err) } + volumeContext := map[string]string{ + paramType: allocatedDevice.Transport, + } + + if len(allocatedDevice.Endpoints) > 1 { + endpointPairs := []string{} + endpointPairs = 
append(endpointPairs, allocatedDevice.Endpoints...) + + volumeContext[paramEndpoint] = strings.Join(endpointPairs, ",") + } + return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: allocatedDevice.Nqn, CapacityBytes: UseActualDeviceCapacity, // PV will use the actual capacity - VolumeContext: parameters, + VolumeContext: volumeContext, ContentSource: req.GetVolumeContentSource(), }, }, nil diff --git a/pkg/nvmf/device_registry.go b/pkg/nvmf/device_registry.go index d0a2820..6a5b6d2 100644 --- a/pkg/nvmf/device_registry.go +++ b/pkg/nvmf/device_registry.go @@ -120,8 +120,7 @@ func (r *DeviceRegistry) SyncFromPV(ctx context.Context) error { VolName: pv.Name, Nqn: nqn, Transport: pv.Spec.CSI.VolumeAttributes[paramType], - Addr: pv.Spec.CSI.VolumeAttributes["targetTrAddr"], - Port: pv.Spec.CSI.VolumeAttributes["targetTrPort"], + Endpoints: strings.Split(pv.Spec.CSI.VolumeAttributes["targetTrEndpoint"], ","), }, IsAllocated: true, } @@ -164,7 +163,7 @@ func (r *DeviceRegistry) DiscoverDevices(params map[string]string) error { klog.V(4).Infof("Discovered %d NVMe targets", len(r.devices)) for _, device := range r.devices { - klog.V(4).Infof("- NQN: %s, Endpoints: %v:%v", device.Nqn, device.Addr, device.Port) + klog.V(4).Infof("- NQN: %s, isAllocated: %t, Endpoints: %v", device.Nqn, device.IsAllocated, device.Endpoints) } return nil @@ -207,7 +206,7 @@ func (r *DeviceRegistry) AllocateDevice(volumeName string) (*VolumeInfo, error) device.VolName = volumeName device.IsAllocated = true - klog.V(4).Infof("[%d/%d] Allocated volume %s (NQN %s)", len(r.devices) - len(r.availableNQNs), len(r.devices), volumeName, nqn) + klog.V(4).Infof("[%d/%d] Allocated volume %s (NQN %s)", len(r.devices)-len(r.availableNQNs), len(r.devices), volumeName, nqn) return device, nil } @@ -217,7 +216,6 @@ func (r *DeviceRegistry) ReleaseDevice(nqn string) { r.mutex.Lock() defer r.mutex.Unlock() - device, exists := r.devices[nqn] if !exists { // CSI spec requires idempotency: return success 
even if volume doesn't exist @@ -232,7 +230,7 @@ func (r *DeviceRegistry) ReleaseDevice(nqn string) { r.availableNQNs[nqn] = struct{}{} device.VolName = "" - klog.V(4).Infof("[%d/%d] Released volume %s", len(r.devices) - len(r.availableNQNs), len(r.devices), nqn) + klog.V(4).Infof("[%d/%d] Released volume %s", len(r.devices)-len(r.availableNQNs), len(r.devices), nqn) } // discoverNVMeDevices runs NVMe discovery and returns available targets @@ -255,21 +253,53 @@ func discoverNVMeDevices(params map[string]string) (map[string]*nvmfDiskInfo, er klog.V(4).Infof("Discovering NVMe targets at %s:%s using %s", targetAddr, targetPort, targetType) + // Discover devices on each port + ips := strings.Split(targetAddr, ",") + ports := strings.Split(targetPort, ",") + // collect devices by NQN with endpoints as a list deviceMap := make(map[string]*nvmfDiskInfo) - cmd := exec.Command("nvme", "discover", "-a", targetAddr, "-s", targetPort, "-t", targetType, "-o", "json") - var out bytes.Buffer - cmd.Stdout = &out + for _, ip := range ips { + ip = strings.TrimSpace(ip) // Trim spaces in case there are spaces after commas + for _, port := range ports { + port = strings.TrimSpace(port) // Trim spaces in case there are spaces after commas + if port == "" { + continue + } - if err := cmd.Run(); err != nil { - return nil, fmt.Errorf("nvme discover command failed: %v", err) + klog.V(4).Infof("Running discovery on %s://%s:%s", targetType, ip, port) + cmd := exec.Command("nvme", "discover", "-a", ip, "-s", port, "-t", targetType, "-o", "json") + var out bytes.Buffer + cmd.Stdout = &out + + if err := cmd.Run(); err != nil { + klog.Warningf("nvme discover command failed for port %s: %v", port, err) + continue // Continue with next port instead of failing completely + } + + // Parse JSON output and organize by NQN + devices := parseNvmeDiscoveryOutput(out.String(), targetType) + for _, device := range devices { + if existingDevice, exists := deviceMap[device.Nqn]; exists { + // NQN already 
exists, just add the new endpoint if it's not already in the list + endpoint := device.Endpoints[0] + endpointExists := false + for _, existingEndpoint := range existingDevice.Endpoints { + if existingEndpoint == endpoint { + endpointExists = true + break + } + } + if !endpointExists { + existingDevice.Endpoints = append(existingDevice.Endpoints, endpoint) + } + } else { + // New NQN, add the device to the map + deviceMap[device.Nqn] = device + } + } + } } - // Parse JSON output and organize by NQN - devices := parseNvmeDiscoveryOutput(out.String(), targetType) - for _, device := range devices { - deviceMap[device.Nqn] = device - } - return deviceMap, nil } @@ -296,10 +326,17 @@ func parseNvmeDiscoveryOutput(output string, targetType string) []*nvmfDiskInfo continue } + // Set endpoint address if both Addr and Port are provided + // These are required for the node server to connect to the target with multipath + if record.Addr != "" && record.Port != "" { + record.Endpoints = []string{record.Addr + ":" + record.Port} + } else { + klog.Warningf("Skipping record with invalid Addr or Port: Addr=%s, Port=%s", record.Addr, record.Port) + } + // Append to targets list recordCopy := record // Create a copy because 'record' is reused in each loop iteration targets = append(targets, &recordCopy) } - return targets } diff --git a/pkg/nvmf/fabrics.go b/pkg/nvmf/fabrics.go index 959e765..7d2c1ab 100644 --- a/pkg/nvmf/fabrics.go +++ b/pkg/nvmf/fabrics.go @@ -29,26 +29,24 @@ import ( ) type Connector struct { - VolumeID string - TargetNqn string - TargetAddr string - TargetPort string - Transport string - HostNqn string - RetryCount int32 - CheckInterval int32 + VolumeID string + TargetNqn string + TargetEndpoints []string + Transport string + HostNqn string + RetryCount int32 + CheckInterval int32 } func getNvmfConnector(nvmfInfo *nvmfDiskInfo, hostnqn string) *Connector { return &Connector{ - VolumeID: nvmfInfo.VolName, - TargetNqn: nvmfInfo.Nqn, - TargetAddr: nvmfInfo.Addr, 
- TargetPort: nvmfInfo.Port, - Transport: nvmfInfo.Transport, - HostNqn: hostnqn, - RetryCount: 10, // Default retry count - CheckInterval: 1, // Default check interval in seconds + VolumeID: nvmfInfo.VolName, + TargetNqn: nvmfInfo.Nqn, + TargetEndpoints: nvmfInfo.Endpoints, + Transport: nvmfInfo.Transport, + HostNqn: hostnqn, + RetryCount: 10, // Default retry count + CheckInterval: 1, // Default check interval in seconds } } @@ -237,14 +235,31 @@ func (c *Connector) Connect() (string, error) { return "", fmt.Errorf("csi transport only support tcp/rdma ") } - baseString := fmt.Sprintf("nqn=%s,transport=%s,traddr=%s,trsvcid=%s,hostnqn=%s", c.TargetNqn, c.Transport, c.TargetAddr, c.TargetPort, c.HostNqn) + // TargetEndpoints is assumed to be populated (via CreateVolume) with multiple "IP:Port" entries + // Attempt to connect to all endpoints to support multi-path configurations + for _, endpoint := range c.TargetEndpoints { + // Split the endpoint into IP and port + parts := strings.Split(endpoint, ":") + if len(parts) != 2 { + return "", fmt.Errorf("invalid endpoint format: %s", endpoint) + } - // connect to nvmf disk - err := _connect(baseString) - if err != nil { - return "", err + ip, port := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) + if ip == "" || port == "" { + return "", fmt.Errorf("empty IP or port in endpoint: %s", endpoint) + } + + baseString := fmt.Sprintf("nqn=%s,transport=%s,traddr=%s,trsvcid=%s,hostnqn=%s", c.TargetNqn, c.Transport, ip, port, c.HostNqn) + klog.V(4).Infof("Running connect on %s://%s:%s", c.Transport, ip, port) + + // connect to nvmf disk + err := _connect(baseString) + if err != nil { + klog.Errorf("Connect: failed to connect to endpoint %s, error: %v", endpoint, err) + return "", err + } } - klog.Infof("Connect Volume %s success nqn: %s, hostnqn: %s", c.VolumeID, c.TargetNqn, c.HostNqn) + klog.V(4).Infof("Connect Volume %s success nqn: %s, hostnqn: %s", c.VolumeID, c.TargetNqn, c.HostNqn) // Wait for device to be ready 
(find UUID and check path) devicePath, err := findPathWithRetry(c.TargetNqn, c.RetryCount, c.CheckInterval) diff --git a/pkg/nvmf/nodeserver.go b/pkg/nvmf/nodeserver.go index bea95e1..e20c0b0 100644 --- a/pkg/nvmf/nodeserver.go +++ b/pkg/nvmf/nodeserver.go @@ -230,7 +230,11 @@ func (n *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstage } // Detach the volume - err = DetachDisk(volumeID, stagingPath) + // NQN is used as the volume ID + // This assumption is valid because in CreateVolume, we assigned the device's NQN + // as the volumeID when returning the CreateVolumeResponse. + targetNqn := volumeID + err = DetachDisk(targetNqn, stagingPath) if err != nil { klog.Errorf("NodeUnstageVolume: failed to detach volume %s: %v", volumeID, err) return nil, status.Errorf(codes.Internal, "failed to detach volume: %v", err) diff --git a/pkg/nvmf/nvmf.go b/pkg/nvmf/nvmf.go index 8e84772..f24057d 100644 --- a/pkg/nvmf/nvmf.go +++ b/pkg/nvmf/nvmf.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "k8s.io/klog/v2" @@ -29,9 +30,10 @@ import ( // NVMe-oF parameter keys const ( - paramAddr = "targetTrAddr" // Target address parameter - paramPort = "targetTrPort" // Target port parameter - paramType = "targetTrType" // Transport type parameter + paramAddr = "targetTrAddr" // Target address parameter + paramPort = "targetTrPort" // Target port parameter + paramType = "targetTrType" // Transport type parameter + paramEndpoint = "targetTrEndpoint" // Target endpoints parameter ) type nvmfDiskInfo struct { @@ -40,6 +42,7 @@ type nvmfDiskInfo struct { Addr string `json:"traddr"` Port string `json:"trsvcid"` Transport string `json:"trtype"` + Endpoints []string } type nvmfDiskMounter struct { @@ -64,19 +67,22 @@ func getNVMfDiskInfo(volID string, params map[string]string) (*nvmfDiskInfo, err return nil, fmt.Errorf("discovery parameters are nil") } - targetTrAddr := params[paramAddr] - 
targetTrPort := params[paramPort] targetTrType := params[paramType] + targetTrEndpoints := params[paramEndpoint] nqn := volID - if targetTrAddr == "" || nqn == "" || targetTrPort == "" || targetTrType == "" { - return nil, fmt.Errorf("some nvme target info is missing, volID: %s ", volID) + if nqn == "" || targetTrType == "" || targetTrEndpoints == "" { + return nil, fmt.Errorf("some nvme target info is missing, nqn: %s, type: %s, eindpoints: %s ", nqn, targetTrType, targetTrEndpoints) + } + + endpoints := strings.Split(targetTrEndpoints, ",") + if len(endpoints) == 0 { + return nil, fmt.Errorf("no endpoints found in %s", volID) } return &nvmfDiskInfo{ VolName: volID, - Addr: targetTrAddr, - Port: targetTrPort, + Endpoints: endpoints, Nqn: nqn, Transport: targetTrType, }, nil @@ -127,15 +133,14 @@ func AttachDisk(volumeID string, connector *Connector) (string, error) { } // DetachDisk disconnects an NVMe-oF disk -func DetachDisk(volumeID string, targetPath string) error { - connector, err := GetConnectorFromFile(targetPath + ".json") - if err != nil { - klog.Errorf("DetachDisk: failed to get connector from path %s Error: %v", targetPath, err) - return err +func DetachDisk(targetNqn, targetPath string) error { + connector := Connector{ + TargetNqn: targetNqn, + HostNqn: targetPath, } - err = connector.Disconnect() + err := connector.Disconnect() if err != nil { - klog.Errorf("DetachDisk: VolumeID: %s failed to disconnect, Error: %v", volumeID, err) + klog.Errorf("DetachDisk: failed to disconnect, Error: %v", err) return err }