From d3252963022943c39646d04e978f175f031c8de9 Mon Sep 17 00:00:00 2001 From: "cheolho.kang" Date: Wed, 19 Mar 2025 15:01:28 +0900 Subject: [PATCH] feat: implement device registry for NVMe device discovery and management Signed-off-by: cheolho.kang --- Dockerfile | 2 + deploy/kubernetes/csi-nvmf-controller.yaml | 15 +- examples/kubernetes/example/storageclass.yaml | 4 + pkg/nvmf/controllerserver.go | 22 ++- pkg/nvmf/device_registry.go | 162 ++++++++++++++++++ pkg/nvmf/nvmf.go | 21 ++- 6 files changed, 214 insertions(+), 12 deletions(-) create mode 100644 pkg/nvmf/device_registry.go diff --git a/Dockerfile b/Dockerfile index 1077d2d..f2ad0d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,7 @@ FROM debian:9.13 +RUN apt-get update && apt-get install -y nvme-cli + COPY ./bin/nvmfplugin . ENTRYPOINT ["/nvmfplugin"] diff --git a/deploy/kubernetes/csi-nvmf-controller.yaml b/deploy/kubernetes/csi-nvmf-controller.yaml index 7715bca..fb6f7eb 100644 --- a/deploy/kubernetes/csi-nvmf-controller.yaml +++ b/deploy/kubernetes/csi-nvmf-controller.yaml @@ -51,13 +51,18 @@ spec: env: - name: CSI_ENDPOINT value: unix:///csi/csi.sock + securityContext: + privileged: true volumeMounts: - name: socket-dir mountPath: /csi - name: volume-map mountPath: /var/lib/kubelet/plugins/csi.nvmf.com/volumes mountPropagation: "HostToContainer" - + - name: dev + mountPath: /dev + - name: sys + mountPath: /sys volumes: - name: socket-dir emptyDir: {} @@ -65,5 +70,9 @@ spec: hostPath: path: /var/lib/kubelet/plugins/csi.nvmf.com/volumes type: DirectoryOrCreate - - + - name: dev + hostPath: + path: /dev + - name: sys + hostPath: + path: /sys diff --git a/examples/kubernetes/example/storageclass.yaml b/examples/kubernetes/example/storageclass.yaml index d4cb830..5afe9a7 100644 --- a/examples/kubernetes/example/storageclass.yaml +++ b/examples/kubernetes/example/storageclass.yaml @@ -2,6 +2,10 @@ apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: csi-nvmf-sc +parameters: + targetTrAddr: "192.168.122.18" + targetTrPort: "49153" + targetTrType: "tcp" provisioner: csi.nvmf.com reclaimPolicy: Delete allowVolumeExpansion: true \ No newline at end of file diff --git a/pkg/nvmf/controllerserver.go b/pkg/nvmf/controllerserver.go index 2c45ec4..6fe3262 100644 --- a/pkg/nvmf/controllerserver.go +++ b/pkg/nvmf/controllerserver.go @@ -26,13 +26,15 @@ import ( ) type ControllerServer struct { - Driver *driver + Driver *driver + deviceRegistry *DeviceRegistry } // create controller server func NewControllerServer(d *driver) *ControllerServer { return &ControllerServer{ - Driver: d, + Driver: d, + deviceRegistry: NewDeviceRegistry(), } } @@ -50,6 +52,22 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu klog.V(4).Infof("CreateVolume called with name: %s", volumeName) + // Extract volume parameters + parameters := req.GetParameters() + if parameters == nil { + parameters = make(map[string]string) + } + + // Discover NVMe devices if needed + if err := c.deviceRegistry.DiscoverDevices(parameters); err != nil { + klog.Errorf("Failed to discover NVMe devices: %v", err) + return nil, status.Errorf(codes.Internal, "device discovery failed: %v", err) + } + + // TODO (cheolho.kang): In a future implementation, this method would: + // 1. Allocate a device for the volume + // 2. Store the allocation info in etcd + // 3. Return the allocated device information in the response return nil, status.Errorf(codes.Unimplemented, "CreateVolume should implement by yourself. ") } diff --git a/pkg/nvmf/device_registry.go b/pkg/nvmf/device_registry.go new file mode 100644 index 0000000..2a37c80 --- /dev/null +++ b/pkg/nvmf/device_registry.go @@ -0,0 +1,162 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nvmf + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "strings" + "sync" + + "k8s.io/klog/v2" +) + +// VolumeInfo wraps nvmfDiskInfo with allocation metadata +type VolumeInfo struct { + *nvmfDiskInfo + IsAllocated bool +} + +// DeviceRegistry manages NVMe device discovery and allocation +type DeviceRegistry struct { + // Protects device registry data + mutex sync.RWMutex + + // All discovered volume info indexed by NQN + devices map[string]*VolumeInfo + + // Set of available NQNs for quick lookup + availableNQNs map[string]struct{} + + // Map from volume name to NQN for allocated devices + volumeToNQN map[string]string +} + +// NewDeviceRegistry creates a new device registry +func NewDeviceRegistry() *DeviceRegistry { + return &DeviceRegistry{ + devices: make(map[string]*VolumeInfo), + availableNQNs: make(map[string]struct{}), + volumeToNQN: make(map[string]string), + } +} + +// DiscoverDevices performs NVMe device discovery +func (r *DeviceRegistry) DiscoverDevices(params map[string]string) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + klog.V(4).Info("Performing NVMe device discovery") + discoveredDevices, err := discoverNVMeDevices(params) + if err != nil { + return fmt.Errorf("device discovery failed: %v", err) + } + + if len(discoveredDevices) == len(r.devices) { + klog.V(4).Info("No new devices discovered, skipping update") + return nil + } + + for nqn, diskInfo := range discoveredDevices { + if _, exists := r.devices[nqn]; !exists { + r.devices[nqn] = &VolumeInfo{ + nvmfDiskInfo: diskInfo, + IsAllocated: false, + } + + r.availableNQNs[nqn] = struct{}{} + } + } + klog.V(4).Infof("Discovered %d NVMe targets", len(r.devices)) + + for _, device := range r.devices { + klog.V(4).Infof("- NQN: %s, Endpoints: %v:%v", device.Nqn, device.Addr, device.Port) + } + + return nil +} + +// discoverNVMeDevices runs NVMe discovery and returns available targets +func discoverNVMeDevices(params map[string]string) (map[string]*nvmfDiskInfo, error) { + if params == nil { + return nil, fmt.Errorf("discovery parameters are nil") + } + + targetAddr := params[paramAddr] + targetPort := params[paramPort] + targetType := params[paramType] + + if targetAddr == "" || targetPort == "" || targetType == "" { + return nil, fmt.Errorf("missing required discovery parameters") + } + + if strings.ToLower(targetType) != "tcp" && strings.ToLower(targetType) != "rdma" { + return nil, fmt.Errorf("transport type must be tcp or rdma, got: %s", targetType) + } + + klog.V(4).Infof("Discovering NVMe targets at %s:%s using %s", targetAddr, targetPort, targetType) + + deviceMap := make(map[string]*nvmfDiskInfo) + cmd := exec.Command("nvme", "discover", "-a", targetAddr, "-s", targetPort, "-t", targetType, "-o", "json") + var out bytes.Buffer + cmd.Stdout = &out + + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("nvme discover command failed: %v", err) + } + + // Parse JSON output and organize by NQN + devices := parseNvmeDiscoveryOutput(out.String(), targetType) + for _, device := range devices { + deviceMap[device.Nqn] = device + } + + return deviceMap, nil +} + +// parseNvmeDiscoveryOutput parses the JSON output of nvme discover command +func parseNvmeDiscoveryOutput(output string, targetType string) []*nvmfDiskInfo { + targets := make([]*nvmfDiskInfo, 0) + discoveryNQN := "discovery" + + // Define structure for JSON parsing + type discoveryResponse struct { + Records []nvmfDiskInfo `json:"records"` + } + + var response discoveryResponse + if err := json.Unmarshal([]byte(output), &response); err != nil { + klog.Errorf("Failed to parse NVMe discovery JSON output: %v", err) + return targets + } + + for _, record := range response.Records { + // Skip discovery NQN and non-matching transport type + if strings.Contains(strings.ToLower(record.Nqn), discoveryNQN) || + record.Transport != targetType { + continue + } + + // Append to targets list + recordCopy := record // Create a copy because 'record' is reused in each loop iteration + targets = append(targets, &recordCopy) + } + + return targets +} diff --git a/pkg/nvmf/nvmf.go b/pkg/nvmf/nvmf.go index caaa786..4c0e2dc 100644 --- a/pkg/nvmf/nvmf.go +++ b/pkg/nvmf/nvmf.go @@ -26,13 +26,20 @@ import ( "k8s.io/utils/mount" ) +// NVMe-oF parameter keys +const ( + paramAddr = "targetTrAddr" // Target address parameter + paramPort = "targetTrPort" // Target port parameter + paramType = "targetTrType" // Transport type parameter +) + type nvmfDiskInfo struct { VolName string - Nqn string - Addr string - Port string + Nqn string `json:"subnqn"` + Addr string `json:"traddr"` + Port string `json:"trsvcid"` DeviceUUID string - Transport string + Transport string `json:"trtype"` } type nvmfDiskMounter struct { @@ -55,9 +62,9 @@ func getNVMfDiskInfo(req *csi.NodePublishVolumeRequest) (*nvmfDiskInfo, error) { volName := req.GetVolumeId() volOpts := req.GetVolumeContext() - targetTrAddr := volOpts["targetTrAddr"] - targetTrPort := volOpts["targetTrPort"] - targetTrType := volOpts["targetTrType"] + targetTrAddr := volOpts[paramAddr] + targetTrPort := volOpts[paramPort] + targetTrType := volOpts[paramType] deviceUUID := volOpts["deviceUUID"] nqn := volOpts["nqn"]