feat: implement device registry for NVMe device discovery and management

Signed-off-by: cheolho.kang <cheolho.kang@samsung.com>
This commit is contained in:
cheolho.kang
2025-03-19 15:01:28 +09:00
parent f70aef31d7
commit d325296302
6 changed files with 214 additions and 12 deletions

View File

@@ -1,5 +1,7 @@
FROM debian:9.13
RUN apt-get update && apt-get install -y nvme-cli
COPY ./bin/nvmfplugin .
ENTRYPOINT ["/nvmfplugin"]

View File

@@ -51,13 +51,18 @@ spec:
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
securityContext:
privileged: true
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: volume-map
mountPath: /var/lib/kubelet/plugins/csi.nvmf.com/volumes
mountPropagation: "HostToContainer"
- name: dev
mountPath: /dev
- name: sys
mountPath: /sys
volumes:
- name: socket-dir
emptyDir: {}
@@ -65,5 +70,9 @@ spec:
hostPath:
path: /var/lib/kubelet/plugins/csi.nvmf.com/volumes
type: DirectoryOrCreate
- name: dev
hostPath:
path: /dev
- name: sys
hostPath:
path: /sys

View File

@@ -2,6 +2,10 @@ apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: csi-nvmf-sc
parameters:
targetTrAddr: "192.168.122.18"
targetTrPort: "49153"
targetTrType: "tcp"
provisioner: csi.nvmf.com
reclaimPolicy: Delete
allowVolumeExpansion: true

View File

@@ -26,13 +26,15 @@ import (
)
type ControllerServer struct {
Driver *driver
Driver *driver
deviceRegistry *DeviceRegistry
}
// create controller server
func NewControllerServer(d *driver) *ControllerServer {
return &ControllerServer{
Driver: d,
Driver: d,
deviceRegistry: NewDeviceRegistry(),
}
}
@@ -50,6 +52,22 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
klog.V(4).Infof("CreateVolume called with name: %s", volumeName)
// Extract volume parameters
parameters := req.GetParameters()
if parameters == nil {
parameters = make(map[string]string)
}
// Discover NVMe devices if needed
if err := c.deviceRegistry.DiscoverDevices(parameters); err != nil {
klog.Errorf("Failed to discover NVMe devices: %v", err)
return nil, status.Errorf(codes.Internal, "device discovery failed: %v", err)
}
// TODO (cheolho.kang): In a future implementation, this method would:
// 1. Allocate a device for the volume
// 2. Store the allocation info in etcd
// 3. Return the allocated device information in the response
return nil, status.Errorf(codes.Unimplemented, "CreateVolume should implement by yourself. ")
}

162
pkg/nvmf/device_registry.go Normal file
View File

@@ -0,0 +1,162 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nvmf
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"strings"
"sync"
"k8s.io/klog/v2"
)
// VolumeInfo wraps nvmfDiskInfo with allocation metadata
type VolumeInfo struct {
*nvmfDiskInfo
IsAllocated bool
}
// DeviceRegistry manages NVMe device discovery and allocation
type DeviceRegistry struct {
// Protects device registry data
mutex sync.RWMutex
// All discovered volume info indexed by NQN
devices map[string]*VolumeInfo
// Set of available NQNs for quick lookup
availableNQNs map[string]struct{}
// Map from volume name to NQN for allocated devices
volumeToNQN map[string]string
}
// NewDeviceRegistry creates a new device registry
func NewDeviceRegistry() *DeviceRegistry {
return &DeviceRegistry{
devices: make(map[string]*VolumeInfo),
availableNQNs: make(map[string]struct{}),
volumeToNQN: make(map[string]string),
}
}
// DiscoverDevices performs NVMe device discovery
func (r *DeviceRegistry) DiscoverDevices(params map[string]string) error {
r.mutex.Lock()
defer r.mutex.Unlock()
klog.V(4).Info("Performing NVMe device discovery")
discoveredDevices, err := discoverNVMeDevices(params)
if err != nil {
return fmt.Errorf("device discovery failed: %v", err)
}
if len(discoveredDevices) == len(r.devices) {
klog.V(4).Info("No new devices discovered, skipping update")
return nil
}
for nqn, diskInfo := range discoveredDevices {
if _, exists := r.devices[nqn]; !exists {
r.devices[nqn] = &VolumeInfo{
nvmfDiskInfo: diskInfo,
IsAllocated: false,
}
r.availableNQNs[nqn] = struct{}{}
}
}
klog.V(4).Infof("Discovered %d NVMe targets", len(r.devices))
for _, device := range r.devices {
klog.V(4).Infof("- NQN: %s, Endpoints: %v:%v", device.Nqn, device.Addr, device.Port)
}
return nil
}
// discoverNVMeDevices runs NVMe discovery and returns available targets
func discoverNVMeDevices(params map[string]string) (map[string]*nvmfDiskInfo, error) {
if params == nil {
return nil, fmt.Errorf("discovery parameters are nil")
}
targetAddr := params[paramAddr]
targetPort := params[paramPort]
targetType := params[paramType]
if targetAddr == "" || targetPort == "" || targetType == "" {
return nil, fmt.Errorf("missing required discovery parameters")
}
if strings.ToLower(targetType) != "tcp" && strings.ToLower(targetType) != "rdma" {
return nil, fmt.Errorf("transport type must be tcp or rdma, got: %s", targetType)
}
klog.V(4).Infof("Discovering NVMe targets at %s:%s using %s", targetAddr, targetPort, targetType)
deviceMap := make(map[string]*nvmfDiskInfo)
cmd := exec.Command("nvme", "discover", "-a", targetAddr, "-s", targetPort, "-t", targetType, "-o", "json")
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("nvme discover command failed: %v", err)
}
// Parse JSON output and organize by NQN
devices := parseNvmeDiscoveryOutput(out.String(), targetType)
for _, device := range devices {
deviceMap[device.Nqn] = device
}
return deviceMap, nil
}
// parseNvmeDiscoveryOutput parses the JSON output of nvme discover command
func parseNvmeDiscoveryOutput(output string, targetType string) []*nvmfDiskInfo {
targets := make([]*nvmfDiskInfo, 0)
discoveryNQN := "discovery"
// Define structure for JSON parsing
type discoveryResponse struct {
Records []nvmfDiskInfo `json:"records"`
}
var response discoveryResponse
if err := json.Unmarshal([]byte(output), &response); err != nil {
klog.Errorf("Failed to parse NVMe discovery JSON output: %v", err)
return targets
}
for _, record := range response.Records {
// Skip discovery NQN and non-matching transport type
if strings.Contains(strings.ToLower(record.Nqn), discoveryNQN) ||
record.Transport != targetType {
continue
}
// Append to targets list
recordCopy := record // Create a copy because 'record' is reused in each loop iteration
targets = append(targets, &recordCopy)
}
return targets
}

View File

@@ -26,13 +26,20 @@ import (
"k8s.io/utils/mount"
)
// NVMe-oF parameter keys
const (
paramAddr = "targetTrAddr" // Target address parameter
paramPort = "targetTrPort" // Target port parameter
paramType = "targetTrType" // Transport type parameter
)
type nvmfDiskInfo struct {
VolName string
Nqn string
Addr string
Port string
Nqn string `json:"subnqn"`
Addr string `json:"traddr"`
Port string `json:"trsvcid"`
DeviceUUID string
Transport string
Transport string `json:"trtype"`
}
type nvmfDiskMounter struct {
@@ -55,9 +62,9 @@ func getNVMfDiskInfo(req *csi.NodePublishVolumeRequest) (*nvmfDiskInfo, error) {
volName := req.GetVolumeId()
volOpts := req.GetVolumeContext()
targetTrAddr := volOpts["targetTrAddr"]
targetTrPort := volOpts["targetTrPort"]
targetTrType := volOpts["targetTrType"]
targetTrAddr := volOpts[paramAddr]
targetTrPort := volOpts[paramPort]
targetTrType := volOpts[paramType]
deviceUUID := volOpts["deviceUUID"]
nqn := volOpts["nqn"]