mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #86968 from gnufied/add-extra-csi-fields
Add extra fields in node expansion CSI call
This commit is contained in:
commit
47a9952337
@ -107,9 +107,9 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
|
||||
return dir, nil
|
||||
}
|
||||
|
||||
// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
|
||||
// GetStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
|
||||
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{specName}
|
||||
func (m *csiBlockMapper) getStagingPath() string {
|
||||
func (m *csiBlockMapper) GetStagingPath() string {
|
||||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
|
||||
}
|
||||
|
||||
@ -143,7 +143,7 @@ func (m *csiBlockMapper) stageVolumeForBlock(
|
||||
) (string, error) {
|
||||
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
|
||||
|
||||
stagingPath := m.getStagingPath()
|
||||
stagingPath := m.GetStagingPath()
|
||||
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
|
||||
|
||||
// Check whether "STAGE_UNSTAGE_VOLUME" is set
|
||||
@ -237,7 +237,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
|
||||
ctx,
|
||||
m.volumeID,
|
||||
m.readOnly,
|
||||
m.getStagingPath(),
|
||||
m.GetStagingPath(),
|
||||
publishPath,
|
||||
accessMode,
|
||||
publishVolumeInfo,
|
||||
@ -255,26 +255,26 @@ func (m *csiBlockMapper) publishVolumeForBlock(
|
||||
}
|
||||
|
||||
// SetUpDevice ensures the device is attached returns path where the device is located.
|
||||
func (m *csiBlockMapper) SetUpDevice() error {
|
||||
func (m *csiBlockMapper) SetUpDevice() (string, error) {
|
||||
if !m.plugin.blockEnabled {
|
||||
return errors.New("CSIBlockVolume feature not enabled")
|
||||
return "", errors.New("CSIBlockVolume feature not enabled")
|
||||
}
|
||||
klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
|
||||
|
||||
// Get csiSource from spec
|
||||
if m.spec == nil {
|
||||
return errors.New(log("blockMapper.SetUpDevice spec is nil"))
|
||||
return "", errors.New(log("blockMapper.SetUpDevice spec is nil"))
|
||||
}
|
||||
|
||||
csiSource, err := getCSISourceFromSpec(m.spec)
|
||||
if err != nil {
|
||||
return errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
|
||||
return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
|
||||
}
|
||||
|
||||
driverName := csiSource.Driver
|
||||
skip, err := m.plugin.skipAttach(driverName)
|
||||
if err != nil {
|
||||
return errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
|
||||
return "", errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
|
||||
}
|
||||
|
||||
var attachment *storage.VolumeAttachment
|
||||
@ -284,7 +284,7 @@ func (m *csiBlockMapper) SetUpDevice() error {
|
||||
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
|
||||
attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
|
||||
if err != nil {
|
||||
return errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
||||
return "", errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
||||
}
|
||||
}
|
||||
|
||||
@ -299,11 +299,11 @@ func (m *csiBlockMapper) SetUpDevice() error {
|
||||
|
||||
csiClient, err := m.csiClientGetter.Get()
|
||||
if err != nil {
|
||||
return errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
|
||||
return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
|
||||
}
|
||||
|
||||
// Call NodeStageVolume
|
||||
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
|
||||
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
|
||||
if err != nil {
|
||||
if volumetypes.IsOperationFinishedError(err) {
|
||||
cleanupErr := m.cleanupOrphanDeviceFiles()
|
||||
@ -312,10 +312,10 @@ func (m *csiBlockMapper) SetUpDevice() error {
|
||||
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
return nil
|
||||
return stagingPath, nil
|
||||
}
|
||||
|
||||
func (m *csiBlockMapper) MapPodDevice() (string, error) {
|
||||
@ -435,7 +435,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
|
||||
}
|
||||
|
||||
// Call NodeUnstageVolume
|
||||
stagingPath := m.getStagingPath()
|
||||
stagingPath := m.GetStagingPath()
|
||||
if _, err := os.Stat(stagingPath); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
|
||||
@ -471,7 +471,7 @@ func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
|
||||
|
||||
// Remove artifacts of NodeStage.
|
||||
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
|
||||
stagingPath := m.getStagingPath()
|
||||
stagingPath := m.GetStagingPath()
|
||||
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
|
||||
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ func TestBlockMapperGetStagingPath(t *testing.T) {
|
||||
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||
}
|
||||
|
||||
path := csiMapper.getStagingPath()
|
||||
path := csiMapper.GetStagingPath()
|
||||
|
||||
if tc.path != path {
|
||||
t.Errorf("expecting path %s, got %s", tc.path, path)
|
||||
@ -234,13 +234,12 @@ func TestBlockMapperSetupDevice(t *testing.T) {
|
||||
}
|
||||
t.Log("created attachement ", attachID)
|
||||
|
||||
err = csiMapper.SetUpDevice()
|
||||
stagingPath, err := csiMapper.SetUpDevice()
|
||||
if err != nil {
|
||||
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
||||
}
|
||||
|
||||
// Check if NodeStageVolume staged to the right path
|
||||
stagingPath := csiMapper.getStagingPath()
|
||||
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||
svol, ok := svols[csiMapper.volumeID]
|
||||
if !ok {
|
||||
@ -278,7 +277,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
|
||||
}
|
||||
t.Log("created attachement ", attachID)
|
||||
|
||||
err = csiMapper.SetUpDevice()
|
||||
stagingPath, err := csiMapper.SetUpDevice()
|
||||
if err == nil {
|
||||
t.Fatal("mapper unexpectedly succeeded")
|
||||
}
|
||||
@ -293,7 +292,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
|
||||
if _, err := os.Stat(devDir); err == nil {
|
||||
t.Errorf("volume publish device directory %s was not deleted", devDir)
|
||||
}
|
||||
stagingPath := csiMapper.getStagingPath()
|
||||
|
||||
if _, err := os.Stat(stagingPath); err == nil {
|
||||
t.Errorf("volume staging path %s was not deleted", stagingPath)
|
||||
}
|
||||
@ -475,12 +474,11 @@ func TestVolumeSetupTeardown(t *testing.T) {
|
||||
}
|
||||
t.Log("created attachement ", attachID)
|
||||
|
||||
err = csiMapper.SetUpDevice()
|
||||
stagingPath, err := csiMapper.SetUpDevice()
|
||||
if err != nil {
|
||||
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
||||
}
|
||||
// Check if NodeStageVolume staged to the right path
|
||||
stagingPath := csiMapper.getStagingPath()
|
||||
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||
svol, ok := svols[csiMapper.volumeID]
|
||||
if !ok {
|
||||
|
@ -54,7 +54,7 @@ type csiClient interface {
|
||||
fsType string,
|
||||
mountOptions []string,
|
||||
) error
|
||||
NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error)
|
||||
NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error)
|
||||
NodeUnpublishVolume(
|
||||
ctx context.Context,
|
||||
volID string,
|
||||
@ -95,6 +95,20 @@ type csiDriverClient struct {
|
||||
nodeV1ClientCreator nodeV1ClientCreator
|
||||
}
|
||||
|
||||
type csiResizeOptions struct {
|
||||
volumeID string
|
||||
// volumePath is path where volume is available. It could be:
|
||||
// - path where node is staged if NodeExpandVolume is called after NodeStageVolume
|
||||
// - path where volume is published if NodeExpandVolume is called after NodePublishVolume
|
||||
// DEPRECATION NOTICE: in future NodeExpandVolume will be always called after NodePublish
|
||||
volumePath string
|
||||
stagingTargetPath string
|
||||
fsType string
|
||||
accessMode api.PersistentVolumeAccessMode
|
||||
newSize resource.Quantity
|
||||
mountOptions []string
|
||||
}
|
||||
|
||||
var _ csiClient = &csiDriverClient{}
|
||||
|
||||
type nodeV1ClientCreator func(addr csiAddr) (
|
||||
@ -245,36 +259,61 @@ func (c *csiDriverClient) NodePublishVolume(
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
|
||||
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
|
||||
if c.nodeV1ClientCreator == nil {
|
||||
return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
|
||||
return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
|
||||
}
|
||||
|
||||
if volumeID == "" {
|
||||
return newSize, errors.New("missing volume id")
|
||||
if opts.volumeID == "" {
|
||||
return opts.newSize, errors.New("missing volume id")
|
||||
}
|
||||
if volumePath == "" {
|
||||
return newSize, errors.New("missing volume path")
|
||||
if opts.volumePath == "" {
|
||||
return opts.newSize, errors.New("missing volume path")
|
||||
}
|
||||
|
||||
if newSize.Value() < 0 {
|
||||
return newSize, errors.New("size can not be less than 0")
|
||||
if opts.newSize.Value() < 0 {
|
||||
return opts.newSize, errors.New("size can not be less than 0")
|
||||
}
|
||||
|
||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
|
||||
if err != nil {
|
||||
return newSize, err
|
||||
return opts.newSize, err
|
||||
}
|
||||
defer closer.Close()
|
||||
|
||||
req := &csipbv1.NodeExpandVolumeRequest{
|
||||
VolumeId: volumeID,
|
||||
VolumePath: volumePath,
|
||||
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
|
||||
VolumeId: opts.volumeID,
|
||||
VolumePath: opts.volumePath,
|
||||
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
|
||||
VolumeCapability: &csipbv1.VolumeCapability{
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(opts.accessMode),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// not all CSI drivers support NodeStageUnstage and hence the StagingTargetPath
|
||||
// should only be set when available
|
||||
if opts.stagingTargetPath != "" {
|
||||
req.StagingTargetPath = opts.stagingTargetPath
|
||||
}
|
||||
|
||||
if opts.fsType == fsTypeBlockName {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
|
||||
Block: &csipbv1.VolumeCapability_BlockVolume{},
|
||||
}
|
||||
} else {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
|
||||
Mount: &csipbv1.VolumeCapability_MountVolume{
|
||||
FsType: opts.fsType,
|
||||
MountFlags: opts.mountOptions,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := nodeClient.NodeExpandVolume(ctx, req)
|
||||
if err != nil {
|
||||
return newSize, err
|
||||
return opts.newSize, err
|
||||
}
|
||||
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
|
||||
return *updatedQuantity, nil
|
||||
|
@ -284,16 +284,34 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo
|
||||
return stageUnstageSet, nil
|
||||
}
|
||||
|
||||
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
|
||||
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
|
||||
c.t.Log("calling fake.NodeExpandVolume")
|
||||
req := &csipbv1.NodeExpandVolumeRequest{
|
||||
VolumeId: volumeid,
|
||||
VolumePath: volumePath,
|
||||
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
|
||||
VolumeId: opts.volumeID,
|
||||
VolumePath: opts.volumePath,
|
||||
StagingTargetPath: opts.stagingTargetPath,
|
||||
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
|
||||
VolumeCapability: &csipbv1.VolumeCapability{
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(opts.accessMode),
|
||||
},
|
||||
},
|
||||
}
|
||||
if opts.fsType == fsTypeBlockName {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
|
||||
Block: &csipbv1.VolumeCapability_BlockVolume{},
|
||||
}
|
||||
} else {
|
||||
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
|
||||
Mount: &csipbv1.VolumeCapability_MountVolume{
|
||||
FsType: opts.fsType,
|
||||
MountFlags: opts.mountOptions,
|
||||
},
|
||||
}
|
||||
}
|
||||
resp, err := c.nodeClient.NodeExpandVolume(ctx, req)
|
||||
if err != nil {
|
||||
return newSize, err
|
||||
return opts.newSize, err
|
||||
}
|
||||
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
|
||||
return *updatedQuantity, nil
|
||||
@ -635,7 +653,8 @@ func TestNodeExpandVolume(t *testing.T) {
|
||||
return nodeClient, fakeCloser, nil
|
||||
},
|
||||
}
|
||||
_, err := client.NodeExpandVolume(context.Background(), tc.volID, tc.volumePath, tc.newSize)
|
||||
opts := csiResizeOptions{volumeID: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize}
|
||||
_, err := client.NodeExpandVolume(context.Background(), opts)
|
||||
checkErr(t, tc.mustFail, err)
|
||||
if !tc.mustFail {
|
||||
fakeCloser.Check()
|
||||
|
@ -94,12 +94,34 @@ func (c *csiPlugin) nodeExpandWithClient(
|
||||
return false, nil
|
||||
}
|
||||
|
||||
volumeTargetPath := resizeOptions.DeviceMountPath
|
||||
if !fsVolume {
|
||||
volumeTargetPath = resizeOptions.DevicePath
|
||||
pv := resizeOptions.VolumeSpec.PersistentVolume
|
||||
if pv == nil {
|
||||
return false, fmt.Errorf("Expander.NodeExpand failed to find associated PersistentVolume for plugin %s", c.GetPluginName())
|
||||
}
|
||||
|
||||
_, err = csClient.NodeExpandVolume(ctx, csiSource.VolumeHandle, volumeTargetPath, resizeOptions.NewSize)
|
||||
opts := csiResizeOptions{
|
||||
volumePath: resizeOptions.DeviceMountPath,
|
||||
stagingTargetPath: resizeOptions.DeviceStagePath,
|
||||
volumeID: csiSource.VolumeHandle,
|
||||
newSize: resizeOptions.NewSize,
|
||||
fsType: csiSource.FSType,
|
||||
accessMode: api.ReadWriteOnce,
|
||||
mountOptions: pv.Spec.MountOptions,
|
||||
}
|
||||
|
||||
if !fsVolume {
|
||||
// for block volumes the volumePath in CSI NodeExpandvolumeRequest is
|
||||
// basically same as DevicePath because block devices are not mounted and hence
|
||||
// DeviceMountPath does not get populated in resizeOptions.DeviceMountPath
|
||||
opts.volumePath = resizeOptions.DevicePath
|
||||
opts.fsType = fsTypeBlockName
|
||||
}
|
||||
|
||||
if pv.Spec.AccessModes != nil {
|
||||
opts.accessMode = pv.Spec.AccessModes[0]
|
||||
}
|
||||
|
||||
_, err = csClient.NodeExpandVolume(ctx, opts)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err)
|
||||
}
|
||||
|
@ -26,24 +26,26 @@ import (
|
||||
|
||||
func TestNodeExpand(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
nodeExpansion bool
|
||||
nodeStageSet bool
|
||||
volumePhase volume.CSIVolumePhaseType
|
||||
success bool
|
||||
fsVolume bool
|
||||
name string
|
||||
nodeExpansion bool
|
||||
nodeStageSet bool
|
||||
volumePhase volume.CSIVolumePhaseType
|
||||
success bool
|
||||
fsVolume bool
|
||||
deviceStagePath string
|
||||
}{
|
||||
{
|
||||
name: "when node expansion is not set",
|
||||
success: false,
|
||||
},
|
||||
{
|
||||
name: "when nodeExpansion=on, nodeStage=on, volumePhase=staged",
|
||||
nodeExpansion: true,
|
||||
nodeStageSet: true,
|
||||
volumePhase: volume.CSIVolumeStaged,
|
||||
success: true,
|
||||
fsVolume: true,
|
||||
name: "when nodeExpansion=on, nodeStage=on, volumePhase=staged",
|
||||
nodeExpansion: true,
|
||||
nodeStageSet: true,
|
||||
volumePhase: volume.CSIVolumeStaged,
|
||||
success: true,
|
||||
fsVolume: true,
|
||||
deviceStagePath: "/foo/bar",
|
||||
},
|
||||
{
|
||||
name: "when nodeExpansion=on, nodeStage=off, volumePhase=staged",
|
||||
@ -88,21 +90,42 @@ func TestNodeExpand(t *testing.T) {
|
||||
VolumeSpec: spec,
|
||||
NewSize: newSize,
|
||||
DeviceMountPath: "/foo/bar",
|
||||
DeviceStagePath: "/foo/bar",
|
||||
DevicePath: "/mnt/foobar",
|
||||
CSIVolumePhase: tc.volumePhase,
|
||||
}
|
||||
csiSource, _ := getCSISourceFromSpec(resizeOptions.VolumeSpec)
|
||||
|
||||
csClient := setupClientWithExpansion(t, tc.nodeStageSet, tc.nodeExpansion)
|
||||
|
||||
fakeCSIClient, _ := csClient.(*fakeCsiDriverClient)
|
||||
fakeNodeClient := fakeCSIClient.nodeClient
|
||||
ok, err := plug.nodeExpandWithClient(resizeOptions, csiSource, csClient, tc.fsVolume)
|
||||
|
||||
// verify device staging targer path
|
||||
stagingTargetPath := fakeNodeClient.FakeNodeExpansionRequest.GetStagingTargetPath()
|
||||
if tc.deviceStagePath != "" && tc.deviceStagePath != stagingTargetPath {
|
||||
t.Errorf("For %s: expected staging path %s got %s", tc.name, tc.deviceStagePath, stagingTargetPath)
|
||||
}
|
||||
|
||||
if ok != tc.success {
|
||||
if err != nil {
|
||||
t.Errorf("For %s : expected %v got %v with %v", tc.name, tc.success, ok, err)
|
||||
} else {
|
||||
t.Errorf("For %s : expected %v got %v", tc.name, tc.success, ok)
|
||||
}
|
||||
|
||||
}
|
||||
// verify volume capability received by node expansion request
|
||||
if tc.success {
|
||||
capability := fakeNodeClient.FakeNodeExpansionRequest.GetVolumeCapability()
|
||||
if tc.fsVolume {
|
||||
if capability.GetMount() == nil {
|
||||
t.Errorf("For %s: expected mount accesstype got: %v", tc.name, capability)
|
||||
}
|
||||
} else {
|
||||
if capability.GetBlock() == nil {
|
||||
t.Errorf("For %s: expected block accesstype got: %v", tc.name, capability)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -79,14 +79,15 @@ type CSIVolume struct {
|
||||
|
||||
// NodeClient returns CSI node client
|
||||
type NodeClient struct {
|
||||
nodePublishedVolumes map[string]CSIVolume
|
||||
nodeStagedVolumes map[string]CSIVolume
|
||||
stageUnstageSet bool
|
||||
expansionSet bool
|
||||
volumeStatsSet bool
|
||||
nodeGetInfoResp *csipb.NodeGetInfoResponse
|
||||
nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
|
||||
nextErr error
|
||||
nodePublishedVolumes map[string]CSIVolume
|
||||
nodeStagedVolumes map[string]CSIVolume
|
||||
stageUnstageSet bool
|
||||
expansionSet bool
|
||||
volumeStatsSet bool
|
||||
nodeGetInfoResp *csipb.NodeGetInfoResponse
|
||||
nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
|
||||
FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
|
||||
nextErr error
|
||||
}
|
||||
|
||||
// NewNodeClient returns fake node client
|
||||
@ -296,6 +297,8 @@ func (f *NodeClient) NodeExpandVolume(ctx context.Context, req *csipb.NodeExpand
|
||||
return nil, errors.New("required bytes should be greater than 0")
|
||||
}
|
||||
|
||||
f.FakeNodeExpansionRequest = req
|
||||
|
||||
resp := &csipb.NodeExpandVolumeResponse{
|
||||
CapacityBytes: req.GetCapacityRange().RequiredBytes,
|
||||
}
|
||||
|
@ -610,8 +610,8 @@ var _ volume.BlockVolumeMapper = &localVolumeMapper{}
|
||||
var _ volume.CustomBlockVolumeMapper = &localVolumeMapper{}
|
||||
|
||||
// SetUpDevice prepares the volume to the node by the plugin specific way.
|
||||
func (m *localVolumeMapper) SetUpDevice() error {
|
||||
return nil
|
||||
func (m *localVolumeMapper) SetUpDevice() (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// MapPodDevice provides physical device path for the local PV.
|
||||
@ -621,6 +621,11 @@ func (m *localVolumeMapper) MapPodDevice() (string, error) {
|
||||
return globalPath, nil
|
||||
}
|
||||
|
||||
// GetStagingPath returns
|
||||
func (m *localVolumeMapper) GetStagingPath() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// localVolumeUnmapper implements the BlockVolumeUnmapper interface for local volumes.
|
||||
type localVolumeUnmapper struct {
|
||||
*localVolume
|
||||
|
@ -375,7 +375,7 @@ func TestMapUnmap(t *testing.T) {
|
||||
var devPath string
|
||||
|
||||
if customMapper, ok := mapper.(volume.CustomBlockVolumeMapper); ok {
|
||||
err = customMapper.SetUpDevice()
|
||||
_, err = customMapper.SetUpDevice()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to SetUpDevice, err: %v", err)
|
||||
}
|
||||
|
@ -115,6 +115,9 @@ type NodeResizeOptions struct {
|
||||
// it would be location where volume was mounted for the pod
|
||||
DeviceMountPath string
|
||||
|
||||
// DeviceStagingPath stores location where the volume is staged
|
||||
DeviceStagePath string
|
||||
|
||||
NewSize resource.Quantity
|
||||
OldSize resource.Quantity
|
||||
|
||||
|
@ -956,46 +956,50 @@ func (fv *FakeVolume) TearDownAt(dir string) error {
|
||||
}
|
||||
|
||||
// Block volume support
|
||||
func (fv *FakeVolume) SetUpDevice() error {
|
||||
func (fv *FakeVolume) SetUpDevice() (string, error) {
|
||||
fv.Lock()
|
||||
defer fv.Unlock()
|
||||
if fv.VolName == TimeoutOnMountDeviceVolumeName {
|
||||
fv.DeviceMountState[fv.VolName] = deviceMountUncertain
|
||||
return volumetypes.NewUncertainProgressError("mount failed")
|
||||
return "", volumetypes.NewUncertainProgressError("mount failed")
|
||||
}
|
||||
if fv.VolName == FailMountDeviceVolumeName {
|
||||
fv.DeviceMountState[fv.VolName] = deviceNotMounted
|
||||
return fmt.Errorf("error mapping disk: %s", fv.VolName)
|
||||
return "", fmt.Errorf("error mapping disk: %s", fv.VolName)
|
||||
}
|
||||
|
||||
if fv.VolName == TimeoutAndFailOnMountDeviceVolumeName {
|
||||
_, ok := fv.DeviceMountState[fv.VolName]
|
||||
if !ok {
|
||||
fv.DeviceMountState[fv.VolName] = deviceMountUncertain
|
||||
return volumetypes.NewUncertainProgressError("timed out mounting error")
|
||||
return "", volumetypes.NewUncertainProgressError("timed out mounting error")
|
||||
}
|
||||
fv.DeviceMountState[fv.VolName] = deviceNotMounted
|
||||
return fmt.Errorf("error mapping disk: %s", fv.VolName)
|
||||
return "", fmt.Errorf("error mapping disk: %s", fv.VolName)
|
||||
}
|
||||
|
||||
if fv.VolName == SuccessAndTimeoutDeviceName {
|
||||
_, ok := fv.DeviceMountState[fv.VolName]
|
||||
if ok {
|
||||
fv.DeviceMountState[fv.VolName] = deviceMountUncertain
|
||||
return volumetypes.NewUncertainProgressError("error mounting state")
|
||||
return "", volumetypes.NewUncertainProgressError("error mounting state")
|
||||
}
|
||||
}
|
||||
if fv.VolName == SuccessAndFailOnMountDeviceName {
|
||||
_, ok := fv.DeviceMountState[fv.VolName]
|
||||
if ok {
|
||||
return fmt.Errorf("error mapping disk: %s", fv.VolName)
|
||||
return "", fmt.Errorf("error mapping disk: %s", fv.VolName)
|
||||
}
|
||||
}
|
||||
|
||||
fv.DeviceMountState[fv.VolName] = deviceMounted
|
||||
fv.SetUpDeviceCallCount++
|
||||
|
||||
return nil
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) GetStagingPath() string {
|
||||
return filepath.Join(fv.Plugin.Host.GetVolumeDevicePluginDir(utilstrings.EscapeQualifiedName(fv.Plugin.PluginName)), "staging", fv.VolName)
|
||||
}
|
||||
|
||||
// Block volume support
|
||||
|
@ -603,7 +603,12 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
|
||||
return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
|
||||
}
|
||||
|
||||
// If volume expansion is performed after MountDevice but before SetUp then
|
||||
// deviceMountPath and deviceStagePath is going to be the same.
|
||||
// Deprecation: Calling NodeExpandVolume after NodeStage/MountDevice will be deprecated
|
||||
// in a future version of k8s.
|
||||
resizeOptions.DeviceMountPath = deviceMountPath
|
||||
resizeOptions.DeviceStagePath = deviceMountPath
|
||||
resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
|
||||
|
||||
// NodeExpandVolume will resize the file system if user has requested a resize of
|
||||
@ -946,6 +951,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
|
||||
|
||||
mapVolumeFunc := func() (simpleErr error, detailedErr error) {
|
||||
var devicePath string
|
||||
var stagingPath string
|
||||
// Set up global map path under the given plugin directory using symbolic link
|
||||
globalMapPath, err :=
|
||||
blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
|
||||
@ -969,7 +975,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
|
||||
}
|
||||
// Call SetUpDevice if blockVolumeMapper implements CustomBlockVolumeMapper
|
||||
if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
|
||||
mapErr := customBlockVolumeMapper.SetUpDevice()
|
||||
var mapErr error
|
||||
stagingPath, mapErr = customBlockVolumeMapper.SetUpDevice()
|
||||
if mapErr != nil {
|
||||
og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld)
|
||||
// On failure, return error. Caller will log and retry.
|
||||
@ -1072,8 +1079,9 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
|
||||
klog.V(verbosity).Infof(detailedMsg)
|
||||
|
||||
resizeOptions := volume.NodeResizeOptions{
|
||||
DevicePath: devicePath,
|
||||
CSIVolumePhase: volume.CSIVolumePublished,
|
||||
DevicePath: devicePath,
|
||||
DeviceStagePath: stagingPath,
|
||||
CSIVolumePhase: volume.CSIVolumePublished,
|
||||
}
|
||||
_, resizeError := og.nodeExpandVolume(volumeToMount, resizeOptions)
|
||||
if resizeError != nil {
|
||||
@ -1490,40 +1498,64 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
|
||||
var simpleErr, detailedErr error
|
||||
resizeOptions := volume.NodeResizeOptions{
|
||||
VolumeSpec: volumeToMount.VolumeSpec,
|
||||
DevicePath: volumeToMount.DevicePath,
|
||||
}
|
||||
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
|
||||
if err != nil {
|
||||
return volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err)
|
||||
}
|
||||
|
||||
attachableVolumePlugin, _ :=
|
||||
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
|
||||
if fsVolume {
|
||||
volumeMounter, newMounterErr := volumePlugin.NewMounter(
|
||||
volumeToMount.VolumeSpec,
|
||||
volumeToMount.Pod,
|
||||
volume.VolumeOptions{})
|
||||
if newMounterErr != nil {
|
||||
return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
|
||||
}
|
||||
|
||||
if attachableVolumePlugin != nil {
|
||||
volumeAttacher, _ := attachableVolumePlugin.NewAttacher()
|
||||
if volumeAttacher != nil {
|
||||
resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
|
||||
resizeOptions.DevicePath = volumeToMount.DevicePath
|
||||
dmp, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
|
||||
resizeOptions.DeviceMountPath = volumeMounter.GetPath()
|
||||
|
||||
deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
|
||||
var volumeDeviceMounter volume.DeviceMounter
|
||||
if deviceMountableVolumePlugin != nil {
|
||||
volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
|
||||
}
|
||||
|
||||
if volumeDeviceMounter != nil {
|
||||
deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
|
||||
if err != nil {
|
||||
return volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err)
|
||||
}
|
||||
resizeOptions.DeviceMountPath = dmp
|
||||
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
|
||||
if simpleErr != nil || detailedErr != nil {
|
||||
return simpleErr, detailedErr
|
||||
}
|
||||
if resizeDone {
|
||||
return nil, nil
|
||||
}
|
||||
resizeOptions.DeviceStagePath = deviceStagePath
|
||||
}
|
||||
} else {
|
||||
// Get block volume mapper plugin
|
||||
blockVolumePlugin, err :=
|
||||
og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
|
||||
if err != nil {
|
||||
return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err)
|
||||
}
|
||||
|
||||
if blockVolumePlugin == nil {
|
||||
return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
|
||||
}
|
||||
|
||||
blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
|
||||
volumeToMount.VolumeSpec,
|
||||
volumeToMount.Pod,
|
||||
volume.VolumeOptions{})
|
||||
if newMapperErr != nil {
|
||||
return volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
|
||||
}
|
||||
|
||||
// if plugin supports custom mappers lets add DeviceStagePath
|
||||
if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
|
||||
resizeOptions.DeviceStagePath = customBlockVolumeMapper.GetStagingPath()
|
||||
}
|
||||
}
|
||||
// if we are here that means volume plugin does not support attach interface
|
||||
volumeMounter, newMounterErr := volumePlugin.NewMounter(
|
||||
volumeToMount.VolumeSpec,
|
||||
volumeToMount.Pod,
|
||||
volume.VolumeOptions{})
|
||||
if newMounterErr != nil {
|
||||
return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
|
||||
}
|
||||
|
||||
resizeOptions.DeviceMountPath = volumeMounter.GetPath()
|
||||
// if we are doing online expansion then volume is already published
|
||||
resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
|
||||
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
|
||||
if simpleErr != nil || detailedErr != nil {
|
||||
|
@ -174,14 +174,19 @@ type CustomBlockVolumeMapper interface {
|
||||
// For most in-tree plugins, attacher.Attach() and attacher.WaitForAttach()
|
||||
// will do necessary works.
|
||||
// This may be called more than once, so implementations must be idempotent.
|
||||
SetUpDevice() error
|
||||
// SetUpDevice returns stagingPath if device setup was successful
|
||||
SetUpDevice() (stagingPath string, err error)
|
||||
|
||||
// MapPodDevice maps the block device to a path and return the path.
|
||||
// Unique device path across kubelet node reboot is required to avoid
|
||||
// unexpected block volume destruction.
|
||||
// If empty string is returned, the path retuned by attacher.Attach() and
|
||||
// attacher.WaitForAttach() will be used.
|
||||
MapPodDevice() (string, error)
|
||||
MapPodDevice() (publishPath string, err error)
|
||||
|
||||
// GetStagingPath returns path that was used for staging the volume
|
||||
// it is mainly used by CSI plugins
|
||||
GetStagingPath() string
|
||||
}
|
||||
|
||||
// BlockVolumeUnmapper interface is an unmapper interface for block volume.
|
||||
|
Loading…
Reference in New Issue
Block a user