From 2db4c033504400096a2aa8eeddc78c0afa168b47 Mon Sep 17 00:00:00 2001 From: "cheolho.kang" Date: Tue, 20 May 2025 16:17:12 +0900 Subject: [PATCH] feat: implement `ControllerPublishVolume()` and `ControllerUnpublishVolume()` Signed-off-by: cheolho.kang --- deploy/kubernetes/csi-nvmf-rbac.yaml | 2 +- pkg/nvmf/controllerserver.go | 55 ++++++++++++++++++++++++++-- pkg/nvmf/device_registry.go | 9 +++++ pkg/nvmf/driver.go | 1 + 4 files changed, 63 insertions(+), 4 deletions(-) diff --git a/deploy/kubernetes/csi-nvmf-rbac.yaml b/deploy/kubernetes/csi-nvmf-rbac.yaml index 7aa3600..db2ac22 100644 --- a/deploy/kubernetes/csi-nvmf-rbac.yaml +++ b/deploy/kubernetes/csi-nvmf-rbac.yaml @@ -84,7 +84,7 @@ rules: verbs: ["get", "list", "watch"] - apiGroups: ["storage.k8s.io"] resources: ["volumeattachments"] - verbs: ["get", "list", "watch", "update", "patch"] + verbs: ["get", "list", "watch", "update", "patch", "create", "delete"] - apiGroups: ["storage.k8s.io"] resources: ["volumeattachments/status"] verbs: ["patch"] diff --git a/pkg/nvmf/controllerserver.go b/pkg/nvmf/controllerserver.go index 8324093..d62e2c9 100644 --- a/pkg/nvmf/controllerserver.go +++ b/pkg/nvmf/controllerserver.go @@ -159,12 +159,61 @@ func (c *ControllerServer) ControllerGetVolume(ctx context.Context, request *csi return nil, status.Errorf(codes.Unimplemented, "ControllerGetVolume not implement") } +// ControllerPublishVolume attaches the given volume to the node func (c *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "ControllerPublishVolume not implement") + volumeID := req.GetVolumeId() + nodeID := req.GetNodeId() + if !isValidVolumeID(volumeID) { + return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided") + } + if nodeID == "" { + return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Node ID must be provided") + } + + klog.V(4).Infof("ControllerPublishVolume called for volume %s on node %s", volumeID, nodeID) + + // Acquire lock for the volume + if acquired := c.Driver.volumeLocks.TryAcquire(volumeID); !acquired { + return nil, status.Errorf(codes.Aborted, "concurrent operation in progress for volume: %s", volumeID) + } + defer c.Driver.volumeLocks.Release(volumeID) + + nqn := volumeID + device, exists := c.deviceRegistry.GetDeviceByNQN(nqn) + if !exists || !device.IsAllocated { + klog.Errorf("Volume %s not found or not allocated for ControllerPublishVolume", volumeID) + return nil, status.Errorf(codes.NotFound, "volume %s not found or not allocated", volumeID) + } + + return &csi.ControllerPublishVolumeResponse{ + PublishContext: map[string]string{}, + }, nil } -func (c *ControllerServer) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "ControllerUnpublishVolume not implement") +// ControllerUnpublishVolume detaches the given volume from the node +func (c *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + nodeID := req.GetNodeId() // Used for logging, actual unpublish might not be node-specific at controller level + if !isValidVolumeID(volumeID) { + return nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Volume ID must be provided") + } + + klog.V(4).Infof("ControllerUnpublishVolume called for volume %s from node %s", volumeID, nodeID) + + // Acquire lock for the volume + if acquired := c.Driver.volumeLocks.TryAcquire(volumeID); !acquired { + return nil, status.Errorf(codes.Aborted, "concurrent operation in progress for volume: %s", volumeID) + } + defer c.Driver.volumeLocks.Release(volumeID) + + nqn := volumeID + _, exists := c.deviceRegistry.GetDeviceByNQN(nqn) + if !exists { + klog.Warningf("ControllerUnpublishVolume: Volume %s not found. Assuming already unpublished or never existed. Returning success as per idempotency.", volumeID) + return &csi.ControllerUnpublishVolumeResponse{}, nil + } + + return &csi.ControllerUnpublishVolumeResponse{}, nil } func (c *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { diff --git a/pkg/nvmf/device_registry.go b/pkg/nvmf/device_registry.go index 6a5b6d2..77709df 100644 --- a/pkg/nvmf/device_registry.go +++ b/pkg/nvmf/device_registry.go @@ -233,6 +233,15 @@ func (r *DeviceRegistry) ReleaseDevice(nqn string) { klog.V(4).Infof("[%d/%d] Released volume %s", len(r.devices)-len(r.availableNQNs), len(r.devices), nqn) } +// GetDeviceByNQN returns device info for a given NQN +func (r *DeviceRegistry) GetDeviceByNQN(nqn string) (*VolumeInfo, bool) { + r.mutex.RLock() + defer r.mutex.RUnlock() + + device, exists := r.devices[nqn] + return device, exists +} + // discoverNVMeDevices runs NVMe discovery and returns available targets func discoverNVMeDevices(params map[string]string) (map[string]*nvmfDiskInfo, error) { if params == nil { diff --git a/pkg/nvmf/driver.go b/pkg/nvmf/driver.go index 3ee3091..0b809f6 100644 --- a/pkg/nvmf/driver.go +++ b/pkg/nvmf/driver.go @@ -75,6 +75,7 @@ func NewDriver(conf *GlobalConfig) *driver { func (d *driver) Run(conf *GlobalConfig) { d.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, }) d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,