From 7d6959ce2ce49fa5b37b3aa28d20fdeb01c02223 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 8 Jan 2020 14:38:34 -0500 Subject: [PATCH 1/4] Add extra fields in node expansion CSI call --- pkg/volume/csi/csi_client.go | 57 ++++++++++++++----- pkg/volume/csi/csi_client_test.go | 31 ++++++++-- pkg/volume/csi/expander.go | 26 +++++++-- pkg/volume/csi/expander_test.go | 51 ++++++++++++----- pkg/volume/csi/fake/fake_client.go | 19 ++++--- pkg/volume/plugins.go | 3 + .../operationexecutor/operation_generator.go | 2 + 7 files changed, 143 insertions(+), 46 deletions(-) diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index bb336691c0e..b076e15ec46 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -54,7 +54,7 @@ type csiClient interface { fsType string, mountOptions []string, ) error - NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error) + NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error) NodeUnpublishVolume( ctx context.Context, volID string, @@ -95,6 +95,16 @@ type csiDriverClient struct { nodeV1ClientCreator nodeV1ClientCreator } +type csiResizeOptions struct { + volumeid string + volumePath string + stagingTargetPath string + fsType string + accessMode api.PersistentVolumeAccessMode + newSize resource.Quantity + mountOptions []string +} + var _ csiClient = &csiDriverClient{} type nodeV1ClientCreator func(addr csiAddr) ( @@ -245,36 +255,55 @@ func (c *csiDriverClient) NodePublishVolume( return err } -func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) { +func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) { if c.nodeV1ClientCreator == nil { - return newSize, fmt.Errorf("version of CSI driver does not support volume expansion") + return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion") } - if volumeID == "" { - return newSize, errors.New("missing volume id") + if opts.volumeid == "" { + return opts.newSize, errors.New("missing volume id") } - if volumePath == "" { - return newSize, errors.New("missing volume path") + if opts.volumeid == "" { + return opts.newSize, errors.New("missing volume path") } - if newSize.Value() < 0 { - return newSize, errors.New("size can not be less than 0") + if opts.newSize.Value() < 0 { + return opts.newSize, errors.New("size can not be less than 0") } nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) if err != nil { - return newSize, err + return opts.newSize, err } defer closer.Close() req := &csipbv1.NodeExpandVolumeRequest{ - VolumeId: volumeID, - VolumePath: volumePath, - CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()}, + VolumeId: opts.volumeid, + VolumePath: opts.volumePath, + StagingTargetPath: opts.stagingTargetPath, + CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, + VolumeCapability: &csipbv1.VolumeCapability{ + AccessMode: &csipbv1.VolumeCapability_AccessMode{ + Mode: asCSIAccessModeV1(opts.accessMode), + }, + }, } + if opts.fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: opts.fsType, + MountFlags: opts.mountOptions, + }, + } + } + resp, err := nodeClient.NodeExpandVolume(ctx, req) if err != nil { - return newSize, err + return opts.newSize, err } updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI) return *updatedQuantity, nil diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 50f869d45c3..7d1052bd9a7 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -284,16 +284,34 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo return stageUnstageSet, nil } -func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error) { +func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) { c.t.Log("calling fake.NodeExpandVolume") req := &csipbv1.NodeExpandVolumeRequest{ - VolumeId: volumeid, - VolumePath: volumePath, - CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()}, + VolumeId: opts.volumeid, + VolumePath: opts.volumePath, + StagingTargetPath: opts.stagingTargetPath, + CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, + VolumeCapability: &csipbv1.VolumeCapability{ + AccessMode: &csipbv1.VolumeCapability_AccessMode{ + Mode: asCSIAccessModeV1(opts.accessMode), + }, + }, + } + if opts.fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ + Block: &csipbv1.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ + Mount: &csipbv1.VolumeCapability_MountVolume{ + FsType: opts.fsType, + MountFlags: opts.mountOptions, + }, + } } resp, err := c.nodeClient.NodeExpandVolume(ctx, req) if err != nil { - return newSize, err + return opts.newSize, err } updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI) return *updatedQuantity, nil @@ -635,7 +653,8 @@ func TestNodeExpandVolume(t *testing.T) { return nodeClient, fakeCloser, nil }, } - _, err := client.NodeExpandVolume(context.Background(), tc.volID, tc.volumePath, tc.newSize) + opts := csiResizeOptions{volumeid: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize} + _, err := client.NodeExpandVolume(context.Background(), opts) checkErr(t, tc.mustFail, err) if !tc.mustFail { fakeCloser.Check() diff --git a/pkg/volume/csi/expander.go b/pkg/volume/csi/expander.go index 8bb1718d98c..7af4d4dff25 100644 --- a/pkg/volume/csi/expander.go +++ b/pkg/volume/csi/expander.go @@ -94,12 +94,30 @@ func (c *csiPlugin) nodeExpandWithClient( return false, nil } - volumeTargetPath := resizeOptions.DeviceMountPath - if !fsVolume { - volumeTargetPath = resizeOptions.DevicePath + pv := resizeOptions.VolumeSpec.PersistentVolume + if pv == nil { + return false, fmt.Errorf("Expander.NodeExpand failed to find associated PersistentVolume for plugin %s", c.GetPluginName()) } - _, err = csClient.NodeExpandVolume(ctx, csiSource.VolumeHandle, volumeTargetPath, resizeOptions.NewSize) + opts := csiResizeOptions{ + volumePath: resizeOptions.DeviceMountPath, + stagingTargetPath: resizeOptions.DeviceStagePath, + volumeid: csiSource.VolumeHandle, + newSize: resizeOptions.NewSize, + fsType: csiSource.FSType, + accessMode: api.ReadWriteOnce, + mountOptions: pv.Spec.MountOptions, + } + if !fsVolume { + opts.volumePath = resizeOptions.DevicePath + opts.fsType = fsTypeBlockName + } + + if pv.Spec.AccessModes != nil { + opts.accessMode = pv.Spec.AccessModes[0] + } + + _, err = csClient.NodeExpandVolume(ctx, opts) if err != nil { return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err) } diff --git a/pkg/volume/csi/expander_test.go b/pkg/volume/csi/expander_test.go index 26cb0ad8342..b53afb1ae45 100644 --- a/pkg/volume/csi/expander_test.go +++ b/pkg/volume/csi/expander_test.go @@ -26,24 +26,26 @@ import ( func TestNodeExpand(t *testing.T) { tests := []struct { - name string - nodeExpansion bool - nodeStageSet bool - volumePhase volume.CSIVolumePhaseType - success bool - fsVolume bool + name string + nodeExpansion bool + nodeStageSet bool + volumePhase volume.CSIVolumePhaseType + success bool + fsVolume bool + deviceStagePath string }{ { name: "when node expansion is not set", success: false, }, { - name: "when nodeExpansion=on, nodeStage=on, volumePhase=staged", - nodeExpansion: true, - nodeStageSet: true, - volumePhase: volume.CSIVolumeStaged, - success: true, - fsVolume: true, + name: "when nodeExpansion=on, nodeStage=on, volumePhase=staged", + nodeExpansion: true, + nodeStageSet: true, + volumePhase: volume.CSIVolumeStaged, + success: true, + fsVolume: true, + deviceStagePath: "/foo/bar", }, { name: "when nodeExpansion=on, nodeStage=off, volumePhase=staged", @@ -88,21 +90,42 @@ func TestNodeExpand(t *testing.T) { VolumeSpec: spec, NewSize: newSize, DeviceMountPath: "/foo/bar", + DeviceStagePath: "/foo/bar", DevicePath: "/mnt/foobar", CSIVolumePhase: tc.volumePhase, } csiSource, _ := getCSISourceFromSpec(resizeOptions.VolumeSpec) - csClient := setupClientWithExpansion(t, tc.nodeStageSet, tc.nodeExpansion) + fakeCSIClient, _ := csClient.(*fakeCsiDriverClient) + fakeNodeClient := fakeCSIClient.nodeClient ok, err := plug.nodeExpandWithClient(resizeOptions, csiSource, csClient, tc.fsVolume) + + // verify device staging targer path + stagingTargetPath := fakeNodeClient.FakeNodeExpansionRequest.GetStagingTargetPath() + if tc.deviceStagePath != "" && tc.deviceStagePath != stagingTargetPath { + t.Errorf("For %s: expected staging path %s got %s", tc.name, tc.deviceStagePath, stagingTargetPath) + } + if ok != tc.success { if err != nil { t.Errorf("For %s : expected %v got %v with %v", tc.name, tc.success, ok, err) } else { t.Errorf("For %s : expected %v got %v", tc.name, tc.success, ok) } - + } + // verify volume capability received by node expansion request + if tc.success { + capability := fakeNodeClient.FakeNodeExpansionRequest.GetVolumeCapability() + if tc.fsVolume { + if capability.GetMount() == nil { + t.Errorf("For %s: expected mount accesstype got: %v", tc.name, capability) + } + } else { + if capability.GetBlock() == nil { + t.Errorf("For %s: expected block accesstype got: %v", tc.name, capability) + } + } } }) } diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 6e37ec686db..4cd276f719a 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -79,14 +79,15 @@ type CSIVolume struct { // NodeClient returns CSI node client type NodeClient struct { - nodePublishedVolumes map[string]CSIVolume - nodeStagedVolumes map[string]CSIVolume - stageUnstageSet bool - expansionSet bool - volumeStatsSet bool - nodeGetInfoResp *csipb.NodeGetInfoResponse - nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse - nextErr error + nodePublishedVolumes map[string]CSIVolume + nodeStagedVolumes map[string]CSIVolume + stageUnstageSet bool + expansionSet bool + volumeStatsSet bool + nodeGetInfoResp *csipb.NodeGetInfoResponse + nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse + FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest + nextErr error } // NewNodeClient returns fake node client @@ -296,6 +297,8 @@ func (f *NodeClient) NodeExpandVolume(ctx context.Context, req *csipb.NodeExpand return nil, errors.New("required bytes should be greater than 0") } + f.FakeNodeExpansionRequest = req + resp := &csipb.NodeExpandVolumeResponse{ CapacityBytes: req.GetCapacityRange().RequiredBytes, } diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index c4546a4452d..9feccd04286 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -115,6 +115,9 @@ type NodeResizeOptions struct { // it would be location where volume was mounted for the pod DeviceMountPath string + // DeviceStagingPath stores location where the volume is staged + DeviceStagePath string + NewSize resource.Quantity OldSize resource.Quantity diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index e2500810454..76a50a457f6 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -604,6 +604,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } resizeOptions.DeviceMountPath = deviceMountPath + resizeOptions.DeviceStagePath = deviceMountPath resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged // NodeExpandVolume will resize the file system if user has requested a resize of @@ -1505,6 +1506,7 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( return volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err) } resizeOptions.DeviceMountPath = dmp + resizeOptions.DeviceStagePath = dmp resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr From 75e13e370e48a0880ad7ab46191723f9f150cebc Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 16 Mar 2020 16:38:00 -0400 Subject: [PATCH 2/4] Supply staging path for block expansion --- pkg/volume/csi/csi_block.go | 20 +++++++++---------- pkg/volume/csi/csi_block_test.go | 3 +-- pkg/volume/local/local.go | 4 ++-- .../operationexecutor/operation_generator.go | 9 ++++++--- pkg/volume/volume.go | 2 +- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index da14ea9b5cc..6903add1d8e 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -255,26 +255,26 @@ func (m *csiBlockMapper) publishVolumeForBlock( } // SetUpDevice ensures the device is attached returns path where the device is located. -func (m *csiBlockMapper) SetUpDevice() error { +func (m *csiBlockMapper) SetUpDevice() (string, error) { if !m.plugin.blockEnabled { - return errors.New("CSIBlockVolume feature not enabled") + return "", errors.New("CSIBlockVolume feature not enabled") } klog.V(4).Infof(log("blockMapper.SetUpDevice called")) // Get csiSource from spec if m.spec == nil { - return errors.New(log("blockMapper.SetUpDevice spec is nil")) + return "", errors.New(log("blockMapper.SetUpDevice spec is nil")) } csiSource, err := getCSISourceFromSpec(m.spec) if err != nil { - return errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err)) + return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err)) } driverName := csiSource.Driver skip, err := m.plugin.skipAttach(driverName) if err != nil { - return errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err)) + return "", errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err)) } var attachment *storage.VolumeAttachment @@ -284,7 +284,7 @@ func (m *csiBlockMapper) SetUpDevice() error { attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{}) if err != nil { - return errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err)) + return "", errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err)) } } @@ -299,11 +299,11 @@ func (m *csiBlockMapper) SetUpDevice() error { csiClient, err := m.csiClientGetter.Get() if err != nil { - return errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err)) + return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err)) } // Call NodeStageVolume - _, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) + stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) if err != nil { if volumetypes.IsOperationFinishedError(err) { cleanupErr := m.cleanupOrphanDeviceFiles() @@ -312,10 +312,10 @@ func (m *csiBlockMapper) SetUpDevice() error { klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr) } } - return err + return "", err } - return nil + return stagingPath, nil } func (m *csiBlockMapper) MapPodDevice() (string, error) { diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index ccc8d6faf5c..47be8f3303a 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -234,13 +234,12 @@ func TestBlockMapperSetupDevice(t *testing.T) { } t.Log("created attachement ", attachID) - err = csiMapper.SetUpDevice() + stagingPath, err := csiMapper.SetUpDevice() if err != nil { t.Fatalf("mapper failed to SetupDevice: %v", err) } // Check if NodeStageVolume staged to the right path - stagingPath := csiMapper.getStagingPath() svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() svol, ok := svols[csiMapper.volumeID] if !ok { diff --git a/pkg/volume/local/local.go b/pkg/volume/local/local.go index 09ec2f202c8..18bb72b46b3 100644 --- a/pkg/volume/local/local.go +++ b/pkg/volume/local/local.go @@ -610,8 +610,8 @@ var _ volume.BlockVolumeMapper = &localVolumeMapper{} var _ volume.CustomBlockVolumeMapper = &localVolumeMapper{} // SetUpDevice prepares the volume to the node by the plugin specific way. -func (m *localVolumeMapper) SetUpDevice() error { - return nil +func (m *localVolumeMapper) SetUpDevice() (string, error) { + return "", nil } // MapPodDevice provides physical device path for the local PV. diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 76a50a457f6..6ee6e865570 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -947,6 +947,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( mapVolumeFunc := func() (simpleErr error, detailedErr error) { var devicePath string + var stagingPath string // Set up global map path under the given plugin directory using symbolic link globalMapPath, err := blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec) @@ -970,7 +971,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( } // Call SetUpDevice if blockVolumeMapper implements CustomBlockVolumeMapper if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { - mapErr := customBlockVolumeMapper.SetUpDevice() + var mapErr error + stagingPath, mapErr = customBlockVolumeMapper.SetUpDevice() if mapErr != nil { og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld) // On failure, return error. Caller will log and retry. @@ -1073,8 +1075,9 @@ func (og *operationGenerator) GenerateMapVolumeFunc( klog.V(verbosity).Infof(detailedMsg) resizeOptions := volume.NodeResizeOptions{ - DevicePath: devicePath, - CSIVolumePhase: volume.CSIVolumePublished, + DevicePath: devicePath, + DeviceStagePath: stagingPath, + CSIVolumePhase: volume.CSIVolumePublished, } _, resizeError := og.nodeExpandVolume(volumeToMount, resizeOptions) if resizeError != nil { diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 9a2f08403d4..1f36d1e17de 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -174,7 +174,7 @@ type CustomBlockVolumeMapper interface { // For most in-tree plugins, attacher.Attach() and attacher.WaitForAttach() // will do necessary works. // This may be called more than once, so implementations must be idempotent. - SetUpDevice() error + SetUpDevice() (string, error) // MapPodDevice maps the block device to a path and return the path. // Unique device path across kubelet node reboot is required to avoid From 69613da0ae9743397ba7b2b3daebdc18d68e795e Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 16 Mar 2020 16:41:24 -0400 Subject: [PATCH 3/4] rename volumeid to volumeID --- pkg/volume/csi/csi_block_test.go | 7 +++---- pkg/volume/csi/csi_client.go | 12 ++++++++---- pkg/volume/csi/csi_client_test.go | 4 ++-- pkg/volume/csi/expander.go | 2 +- pkg/volume/local/local_test.go | 2 +- pkg/volume/testing/testing.go | 16 ++++++++-------- .../operationexecutor/operation_generator.go | 4 ++++ pkg/volume/volume.go | 5 +++-- 8 files changed, 30 insertions(+), 22 deletions(-) diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 47be8f3303a..09985747435 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -277,7 +277,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) { } t.Log("created attachement ", attachID) - err = csiMapper.SetUpDevice() + stagingPath, err := csiMapper.SetUpDevice() if err == nil { t.Fatal("mapper unexpectedly succeeded") } @@ -292,7 +292,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) { if _, err := os.Stat(devDir); err == nil { t.Errorf("volume publish device directory %s was not deleted", devDir) } - stagingPath := csiMapper.getStagingPath() + if _, err := os.Stat(stagingPath); err == nil { t.Errorf("volume staging path %s was not deleted", stagingPath) } @@ -474,12 +474,11 @@ func TestVolumeSetupTeardown(t *testing.T) { } t.Log("created attachement ", attachID) - err = csiMapper.SetUpDevice() + stagingPath, err := csiMapper.SetUpDevice() if err != nil { t.Fatalf("mapper failed to SetupDevice: %v", err) } // Check if NodeStageVolume staged to the right path - stagingPath := csiMapper.getStagingPath() svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() svol, ok := svols[csiMapper.volumeID] if !ok { diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index b076e15ec46..aa7aa37f68d 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -96,7 +96,11 @@ type csiDriverClient struct { } type csiResizeOptions struct { - volumeid string + volumeID string + // volumePath is path where volume is available. It could be: + // - path where node is staged if NodeExpandVolume is called after NodeStageVolume + // - path where volume is published if NodeExpandVolume is called after NodePublishVolume + // DEPRECATION NOTICE: in future NodeExpandVolume will be always called after NodePublish volumePath string stagingTargetPath string fsType string @@ -260,10 +264,10 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion") } - if opts.volumeid == "" { + if opts.volumeID == "" { return opts.newSize, errors.New("missing volume id") } - if opts.volumeid == "" { + if opts.volumePath == "" { return opts.newSize, errors.New("missing volume path") } @@ -278,7 +282,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp defer closer.Close() req := &csipbv1.NodeExpandVolumeRequest{ - VolumeId: opts.volumeid, + VolumeId: opts.volumeID, VolumePath: opts.volumePath, StagingTargetPath: opts.stagingTargetPath, CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 7d1052bd9a7..714c2181ad8 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -287,7 +287,7 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) { c.t.Log("calling fake.NodeExpandVolume") req := &csipbv1.NodeExpandVolumeRequest{ - VolumeId: opts.volumeid, + VolumeId: opts.volumeID, VolumePath: opts.volumePath, StagingTargetPath: opts.stagingTargetPath, CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, @@ -653,7 +653,7 @@ func TestNodeExpandVolume(t *testing.T) { return nodeClient, fakeCloser, nil }, } - opts := csiResizeOptions{volumeid: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize} + opts := csiResizeOptions{volumeID: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize} _, err := client.NodeExpandVolume(context.Background(), opts) checkErr(t, tc.mustFail, err) if !tc.mustFail { diff --git a/pkg/volume/csi/expander.go b/pkg/volume/csi/expander.go index 7af4d4dff25..650c94b2427 100644 --- a/pkg/volume/csi/expander.go +++ b/pkg/volume/csi/expander.go @@ -102,7 +102,7 @@ func (c *csiPlugin) nodeExpandWithClient( opts := csiResizeOptions{ volumePath: resizeOptions.DeviceMountPath, stagingTargetPath: resizeOptions.DeviceStagePath, - volumeid: csiSource.VolumeHandle, + volumeID: csiSource.VolumeHandle, newSize: resizeOptions.NewSize, fsType: csiSource.FSType, accessMode: api.ReadWriteOnce, diff --git a/pkg/volume/local/local_test.go b/pkg/volume/local/local_test.go index 301cff5f1dd..f76564af33f 100644 --- a/pkg/volume/local/local_test.go +++ b/pkg/volume/local/local_test.go @@ -375,7 +375,7 @@ func TestMapUnmap(t *testing.T) { var devPath string if customMapper, ok := mapper.(volume.CustomBlockVolumeMapper); ok { - err = customMapper.SetUpDevice() + _, err = customMapper.SetUpDevice() if err != nil { t.Errorf("Failed to SetUpDevice, err: %v", err) } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 4ce7962328b..d9c2f628398 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -956,46 +956,46 @@ func (fv *FakeVolume) TearDownAt(dir string) error { } // Block volume support -func (fv *FakeVolume) SetUpDevice() error { +func (fv *FakeVolume) SetUpDevice() (string, error) { fv.Lock() defer fv.Unlock() if fv.VolName == TimeoutOnMountDeviceVolumeName { fv.DeviceMountState[fv.VolName] = deviceMountUncertain - return volumetypes.NewUncertainProgressError("mount failed") + return "", volumetypes.NewUncertainProgressError("mount failed") } if fv.VolName == FailMountDeviceVolumeName { fv.DeviceMountState[fv.VolName] = deviceNotMounted - return fmt.Errorf("error mapping disk: %s", fv.VolName) + return "", fmt.Errorf("error mapping disk: %s", fv.VolName) } if fv.VolName == TimeoutAndFailOnMountDeviceVolumeName { _, ok := fv.DeviceMountState[fv.VolName] if !ok { fv.DeviceMountState[fv.VolName] = deviceMountUncertain - return volumetypes.NewUncertainProgressError("timed out mounting error") + return "", volumetypes.NewUncertainProgressError("timed out mounting error") } fv.DeviceMountState[fv.VolName] = deviceNotMounted - return fmt.Errorf("error mapping disk: %s", fv.VolName) + return "", fmt.Errorf("error mapping disk: %s", fv.VolName) } if fv.VolName == SuccessAndTimeoutDeviceName { _, ok := fv.DeviceMountState[fv.VolName] if ok { fv.DeviceMountState[fv.VolName] = deviceMountUncertain - return volumetypes.NewUncertainProgressError("error mounting state") + return "", volumetypes.NewUncertainProgressError("error mounting state") } } if fv.VolName == SuccessAndFailOnMountDeviceName { _, ok := fv.DeviceMountState[fv.VolName] if ok { - return fmt.Errorf("error mapping disk: %s", fv.VolName) + return "", fmt.Errorf("error mapping disk: %s", fv.VolName) } } fv.DeviceMountState[fv.VolName] = deviceMounted fv.SetUpDeviceCallCount++ - return nil + return "", nil } // Block volume support diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 6ee6e865570..2bcbb8bdbfd 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -603,6 +603,10 @@ func (og *operationGenerator) GenerateMountVolumeFunc( return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) } + // If volume expansion is performed after MountDevice but before SetUp then + // deviceMountPath and deviceStagePath is going to be the same. + // Deprecation: Calling NodeExpandVolume after NodeStage/MountDevice will be deprecated + // in a future version of k8s. resizeOptions.DeviceMountPath = deviceMountPath resizeOptions.DeviceStagePath = deviceMountPath resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 1f36d1e17de..23949a64034 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -174,14 +174,15 @@ type CustomBlockVolumeMapper interface { // For most in-tree plugins, attacher.Attach() and attacher.WaitForAttach() // will do necessary works. // This may be called more than once, so implementations must be idempotent. - SetUpDevice() (string, error) + // SetUpDevice returns stagingPath if device setup was successful + SetUpDevice() (stagingPath string, err error) // MapPodDevice maps the block device to a path and return the path. // Unique device path across kubelet node reboot is required to avoid // unexpected block volume destruction. // If empty string is returned, the path retuned by attacher.Attach() and // attacher.WaitForAttach() will be used. - MapPodDevice() (string, error) + MapPodDevice() (publishPath string, err error) } // BlockVolumeUnmapper interface is an unmapper interface for block volume. From 6342dad709626dcc6da600b33f50a29a33525868 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 1 Jul 2020 10:16:23 -0400 Subject: [PATCH 4/4] Ensure that StagingPath is supplied to blockVolume expansion --- pkg/volume/csi/csi_block.go | 12 +-- pkg/volume/csi/csi_block_test.go | 2 +- pkg/volume/csi/csi_client.go | 14 +++- pkg/volume/csi/expander.go | 4 + pkg/volume/local/local.go | 5 ++ pkg/volume/testing/testing.go | 4 + .../operationexecutor/operation_generator.go | 75 ++++++++++++------- pkg/volume/volume.go | 4 + 8 files changed, 83 insertions(+), 37 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index 6903add1d8e..e5ca3efec40 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -107,9 +107,9 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) { return dir, nil } -// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume +// GetStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume // Example: plugins/kubernetes.io/csi/volumeDevices/staging/{specName} -func (m *csiBlockMapper) getStagingPath() string { +func (m *csiBlockMapper) GetStagingPath() string { return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName) } @@ -143,7 +143,7 @@ func (m *csiBlockMapper) stageVolumeForBlock( ) (string, error) { klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called")) - stagingPath := m.getStagingPath() + stagingPath := m.GetStagingPath() klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath)) // Check whether "STAGE_UNSTAGE_VOLUME" is set @@ -237,7 +237,7 @@ func (m *csiBlockMapper) publishVolumeForBlock( ctx, m.volumeID, m.readOnly, - m.getStagingPath(), + m.GetStagingPath(), publishPath, accessMode, publishVolumeInfo, @@ -435,7 +435,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error } // Call NodeUnstageVolume - stagingPath := m.getStagingPath() + stagingPath := m.GetStagingPath() if _, err := os.Stat(stagingPath); err != nil { if os.IsNotExist(err) { klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath)) @@ -471,7 +471,7 @@ func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error { // Remove artifacts of NodeStage. // stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/ - stagingPath := m.getStagingPath() + stagingPath := m.GetStagingPath() if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) { return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err)) } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 09985747435..ab59653d71b 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -123,7 +123,7 @@ func TestBlockMapperGetStagingPath(t *testing.T) { t.Fatalf("Failed to make a new Mapper: %v", err) } - path := csiMapper.getStagingPath() + path := csiMapper.GetStagingPath() if tc.path != path { t.Errorf("expecting path %s, got %s", tc.path, path) diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index aa7aa37f68d..9c06736f790 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -282,16 +282,22 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp defer closer.Close() req := &csipbv1.NodeExpandVolumeRequest{ - VolumeId: opts.volumeID, - VolumePath: opts.volumePath, - StagingTargetPath: opts.stagingTargetPath, - CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, + VolumeId: opts.volumeID, + VolumePath: opts.volumePath, + CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, VolumeCapability: &csipbv1.VolumeCapability{ AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(opts.accessMode), }, }, } + + // not all CSI drivers support NodeStageUnstage and hence the StagingTargetPath + // should only be set when available + if opts.stagingTargetPath != "" { + req.StagingTargetPath = opts.stagingTargetPath + } + if opts.fsType == fsTypeBlockName { req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ Block: &csipbv1.VolumeCapability_BlockVolume{}, diff --git a/pkg/volume/csi/expander.go b/pkg/volume/csi/expander.go index 650c94b2427..aac2e31e059 100644 --- a/pkg/volume/csi/expander.go +++ b/pkg/volume/csi/expander.go @@ -108,7 +108,11 @@ func (c *csiPlugin) nodeExpandWithClient( accessMode: api.ReadWriteOnce, mountOptions: pv.Spec.MountOptions, } + if !fsVolume { + // for block volumes the volumePath in CSI NodeExpandvolumeRequest is + // basically same as DevicePath because block devices are not mounted and hence + // DeviceMountPath does not get populated in resizeOptions.DeviceMountPath opts.volumePath = resizeOptions.DevicePath opts.fsType = fsTypeBlockName } diff --git a/pkg/volume/local/local.go b/pkg/volume/local/local.go index 18bb72b46b3..8e4ae6f55ca 100644 --- a/pkg/volume/local/local.go +++ b/pkg/volume/local/local.go @@ -621,6 +621,11 @@ func (m *localVolumeMapper) MapPodDevice() (string, error) { return globalPath, nil } +// GetStagingPath returns +func (m *localVolumeMapper) GetStagingPath() string { + return "" +} + // localVolumeUnmapper implements the BlockVolumeUnmapper interface for local volumes. type localVolumeUnmapper struct { *localVolume diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index d9c2f628398..c481f46a102 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -998,6 +998,10 @@ func (fv *FakeVolume) SetUpDevice() (string, error) { return "", nil } +func (fv *FakeVolume) GetStagingPath() string { + return filepath.Join(fv.Plugin.Host.GetVolumeDevicePluginDir(utilstrings.EscapeQualifiedName(fv.Plugin.PluginName)), "staging", fv.VolName) +} + // Block volume support func (fv *FakeVolume) GetSetUpDeviceCallCount() int { fv.RLock() diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 2bcbb8bdbfd..7756578b808 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1498,41 +1498,64 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( var simpleErr, detailedErr error resizeOptions := volume.NodeResizeOptions{ VolumeSpec: volumeToMount.VolumeSpec, + DevicePath: volumeToMount.DevicePath, + } + fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) + if err != nil { + return volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err) } - attachableVolumePlugin, _ := - og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) + if fsVolume { + volumeMounter, newMounterErr := volumePlugin.NewMounter( + volumeToMount.VolumeSpec, + volumeToMount.Pod, + volume.VolumeOptions{}) + if newMounterErr != nil { + return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr) + } - if attachableVolumePlugin != nil { - volumeAttacher, _ := attachableVolumePlugin.NewAttacher() - if volumeAttacher != nil { - resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged - resizeOptions.DevicePath = volumeToMount.DevicePath - dmp, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) + resizeOptions.DeviceMountPath = volumeMounter.GetPath() + + deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec) + var volumeDeviceMounter volume.DeviceMounter + if deviceMountableVolumePlugin != nil { + volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter() + } + + if volumeDeviceMounter != nil { + deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec) if err != nil { return volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err) } - resizeOptions.DeviceMountPath = dmp - resizeOptions.DeviceStagePath = dmp - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) - if simpleErr != nil || detailedErr != nil { - return simpleErr, detailedErr - } - if resizeDone { - return nil, nil - } + resizeOptions.DeviceStagePath = deviceStagePath + } + } else { + // Get block volume mapper plugin + blockVolumePlugin, err := + og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec) + if err != nil { + return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err) + } + + if blockVolumePlugin == nil { + return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + } + + blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( + volumeToMount.VolumeSpec, + volumeToMount.Pod, + volume.VolumeOptions{}) + if newMapperErr != nil { + return volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) + } + + // if plugin supports custom mappers lets add DeviceStagePath + if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { + resizeOptions.DeviceStagePath = customBlockVolumeMapper.GetStagingPath() } } - // if we are here that means volume plugin does not support attach interface - volumeMounter, newMounterErr := volumePlugin.NewMounter( - volumeToMount.VolumeSpec, - volumeToMount.Pod, - volume.VolumeOptions{}) - if newMounterErr != nil { - return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr) - } - resizeOptions.DeviceMountPath = volumeMounter.GetPath() + // if we are doing online expansion then volume is already published resizeOptions.CSIVolumePhase = volume.CSIVolumePublished resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 23949a64034..85b8c4aaa52 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -183,6 +183,10 @@ type CustomBlockVolumeMapper interface { // If empty string is returned, the path retuned by attacher.Attach() and // attacher.WaitForAttach() will be used. MapPodDevice() (publishPath string, err error) + + // GetStagingPath returns path that was used for staging the volume + // it is mainly used by CSI plugins + GetStagingPath() string } // BlockVolumeUnmapper interface is an unmapper interface for block volume.