mirror of
https://github.com/kubernetes-csi/csi-driver-nvmf.git
synced 2025-09-13 13:34:30 +00:00
feat: implement ControllerPublishVolume()
and ControllerUnpublishVolume()
Signed-off-by: cheolho.kang <cheolho.kang@samsung.com>
This commit is contained in:
@@ -84,7 +84,7 @@ rules:
|
||||
verbs: ["get", "list", "watch"]
|
||||
- apiGroups: ["storage.k8s.io"]
|
||||
resources: ["volumeattachments"]
|
||||
verbs: ["get", "list", "watch", "update", "patch"]
|
||||
verbs: ["get", "list", "watch", "update", "patch", "create", "delete"]
|
||||
- apiGroups: ["storage.k8s.io"]
|
||||
resources: ["volumeattachments/status"]
|
||||
verbs: ["patch"]
|
||||
|
@@ -159,12 +159,61 @@ func (c *ControllerServer) ControllerGetVolume(ctx context.Context, request *csi
|
||||
return nil, status.Errorf(codes.Unimplemented, "ControllerGetVolume not implement")
|
||||
}
|
||||
|
||||
// ControllerPublishVolume attaches the given volume to the node
|
||||
func (c *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "ControllerPublishVolume not implement")
|
||||
volumeID := req.GetVolumeId()
|
||||
nodeID := req.GetNodeId()
|
||||
if !isValidVolumeID(volumeID) {
|
||||
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
|
||||
}
|
||||
if nodeID == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Node ID must be provided")
|
||||
}
|
||||
|
||||
klog.V(4).Infof("ControllerPublishVolume called for volume %s on node %s", volumeID, nodeID)
|
||||
|
||||
// Acquire lock for the volume
|
||||
if acquired := c.Driver.volumeLocks.TryAcquire(volumeID); !acquired {
|
||||
return nil, status.Errorf(codes.Aborted, "concurrent operation in progress for volume: %s", volumeID)
|
||||
}
|
||||
defer c.Driver.volumeLocks.Release(volumeID)
|
||||
|
||||
nqn := volumeID
|
||||
device, exists := c.deviceRegistry.GetDeviceByNQN(nqn)
|
||||
if !exists || !device.IsAllocated {
|
||||
klog.Errorf("Volume %s not found or not allocated for ControllerPublishVolume", volumeID)
|
||||
return nil, status.Errorf(codes.NotFound, "volume %s not found or not allocated", volumeID)
|
||||
}
|
||||
|
||||
return &csi.ControllerPublishVolumeResponse{
|
||||
PublishContext: map[string]string{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *ControllerServer) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "ControllerUnpublishVolume not implement")
|
||||
// ControllerUnpublishVolume detaches the given volume from the node
|
||||
func (c *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
|
||||
volumeID := req.GetVolumeId()
|
||||
nodeID := req.GetNodeId() // Used for logging, actual unpublish might not be node-specific at controller level
|
||||
if !isValidVolumeID(volumeID) {
|
||||
return nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Volume ID must be provided")
|
||||
}
|
||||
|
||||
klog.V(4).Infof("ControllerUnpublishVolume called for volume %s from node %s", volumeID, nodeID)
|
||||
|
||||
// Acquire lock for the volume
|
||||
if acquired := c.Driver.volumeLocks.TryAcquire(volumeID); !acquired {
|
||||
return nil, status.Errorf(codes.Aborted, "concurrent operation in progress for volume: %s", volumeID)
|
||||
}
|
||||
defer c.Driver.volumeLocks.Release(volumeID)
|
||||
|
||||
nqn := volumeID
|
||||
_, exists := c.deviceRegistry.GetDeviceByNQN(nqn)
|
||||
if !exists {
|
||||
klog.Warningf("ControllerUnpublishVolume: Volume %s not found. Assuming already unpublished or never existed. Returning success as per idempotency.", volumeID)
|
||||
return &csi.ControllerUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
return &csi.ControllerUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (c *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
||||
|
@@ -233,6 +233,15 @@ func (r *DeviceRegistry) ReleaseDevice(nqn string) {
|
||||
klog.V(4).Infof("[%d/%d] Released volume %s", len(r.devices)-len(r.availableNQNs), len(r.devices), nqn)
|
||||
}
|
||||
|
||||
// GetDeviceByNQN returns device info for a given NQN
|
||||
func (r *DeviceRegistry) GetDeviceByNQN(nqn string) (*VolumeInfo, bool) {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
|
||||
device, exists := r.devices[nqn]
|
||||
return device, exists
|
||||
}
|
||||
|
||||
// discoverNVMeDevices runs NVMe discovery and returns available targets
|
||||
func discoverNVMeDevices(params map[string]string) (map[string]*nvmfDiskInfo, error) {
|
||||
if params == nil {
|
||||
|
@@ -75,6 +75,7 @@ func NewDriver(conf *GlobalConfig) *driver {
|
||||
func (d *driver) Run(conf *GlobalConfig) {
|
||||
d.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
|
||||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
|
||||
})
|
||||
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
|
||||
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
|
||||
|
Reference in New Issue
Block a user