From 6a7222cf4e708801624e77336b4e1e0a9bbfed67 Mon Sep 17 00:00:00 2001 From: Jiawei Wang Date: Fri, 12 Feb 2021 17:35:01 -0800 Subject: [PATCH] Add migrated field to storage_operation_duration_seconds metric --- .../volume/expand/expand_controller_test.go | 6 +- .../volume/persistentvolume/pv_controller.go | 5 +- pkg/volume/csimigration/plugin_manager.go | 1 + pkg/volume/plugins.go | 1 + pkg/volume/util/metrics.go | 18 +- .../nestedpendingoperations_test.go | 30 +- .../util/operationexecutor/fakegenerator.go | 4 +- .../operation_executor_test.go | 52 +-- .../operationexecutor/operation_generator.go | 300 ++++++++++++------ .../operation_generator_test.go | 12 +- pkg/volume/util/types/types.go | 35 +- pkg/volume/volume_linux.go | 11 +- 12 files changed, 307 insertions(+), 168 deletions(-) diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index a20364c4a67..cf5d37b180a 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -23,7 +23,7 @@ import ( "regexp" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -121,9 +121,9 @@ func TestSyncHandler(t *testing.T) { var expController *expandController expController, _ = expc.(*expandController) var expansionCalled bool - expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) { + expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() volumetypes.OperationContext { expansionCalled = true - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) }) if test.pv != nil { diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index ad5ac88bff1..5f9c7e96802 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -51,6 +51,7 @@ import ( vol "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/recyclerclient" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/klog/v2" ) @@ -1412,7 +1413,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu opComplete := util.OperationCompleteHook(pluginName, "volume_delete") err = deleter.Delete() - opComplete(&err) + opComplete(volumetypes.CompleteFuncParam{Err: &err}) if err != nil { // Deleter failed return pluginName, false, err @@ -1558,7 +1559,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation( opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision") volume, err = provisioner.Provision(selectedNode, allowedTopologies) - opComplete(&err) + opComplete(volumetypes.CompleteFuncParam{Err: &err}) if err != nil { // Other places of failure have nothing to do with VolumeScheduling, // so just let controller retry in the next sync. We'll only call func diff --git a/pkg/volume/csimigration/plugin_manager.go b/pkg/volume/csimigration/plugin_manager.go index d7ca1db228d..143e2a41dd9 100644 --- a/pkg/volume/csimigration/plugin_manager.go +++ b/pkg/volume/csimigration/plugin_manager.go @@ -142,6 +142,7 @@ func TranslateInTreeSpecToCSI(spec *volume.Spec, translator InTreeToCSITranslato return nil, fmt.Errorf("failed to translate in-tree pv to CSI: %v", err) } return &volume.Spec{ + Migrated: true, PersistentVolume: csiPV, ReadOnly: spec.ReadOnly, InlineVolumeSpecForCSIMigration: inlineVolume, diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 40bc9e5fad2..3831ed216cd 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -473,6 +473,7 @@ type Spec struct { PersistentVolume *v1.PersistentVolume ReadOnly bool InlineVolumeSpecForCSIMigration bool + Migrated bool } // Name returns the name of either Volume or PersistentVolume, one of which must not be nil. diff --git a/pkg/volume/util/metrics.go b/pkg/volume/util/metrics.go index efdd2256377..f67860806b3 100644 --- a/pkg/volume/util/metrics.go +++ b/pkg/volume/util/metrics.go @@ -18,6 +18,7 @@ package util import ( "fmt" + "strconv" "time" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -25,6 +26,7 @@ import ( "k8s.io/component-base/metrics/legacyregistry" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -47,7 +49,7 @@ var storageOperationMetric = metrics.NewHistogramVec( Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600}, StabilityLevel: metrics.ALPHA, }, - []string{"volume_plugin", "operation_name", "status"}, + []string{"volume_plugin", "operation_name", "status", "migrated"}, ) var storageOperationStatusMetric = metrics.NewCounterVec( @@ -82,25 +84,29 @@ func registerMetrics() { } // OperationCompleteHook returns a hook to call when an operation is completed -func OperationCompleteHook(plugin, operationName string) func(*error) { +func OperationCompleteHook(plugin, operationName string) func(types.CompleteFuncParam) { requestTime := time.Now() - opComplete := func(err *error) { + opComplete := func(c types.CompleteFuncParam) { timeTaken := time.Since(requestTime).Seconds() // Create metric with operation name and plugin name status := statusSuccess - if *err != nil { + if *c.Err != nil { // TODO: Establish well-known error codes to be able to distinguish // user configuration errors from system errors. status = statusFailUnknown } - storageOperationMetric.WithLabelValues(plugin, operationName, status).Observe(timeTaken) + migrated := false + if c.Migrated != nil { + migrated = *c.Migrated + } + storageOperationMetric.WithLabelValues(plugin, operationName, status, strconv.FormatBool(migrated)).Observe(timeTaken) storageOperationStatusMetric.WithLabelValues(plugin, operationName, status).Inc() } return opComplete } // FSGroupCompleteHook returns a hook to call when volume recursive permission is changed -func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(*error) { +func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(types.CompleteFuncParam) { return OperationCompleteHook(GetFullQualifiedPluginNameForVolume(plugin.GetPluginName(), spec), "volume_fsgroup_recursive_apply") } diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index 7bbe0660149..7d25b3a2288 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" @@ -824,36 +824,38 @@ func testConcurrentOperationsNegative( /* END concurrent operations tests */ -func generateCallbackFunc(done chan<- interface{}) func() (error, error) { - return func() (error, error) { +func generateCallbackFunc(done chan<- interface{}) func() volumetypes.OperationContext { + return func() volumetypes.OperationContext { done <- true - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } } -func generateWaitFunc(done <-chan interface{}) func() (error, error) { - return func() (error, error) { +func generateWaitFunc(done <-chan interface{}) func() volumetypes.OperationContext { + return func() volumetypes.OperationContext { <-done - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } } -func panicFunc() (error, error) { +func panicFunc() volumetypes.OperationContext { panic("testing panic") } -func errorFunc() (error, error) { - return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2") +func errorFunc() volumetypes.OperationContext { + return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false) } -func generateWaitWithErrorFunc(done <-chan interface{}) func() (error, error) { - return func() (error, error) { +func generateWaitWithErrorFunc(done <-chan interface{}) func() volumetypes.OperationContext { + return func() volumetypes.OperationContext { <-done - return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2") + return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false) } } -func noopFunc() (error, error) { return nil, nil } +func noopFunc() volumetypes.OperationContext { + return volumetypes.NewOperationContext(nil, nil, false) +} func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { backoff := wait.Backoff{ diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go index 38d75763cd4..5513dd60db9 100644 --- a/pkg/volume/util/operationexecutor/fakegenerator.go +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -32,13 +32,13 @@ import ( type fakeOGCounter struct { // calledFuncs stores name and count of functions calledFuncs map[string]int - opFunc func() (error, error) + opFunc func() volumetypes.OperationContext } var _ OperationGenerator = &fakeOGCounter{} // NewFakeOGCounter returns a OperationGenerator -func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator { +func NewFakeOGCounter(opFunc func() volumetypes.OperationContext) OperationGenerator { return &fakeOGCounter{ calledFuncs: map[string]int{}, opFunc: opFunc, diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index a65ec69b71f..0c33d347eb9 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -585,63 +585,63 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera } func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, } } func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, }, nil } func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, } } func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, }, nil } func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, }, nil } func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, }, nil } func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, @@ -649,9 +649,9 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v } func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, @@ -659,9 +659,9 @@ func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentV } func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, @@ -673,9 +673,9 @@ func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( pluginNane string, volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, @@ -683,9 +683,9 @@ func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( } func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, @@ -693,9 +693,9 @@ func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout t } func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, @@ -703,9 +703,9 @@ func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount Moun } func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) { - opFunc := func() (error, error) { + opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ OperationFunc: opFunc, diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index e59f182d65d..bdd81aaca17 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -185,7 +185,7 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName } - volumesAreAttachedFunc := func() (error, error) { + volumesAreAttachedFunc := func() volumetypes.OperationContext { // For each volume plugin, pass the list of volume specs to VolumesAreAttached to check // whether the volumes are still attached. @@ -227,7 +227,8 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( } } - return nil, nil + // It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached_per_node + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ @@ -248,7 +249,7 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( // function except volumeSpecMap which contains original volume names for // use with actualStateOfWorld - bulkVolumeVerifyFunc := func() (error, error) { + bulkVolumeVerifyFunc := func() volumetypes.OperationContext { attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginByName(pluginName) if err != nil || attachableVolumePlugin == nil { @@ -256,7 +257,7 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( "BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v", pluginName, err) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() @@ -266,19 +267,19 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( "BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v", attachableVolumePlugin, newAttacherErr) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier) if !ok { klog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes) if bulkAttachErr != nil { klog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, false) } for nodeName, volumeSpecs := range pluginNodeVolumes { @@ -303,7 +304,8 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( } } - return nil, nil + // It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached + return volumetypes.NewOperationContext(nil, nil, false) } return volumetypes.GeneratedOperations{ @@ -319,16 +321,21 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { - attachVolumeFunc := func() (error, error) { + attachVolumeFunc := func() volumetypes.OperationContext { attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) + + migrated := getMigratedStatusBySpec(volumeToAttach.VolumeSpec) + if err != nil || attachableVolumePlugin == nil { - return volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err) + eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() if newAttacherErr != nil { - return volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr) + eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Execute attach @@ -349,7 +356,8 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } // On failure, return error. Caller will log and retry. - return volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) + eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Successful attach event is useful for user debugging @@ -364,10 +372,11 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } eventRecorderFunc := func(err *error) { @@ -452,7 +461,7 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) } - getVolumePluginMgrFunc := func() (error, error) { + detachVolumeFunc := func() volumetypes.OperationContext { var err error if verifySafeToDetach { err = og.verifyVolumeIsSafeToDetach(volumeToDetach) @@ -460,11 +469,15 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( if err == nil { err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName) } + + migrated := getMigratedStatusBySpec(volumeToDetach.VolumeSpec) + if err != nil { // On failure, add volume back to ReportAsAttached list actualStateOfWorld.AddVolumeToReportAsAttached( volumeToDetach.VolumeName, volumeToDetach.NodeName) - return volumeToDetach.GenerateError("DetachVolume.Detach failed", err) + eventErr, detailedErr := volumeToDetach.GenerateError("DetachVolume.Detach failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", "")) @@ -473,12 +486,12 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( actualStateOfWorld.MarkVolumeAsDetached( volumeToDetach.VolumeName, volumeToDetach.NodeName) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } return volumetypes.GeneratedOperations{ OperationName: "volume_detach", - OperationFunc: getVolumePluginMgrFunc, + OperationFunc: detachVolumeFunc, CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"), EventRecorderFunc: nil, // nil because we do not want to generate event on error }, nil @@ -497,16 +510,21 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumePluginName = volumePlugin.GetPluginName() } - mountVolumeFunc := func() (error, error) { + mountVolumeFunc := func() volumetypes.OperationContext { // Get mounter plugin volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + + migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { - return volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } affinityErr := checkNodeAffinity(og, volumeToMount) if affinityErr != nil { - return volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } volumeMounter, newMounterErr := volumePlugin.NewMounter( @@ -514,14 +532,15 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.Pod, volume.VolumeOptions{}) if newMounterErr != nil { - return volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr) - + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin) if mountCheckError != nil { - return volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Get attacher, if possible @@ -559,7 +578,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) @@ -576,7 +596,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Mount device to global mount path @@ -588,7 +609,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( og.checkForFailedMount(volumeToMount, err) og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld) // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MountVolume.MountDevice failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath))) @@ -598,7 +620,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeName, devicePath, deviceMountPath) if markDeviceMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // If volume expansion is performed after MountDevice but before SetUp then @@ -626,7 +649,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( "MountVolume.MountDevice failed to mark volume as uncertain", markDeviceUncertainErr.Error())) } - return volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -635,7 +659,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( err = fmt.Errorf( "verify that your node machine has the required components before attempting to mount this volume type. %s", canMountErr) - return volumeToMount.GenerateError("MountVolume.CanMount failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.CanMount failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -661,7 +686,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc( og.checkForFailedMount(volumeToMount, mountErr) og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld) // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } _, detailedMsg := volumeToMount.GenerateMsg("MountVolume.SetUp succeeded", "") @@ -681,17 +707,19 @@ func (og *operationGenerator) GenerateMountVolumeFunc( _, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) - return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } eventRecorderFunc := func(err *error) { @@ -778,20 +806,24 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) } - unmountVolumeFunc := func() (error, error) { + unmountVolumeFunc := func() volumetypes.OperationContext { subpather := og.volumePluginMgr.Host.GetSubpather() + migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec) + // Remove all bind-mounts for subPaths podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID)) if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil { - return volumeToUnmount.GenerateError("error cleaning subPath mounts", err) + eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Execute unmount unmountErr := volumeUnmounter.TearDown() if unmountErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr) + eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof( @@ -812,7 +844,7 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error()) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } return volumetypes.GeneratedOperations{ @@ -844,14 +876,18 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceMounter failed", err) } - unmountDeviceFunc := func() (error, error) { + unmountDeviceFunc := func() volumetypes.OperationContext { + + migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec) + //deviceMountPath := deviceToDetach.DeviceMountPath deviceMountPath, err := volumeDeviceMounter.GetDeviceMountPath(deviceToDetach.VolumeSpec) if err != nil { // On failure other than "does not exist", return error. Caller will log and retry. if !strings.Contains(err.Error(), "does not exist") { - return deviceToDetach.GenerateError("GetDeviceMountPath failed", err) + eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountPath failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // If the mount path could not be found, don't fail the unmount, but instead log a warning and proceed, // using the value from deviceToDetach.DeviceMountPath, so that the device can be marked as unmounted @@ -865,13 +901,15 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( if err == nil { err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs) } - return deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err) + eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Execute unmount unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath) if unmountDeviceErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr) + eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Before logging that UnmountDevice succeeded and moving on, // use hostutil.PathIsDevice to check if the path is a device, @@ -879,13 +917,14 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( // else on the system. Retry if it returns true. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil) if deviceOpenedErr != nil { - return nil, deviceOpenedErr + return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated) } // The device is still in use elsewhere. Caller will log and retry. if deviceOpened { - return deviceToDetach.GenerateError( + eventErr, detailedErr := deviceToDetach.GenerateError( "UnmountDevice failed", goerrors.New("the device is in use when it was no longer expected to be in use")) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) @@ -895,10 +934,11 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( deviceToDetach.VolumeName) if markDeviceUnmountedErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } return volumetypes.GeneratedOperations{ @@ -958,15 +998,19 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeAttacher, _ = attachableVolumePlugin.NewAttacher() } - mapVolumeFunc := func() (simpleErr error, detailedErr error) { + mapVolumeFunc := func() (operationContext volumetypes.OperationContext) { var devicePath string var stagingPath string + + migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) + // Set up global map path under the given plugin directory using symbolic link globalMapPath, err := blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } if volumeAttacher != nil { // Wait for attachable volumes to finish attaching @@ -976,7 +1020,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) @@ -989,7 +1034,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( if mapErr != nil { og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld) // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -999,7 +1045,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeName, markedDevicePath, globalMapPath) if markDeviceMappedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } markVolumeOpts := MarkVolumeOpts{ @@ -1020,14 +1067,15 @@ func (og *operationGenerator) GenerateMapVolumeFunc( if mapErr != nil { // On failure, return error. Caller will log and retry. og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld) - return volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // From now on, the volume is mapped. Mark it as uncertain on error, // so it is is unmapped when corresponding pod is deleted. defer func() { - if simpleErr != nil { - errText := simpleErr.Error() + if operationContext.EventErr != nil { + errText := operationContext.EventErr.Error() og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld) } }() @@ -1038,7 +1086,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( devicePath = pluginDevicePath } if len(devicePath) == 0 { - return volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty")) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty")) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -1048,12 +1097,14 @@ func (og *operationGenerator) GenerateMapVolumeFunc( // AttachFileDevice will fail. If kubelet is not containerized, eval it anyway. kvh, ok := og.GetVolumePluginMgr().Host.(volume.KubeletVolumeHost) if !ok { - return volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface")) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface")) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } hu := kvh.GetHostUtil() devicePath, err = hu.EvalHostSymlinks(devicePath) if err != nil { - return volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Update actual state of world with the devicePath again, if devicePath has changed from markedDevicePath @@ -1063,7 +1114,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeName, devicePath, globalMapPath) if markDeviceMappedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -1072,7 +1124,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc( mapErr := util.MapBlockVolume(og.blkUtil, devicePath, globalMapPath, volumeMapPath, volName, volumeToMount.Pod.UID) if mapErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Device mapping for global map path succeeded @@ -1095,16 +1148,18 @@ func (og *operationGenerator) GenerateMapVolumeFunc( _, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError) - return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } eventRecorderFunc := func(err *error) { @@ -1144,7 +1199,10 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr) } - unmapVolumeFunc := func() (error, error) { + unmapVolumeFunc := func() volumetypes.OperationContext { + + migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec) + // pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName} podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath() // plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID} @@ -1154,7 +1212,8 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID) if unmapErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr) + eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Call UnmapPodDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper @@ -1163,7 +1222,8 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( unmapErr = customBlockVolumeUnmapper.UnmapPodDevice() if unmapErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr) + eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -1185,7 +1245,7 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error()) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } return volumetypes.GeneratedOperations{ @@ -1228,7 +1288,8 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr) } - unmapDeviceFunc := func() (error, error) { + unmapDeviceFunc := func() volumetypes.OperationContext { + migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec) // Search under globalMapPath dir if all symbolic links from pods have been removed already. // If symbolic links are there, pods may still refer the volume. globalMapPath := deviceToDetach.DeviceMountPath @@ -1238,12 +1299,14 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( // Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted. refs = nil } else { - return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) + eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } if len(refs) > 0 { err = fmt.Errorf("the device %q is still referenced from other Pods %v", globalMapPath, refs) - return deviceToDetach.GenerateError("UnmapDevice failed", err) + eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Call TearDownDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper @@ -1252,7 +1315,8 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( unmapErr := customBlockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath) if unmapErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr) + eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } } @@ -1261,7 +1325,8 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath) if removeMapPathErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr) + eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Before logging that UnmapDevice succeeded and moving on, @@ -1270,13 +1335,14 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( // else on the system. Retry if it returns true. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil) if deviceOpenedErr != nil { - return nil, deviceOpenedErr + return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated) } // The device is still in use elsewhere. Caller will log and retry. if deviceOpened { - return deviceToDetach.GenerateError( + eventErr, detailedErr := deviceToDetach.GenerateError( "UnmapDevice failed", fmt.Errorf("the device is in use when it was no longer expected to be in use")) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } klog.Infof(deviceToDetach.GenerateMsgDetailed("UnmapDevice succeeded", "")) @@ -1286,10 +1352,11 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( deviceToDetach.VolumeName) if markDeviceUnmountedErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } return volumetypes.GeneratedOperations{ @@ -1310,7 +1377,8 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) } - verifyControllerAttachedVolumeFunc := func() (error, error) { + verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext { + migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) if !volumeToMount.PluginIsAttachable { // If the volume does not implement the attacher interface, it is // assumed to be attached and the actual state of the world is @@ -1320,10 +1388,11 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) + eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } if !volumeToMount.ReportedInUse { @@ -1333,21 +1402,24 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( // periodically by kubelet, so it may take as much as 10 seconds // before this clears. // Issue #28141 to enable on demand status updates. - return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) + eventErr, detailedErr := volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // Fetch current node object node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{}) if fetchErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) + eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } if node == nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError( + eventErr, detailedErr := volumeToMount.GenerateError( "VerifyControllerAttachedVolume failed", fmt.Errorf("node object retrieved from API server is nil")) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } for _, attachedVolume := range node.Status.VolumesAttached { @@ -1357,14 +1429,16 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath))) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } } // Volume not attached, return error. Caller will log and retry. - return volumeToMount.GenerateError("Volume not attached according to node status", nil) + eventErr, detailedErr := volumeToMount.GenerateError("Volume not attached according to node status", nil) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } return volumetypes.GeneratedOperations{ @@ -1425,7 +1499,10 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc)) } - expandVolumeFunc := func() (error, error) { + expandVolumeFunc := func() volumetypes.OperationContext { + + migrated := false + newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] statusSize := pvc.Status.Capacity[v1.ResourceStorage] pvSize := pv.Spec.Capacity[v1.ResourceStorage] @@ -1436,7 +1513,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( statusSize) if expandErr != nil { detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr) - return detailedErr, detailedErr + return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) } klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) @@ -1448,7 +1525,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient) if updateErr != nil { detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr) - return detailedErr, detailedErr + return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) } klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) @@ -1463,7 +1540,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( err := util.MarkResizeFinished(pvc, newSize, og.kubeClient) if err != nil { detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) - return detailedErr, detailedErr + return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) } successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg) @@ -1472,10 +1549,10 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( if err != nil { detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) klog.Warning(detailedErr) - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } } - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } eventRecorderFunc := func(err *error) { @@ -1502,16 +1579,19 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("NodeExpandVolume.FindPluginBySpec failed", err) } - fsResizeFunc := func() (error, error) { + fsResizeFunc := func() volumetypes.OperationContext { var resizeDone bool - var simpleErr, detailedErr error + var eventErr, detailedErr error + migrated := false + resizeOptions := volume.NodeResizeOptions{ VolumeSpec: volumeToMount.VolumeSpec, DevicePath: volumeToMount.DevicePath, } fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) if err != nil { - return volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err) + eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } if fsVolume { @@ -1520,7 +1600,8 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( volumeToMount.Pod, volume.VolumeOptions{}) if newMounterErr != nil { - return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr) + eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } resizeOptions.DeviceMountPath = volumeMounter.GetPath() @@ -1534,7 +1615,8 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( if volumeDeviceMounter != nil { deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec) if err != nil { - return volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err) + eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } resizeOptions.DeviceStagePath = deviceStagePath } @@ -1543,11 +1625,13 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec) if err != nil { - return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err) + eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } if blockVolumePlugin == nil { - return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( @@ -1555,7 +1639,8 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( volumeToMount.Pod, volume.VolumeOptions{}) if newMapperErr != nil { - return volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) + eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } // if plugin supports custom mappers lets add DeviceStagePath @@ -1566,16 +1651,17 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( // if we are doing online expansion then volume is already published resizeOptions.CSIVolumePhase = volume.CSIVolumePublished - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) - if simpleErr != nil || detailedErr != nil { - return simpleErr, detailedErr + resizeDone, eventErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) + if eventErr != nil || detailedErr != nil { + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } if resizeDone { - return nil, nil + return volumetypes.NewOperationContext(nil, nil, migrated) } // This is a placeholder error - we should NEVER reach here. err = fmt.Errorf("volume resizing failed for unknown reason") - return volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err) + eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } eventRecorderFunc := func(err *error) { @@ -1762,3 +1848,11 @@ func findDetachablePluginBySpec(spec *volume.Spec, pm *volume.VolumePluginMgr) ( } return nil, nil } + +func getMigratedStatusBySpec(spec *volume.Spec) bool { + migrated := false + if spec != nil { + migrated = spec.Migrated + } + return migrated +} diff --git a/pkg/volume/util/operationexecutor/operation_generator_test.go b/pkg/volume/util/operationexecutor/operation_generator_test.go index b0d5e62758f..7ed4b303638 100644 --- a/pkg/volume/util/operationexecutor/operation_generator_test.go +++ b/pkg/volume/util/operationexecutor/operation_generator_test.go @@ -17,9 +17,12 @@ limitations under the License. package operationexecutor import ( - "github.com/prometheus/client_model/go" + "os" + "testing" + + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" @@ -32,8 +35,7 @@ import ( csitesting "k8s.io/kubernetes/pkg/volume/csi/testing" "k8s.io/kubernetes/pkg/volume/gcepd" volumetesting "k8s.io/kubernetes/pkg/volume/testing" - "os" - "testing" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) // this method just tests the volume plugin name that's used in CompleteFunc, the same plugin is also used inside the @@ -92,7 +94,7 @@ func TestOperationGenerator_GenerateUnmapVolumeFunc_PluginName(t *testing.T) { storageOperationStatusCountMetricBefore := findMetricWithNameAndLabels(metricFamilyName, labelFilter) var ee error - unmapVolumeFunc.CompleteFunc(&ee) + unmapVolumeFunc.CompleteFunc(volumetypes.CompleteFuncParam{Err: &ee}) storageOperationStatusCountMetricAfter := findMetricWithNameAndLabels(metricFamilyName, labelFilter) if storageOperationStatusCountMetricAfter == nil { diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 9a9eb2f81fe..1262f50f3e1 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -36,22 +36,49 @@ type UniquePVCName types.UID type GeneratedOperations struct { // Name of operation - could be used for resetting shared exponential backoff OperationName string - OperationFunc func() (eventErr error, detailedErr error) + OperationFunc func() (context OperationContext) EventRecorderFunc func(*error) - CompleteFunc func(*error) + CompleteFunc func(CompleteFuncParam) +} + +type OperationContext struct { + EventErr error + DetailedErr error + Migrated bool +} + +func NewOperationContext(eventErr, detailedErr error, migrated bool) OperationContext { + return OperationContext{ + EventErr: eventErr, + DetailedErr: detailedErr, + Migrated: migrated, + } +} + +type CompleteFuncParam struct { + Err *error + Migrated *bool } // Run executes the operations and its supporting functions func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { + var context OperationContext if o.CompleteFunc != nil { - defer o.CompleteFunc(&detailedErr) + c := CompleteFuncParam{ + Err: &context.DetailedErr, + Migrated: &context.Migrated, + } + c.Err = &detailedErr + defer o.CompleteFunc(c) } if o.EventRecorderFunc != nil { defer o.EventRecorderFunc(&eventErr) } // Handle panic, if any, from operationFunc() defer runtime.RecoverFromPanic(&detailedErr) - return o.OperationFunc() + + context = o.OperationFunc() + return context.EventErr, context.DetailedErr } // FailedPrecondition error indicates CSI operation returned failed precondition diff --git a/pkg/volume/volume_linux.go b/pkg/volume/volume_linux.go index 5c46e67cd26..214e9db3302 100644 --- a/pkg/volume/volume_linux.go +++ b/pkg/volume/volume_linux.go @@ -29,6 +29,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -40,7 +41,7 @@ const ( // SetVolumeOwnership modifies the given volume to be owned by // fsGroup, and sets SetGid so that newly created files are owned by // fsGroup. If fsGroup is nil nothing is done. -func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1.PodFSGroupChangePolicy, completeFunc func(*error)) error { +func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1.PodFSGroupChangePolicy, completeFunc func(types.CompleteFuncParam)) error { if fsGroup == nil { return nil } @@ -57,7 +58,9 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1 if !fsGroupPolicyEnabled { err := legacyOwnershipChange(mounter, fsGroup) if completeFunc != nil { - completeFunc(&err) + completeFunc(types.CompleteFuncParam{ + Err: &err, + }) } return err } @@ -74,7 +77,9 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1 return changeFilePermission(path, fsGroup, mounter.GetAttributes().ReadOnly, info) }) if completeFunc != nil { - completeFunc(&err) + completeFunc(types.CompleteFuncParam{ + Err: &err, + }) } return err }