From e3f8f64c1743e261fffa1df6795fa44ea226d3b1 Mon Sep 17 00:00:00 2001 From: David Zhu Date: Tue, 5 Dec 2017 16:21:20 -0800 Subject: [PATCH] refactored mount, attach, resize operation's so that all failures generate events and event generation is more consistent. refactored operation generator and operation executor to use more general generated functions for operations, completions, and events. --- .../volume/persistentvolume/pv_controller.go | 4 +- pkg/volume/util/metrics.go | 6 +- .../nestedpendingoperations.go | 20 +- .../nestedpendingoperations_test.go | 88 ++-- .../operationexecutor/operation_executor.go | 61 +-- .../operation_executor_test.go | 144 +++--- .../operationexecutor/operation_generator.go | 428 +++++++++++------- pkg/volume/util/types/types.go | 8 + 8 files changed, 440 insertions(+), 319 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 980d960c750..e13d7a813d6 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -1250,7 +1250,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete") err = deleter.Delete() - opComplete(err) + opComplete(&err) if err != nil { // Deleter failed return false, err @@ -1373,7 +1373,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision") volume, err = provisioner.Provision() - opComplete(err) + opComplete(&err) if err != nil { strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err) glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err) diff --git a/pkg/volume/util/metrics.go b/pkg/volume/util/metrics.go index ab2d76286bb..e3af12df890 100644 --- a/pkg/volume/util/metrics.go +++ b/pkg/volume/util/metrics.go @@ -49,12 +49,12 @@ func registerMetrics() { } // OperationCompleteHook returns a hook to call when an operation is completed -func OperationCompleteHook(plugin, operationName string) func(error) { +func OperationCompleteHook(plugin, operationName string) func(*error) { requestTime := time.Now() - opComplete := func(err error) { + opComplete := func(err *error) { timeTaken := time.Since(requestTime).Seconds() // Create metric with operation name and plugin name - if err != nil { + if *err != nil { storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc() } else { storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken) diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 82462c1f2f6..526ea403cee 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -55,7 +55,7 @@ type NestedPendingOperations interface { // concatenation of volumeName and podName is removed from the list of // executing operations allowing a new operation to be started with the // volumeName without error. - Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error, operationCompleteFunc func(error)) error + Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error // Wait blocks until all operations are completed. This is typically // necessary during tests - the test should wait until all operations finish @@ -94,8 +94,7 @@ type operation struct { func (grm *nestedPendingOperations) Run( volumeName v1.UniqueVolumeName, podName types.UniquePodName, - operationFunc func() error, - operationCompleteFunc func(error)) error { + generatedOperations types.GeneratedOperations) error { grm.lock.Lock() defer grm.lock.Unlock() opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) @@ -128,15 +127,20 @@ func (grm *nestedPendingOperations) Run( }) } - go func() (err error) { + go func() (eventErr, detailedErr error) { // Handle unhandled panics (very unlikely) defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() - defer grm.operationComplete(volumeName, podName, &err) - defer operationCompleteFunc(err) + defer grm.operationComplete(volumeName, podName, &detailedErr) + if generatedOperations.CompleteFunc != nil { + defer generatedOperations.CompleteFunc(&detailedErr) + } + if generatedOperations.EventRecorderFunc != nil { + defer generatedOperations.EventRecorderFunc(&eventErr) + } // Handle panic, if any, from operationFunc() - defer k8sRuntime.RecoverFromPanic(&err) - return operationFunc() + defer k8sRuntime.RecoverFromPanic(&detailedErr) + return generatedOperations.OperationFunc() }() return nil diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index 8882303bde5..5865f96c21a 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -47,10 +47,10 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { @@ -63,11 +63,11 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volume1Name := v1.UniqueVolumeName("volume1-name") volume2Name := v1.UniqueVolumeName("volume2-name") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volume1Name, "" /* operationSubName */, operation, func(error) {}) - err2 := grm.Run(volume2Name, "" /* operationSubName */, operation, func(error) {}) + err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { @@ -85,11 +85,11 @@ func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1PodName := types.UniquePodName("operation1-podname") operation2PodName := types.UniquePodName("operation2-podname") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volumeName, operation1PodName, operation, func(error) {}) - err2 := grm.Run(volumeName, operation2PodName, operation, func(error) {}) + err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { @@ -105,10 +105,10 @@ func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { @@ -122,7 +122,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -133,7 +133,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -154,7 +154,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -165,7 +165,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -185,7 +185,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -195,7 +195,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -215,7 +215,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -225,7 +225,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -246,14 +246,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -271,14 +271,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {}) + err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {}) + err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -296,14 +296,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {}) + err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {}) + err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -320,14 +320,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -344,7 +344,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -352,7 +352,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -367,7 +367,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -388,7 +388,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -396,7 +396,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -411,7 +411,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -471,7 +471,7 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } @@ -500,7 +500,7 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } @@ -522,28 +522,28 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { } } -func generateCallbackFunc(done chan<- interface{}) func() error { - return func() error { +func generateCallbackFunc(done chan<- interface{}) func() (error, error) { + return func() (error, error) { done <- true - return nil + return nil, nil } } -func generateWaitFunc(done <-chan interface{}) func() error { - return func() error { +func generateWaitFunc(done <-chan interface{}) func() (error, error) { + return func() (error, error) { <-done - return nil + return nil, nil } } -func generatePanicFunc() func() error { - return func() error { +func generatePanicFunc() func() (error, error) { + return func() (error, error) { panic("testing panic") } } -func generateNoopFunc() func() error { - return func() error { return nil } +func generateNoopFunc() func() (error, error) { + return func() (error, error) { return nil, nil } } func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 543067be6b7..7df9f43d79a 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -571,30 +571,28 @@ func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, func (oe *operationExecutor) AttachVolume( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - attachFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_attach") return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, attachFunc, opCompleteFunc) + volumeToAttach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) DetachVolume( volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - detachFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_detach") return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, detachFunc, opCompleteFunc) + volumeToDetach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) VerifyVolumesAreAttached( @@ -661,7 +659,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( } for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode { - bulkVerifyVolumeFunc, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc( + generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc( pluginNodeVolumes, pluginName, volumeSpecMapByPlugin[pluginName], @@ -670,10 +668,9 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err) } - opCompleteFunc := util.OperationCompleteHook(pluginName, "verify_volumes_are_attached") // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin uniquePluginName := v1.UniqueVolumeName(pluginName) - err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc, opCompleteFunc) + err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) if err != nil { glog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) } @@ -684,15 +681,14 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - volumesAreAttachedFunc, err := + generatedOperations, err := oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook("", "verify_volumes_are_attached_per_node") // Give an empty UniqueVolumeName so that this operation could be executed concurrently. - return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc, opCompleteFunc) + return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) } func (oe *operationExecutor) MountVolume( @@ -700,7 +696,7 @@ func (oe *operationExecutor) MountVolume( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error { - mountFunc, plugin, err := oe.operationGenerator.GenerateMountVolumeFunc( + generatedOperations, err := oe.operationGenerator.GenerateMountVolumeFunc( waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) if err != nil { return err @@ -715,16 +711,15 @@ func (oe *operationExecutor) MountVolume( } // TODO mount_device - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_mount") return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, mountFunc, opCompleteFunc) + volumeToMount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmountVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - unmountFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) if err != nil { return err @@ -734,42 +729,40 @@ func (oe *operationExecutor) UnmountVolume( // same volume in parallel podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_unmount") return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, unmountFunc, opCompleteFunc) + volumeToUnmount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmountDevice( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - unmountDeviceFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "unmount_device") return oe.pendingOperations.Run( - deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc) + deviceToDetach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error { - expandFunc, pluginName, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap) + generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap) if err != nil { return err } uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey()) - opCompleteFunc := util.OperationCompleteHook(pluginName, "expand_volume") - return oe.pendingOperations.Run(uniqueVolumeKey, "", expandFunc, opCompleteFunc) + + return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations) } func (oe *operationExecutor) MapVolume( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - mapFunc, plugin, err := oe.operationGenerator.GenerateMapVolumeFunc( + generatedOperations, err := oe.operationGenerator.GenerateMapVolumeFunc( waitForAttachTimeout, volumeToMount, actualStateOfWorld) if err != nil { return err @@ -785,15 +778,14 @@ func (oe *operationExecutor) MapVolume( podName = volumehelper.GetUniquePodName(volumeToMount.Pod) } - opCompleteFunc := util.OperationCompleteHook(plugin, "map_volume") return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, mapFunc, opCompleteFunc) + volumeToMount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmapVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - unmapFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, actualStateOfWorld) if err != nil { return err @@ -803,16 +795,15 @@ func (oe *operationExecutor) UnmapVolume( // same volume in parallel podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) - opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_volume") return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, unmapFunc, opCompleteFunc) + volumeToUnmount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmapDevice( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - unmapDeviceFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmapDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) if err != nil { return err @@ -822,24 +813,22 @@ func (oe *operationExecutor) UnmapDevice( // the same volume in parallel podName := nestedpendingoperations.EmptyUniquePodName - opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_device") return oe.pendingOperations.Run( - deviceToDetach.VolumeName, podName, unmapDeviceFunc, opCompleteFunc) + deviceToDetach.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - verifyControllerAttachedVolumeFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "verify_controller_attached_volume") return oe.pendingOperations.Run( - volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc, opCompleteFunc) + volumeToMount.VolumeName, "" /* podName */, generatedOperations) } // VolumeStateHandler defines a set of operations for handling mount/unmount/detach/reconstruct volume-related operations diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 4e06b39616b..18e68a3ab0d 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -350,87 +350,123 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera } } -func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil -} -func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { - return func() error { - startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil -} -func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { - return func() error { - startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil -} -func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { - return func() error { - startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil -} -func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { - startOperationAndBlock(fopg.ch, fopg.quit) - return nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil +} +func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil +} +func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil +} +func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil +} +func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, - resizeMap expandcache.VolumeResizeMap) (func() error, string, error) { - return func() error { + resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( pluginNodeVolumes map[types.NodeName][]*volume.Spec, pluginNane string, volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, - actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { + actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, }, nil } -func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 2ff4b668f00..a322ed4ca3d 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/util/resizefs" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) @@ -83,34 +84,34 @@ func NewOperationGenerator(kubeClient clientset.Interface, // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable type OperationGenerator interface { // Generates the MountVolume function needed to perform the mount of a volume plugin - GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) + GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (volumetypes.GeneratedOperations, error) // Generates the UnmountVolume function needed to perform the unmount of a volume plugin - GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) + GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Generates the AttachVolume function needed to perform attach of a volume plugin - GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) + GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the DetachVolume function needed to perform the detach of a volume plugin - GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) + GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the VolumesAreAttached function needed to verify if volume plugins are attached - GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) + GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the UnMountDevice function needed to perform the unmount of a device - GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) + GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) // Generates the function needed to check if the attach_detach controller has attached the volume plugin - GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) + GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the MapVolume function needed to perform the map of a volume plugin - GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, string, error) + GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Generates the UnmapVolume function needed to perform the unmap of a volume plugin - GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) + GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Generates the UnmapDevice function needed to perform the unmap of a device - GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) + GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) // GetVolumePluginMgr returns volume plugin manager GetVolumePluginMgr() *volume.VolumePluginMgr @@ -118,15 +119,15 @@ type OperationGenerator interface { GenerateBulkVolumeVerifyFunc( map[types.NodeName][]*volume.Spec, string, - map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error) + map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) - GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (func() error, string, error) + GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) } func (og *operationGenerator) GenerateVolumesAreAttachedFunc( attachedVolumes []AttachedVolume, nodeName types.NodeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong // to this type of plugin @@ -154,7 +155,7 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName } - return func() error { + volumesAreAttachedFunc := func() (error, error) { // For each volume plugin, pass the list of volume specs to VolumesAreAttached to check // whether the volumes are still attached. @@ -195,7 +196,13 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( } } } - return nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: volumesAreAttachedFunc, + CompleteFunc: util.OperationCompleteHook("", "verify_volumes_are_attached_per_node"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error }, nil } @@ -203,9 +210,9 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( pluginNodeVolumes map[types.NodeName][]*volume.Spec, pluginName string, volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - return func() error { + bulkVolumeVerifyFunc := func() (error, error) { attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginByName(pluginName) if err != nil || attachableVolumePlugin == nil { @@ -213,7 +220,7 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( "BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v", pluginName, err) - return nil + return nil, nil } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() @@ -223,19 +230,19 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( "BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v", attachableVolumePlugin, newAttacherErr) - return nil + return nil, nil } bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier) if !ok { glog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier) - return nil + return nil, nil } attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes) if bulkAttachErr != nil { glog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr) - return nil + return nil, nil } for nodeName, volumeSpecs := range pluginNodeVolumes { @@ -260,26 +267,43 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( } } - return nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: bulkVolumeVerifyFunc, + CompleteFunc: util.OperationCompleteHook(pluginName, "verify_volumes_are_attached"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error }, nil + } func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { // Get attacher plugin + eventRecorderFunc := func(err *error) { + if *err != nil { + for _, pod := range volumeToAttach.ScheduledPods { + og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, (*err).Error()) + } + } + } + attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, "", volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() if newAttacherErr != nil { - return nil, attachableVolumePlugin.GetPluginName(), volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr) + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr) } - return func() error { + attachVolumeFunc := func() (error, error) { // Execute attach devicePath, attachErr := volumeAttacher.Attach( volumeToAttach.VolumeSpec, volumeToAttach.NodeName) @@ -298,11 +322,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) - for _, pod := range volumeToAttach.ScheduledPods { - og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - } - return detailedErr + return volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) } glog.Infof(volumeToAttach.GenerateMsgDetailed("AttachVolume.Attach succeeded", "")) @@ -312,11 +332,17 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToAttach.GenerateErrorDetailed("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) } - return nil - }, attachableVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: attachVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(attachableVolumePlugin.GetPluginName(), "volume_attach"), + }, nil } func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { @@ -326,7 +352,7 @@ func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { func (og *operationGenerator) GenerateDetachVolumeFunc( volumeToDetach AttachedVolume, verifySafeToDetach bool, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { var volumeName string var attachableVolumePlugin volume.AttachableVolumePlugin var pluginName string @@ -337,13 +363,13 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, "", volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } volumeName, err = attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) if err != nil { - return nil, attachableVolumePlugin.GetPluginName(), volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) } } else { // Get attacher plugin and the volumeName by splitting the volume unique name in case @@ -351,11 +377,11 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( // when a pod has been deleted during the controller downtime pluginName, volumeName, err = volumehelper.SplitUniqueName(volumeToDetach.VolumeName) if err != nil { - return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) } attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) if err != nil { - return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } } @@ -365,10 +391,10 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { - return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) } - return func() error { + getVolumePluginMgrFunc := func() (error, error) { var err error if verifySafeToDetach { err = og.verifyVolumeIsSafeToDetach(volumeToDetach) @@ -380,7 +406,7 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( // On failure, add volume back to ReportAsAttached list actualStateOfWorld.AddVolumeToReportAsAttached( volumeToDetach.VolumeName, volumeToDetach.NodeName) - return volumeToDetach.GenerateErrorDetailed("DetachVolume.Detach failed", err) + return volumeToDetach.GenerateError("DetachVolume.Detach failed", err) } glog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", "")) @@ -389,25 +415,33 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( actualStateOfWorld.MarkVolumeAsDetached( volumeToDetach.VolumeName, volumeToDetach.NodeName) - return nil - }, pluginName, nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: getVolumePluginMgrFunc, + CompleteFunc: util.OperationCompleteHook(pluginName, "volume_detach"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } func (og *operationGenerator) GenerateMountVolumeFunc( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, - isRemount bool) (func() error, string, error) { + isRemount bool) (volumetypes.GeneratedOperations, error) { // Get mounter plugin volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { - return nil, "", volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) } affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin) if affinityErr != nil { - return nil, volumePlugin.GetPluginName(), affinityErr + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) + return volumetypes.GeneratedOperations{}, detailedErr } volumeMounter, newMounterErr := volumePlugin.NewMounter( @@ -417,13 +451,15 @@ func (og *operationGenerator) GenerateMountVolumeFunc( if newMounterErr != nil { eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr) og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return nil, volumePlugin.GetPluginName(), detailedErr + return volumetypes.GeneratedOperations{}, detailedErr } mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin) if mountCheckError != nil { - return nil, volumePlugin.GetPluginName(), mountCheckError + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.UnsupportedMountOption, eventErr.Error()) + return volumetypes.GeneratedOperations{}, detailedErr } // Get attacher, if possible @@ -440,7 +476,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup } - return func() error { + mountVolumeFunc := func() (error, error) { if volumeAttacher != nil { // Wait for attachable volumes to finish attaching glog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath))) @@ -449,7 +485,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.WaitForAttach failed", err) + return volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err) } glog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) @@ -459,14 +495,14 @@ func (og *operationGenerator) GenerateMountVolumeFunc( resizeError := og.resizeFileSystem(volumeToMount, devicePath, volumePlugin.GetPluginName()) if resizeError != nil { - return volumeToMount.GenerateErrorDetailed("MountVolume.Resize failed", resizeError) + return volumeToMount.GenerateError("MountVolume.Resize failed", resizeError) } deviceMountPath, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.GetDeviceMountPath failed", err) + return volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err) } // Mount device to global mount path @@ -476,9 +512,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( deviceMountPath) if err != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MountVolume.MountDevice failed", err) } glog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath))) @@ -488,7 +522,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeName) if markDeviceMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) + return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) } } @@ -497,9 +531,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( err = fmt.Errorf( "Verify that your node machine has the required components before attempting to mount this volume type. %s", canMountErr) - eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.CanMount failed", err) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MountVolume.CanMount failed", err) } } @@ -507,9 +539,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( mountErr := volumeMounter.SetUp(fsGroup) if mountErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) } simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.SetUp succeeded", "") @@ -532,11 +562,23 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeGidValue) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) + return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) } - return nil - }, volumePlugin.GetPluginName(), nil + return nil, nil + } + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: mountVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "volume_mount"), + }, nil } func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devicePath string, pluginName string) error { @@ -608,26 +650,26 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devi func (og *operationGenerator) GenerateUnmountVolumeFunc( volumeToUnmount MountedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { // Get mountable plugin volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName) if err != nil || volumePlugin == nil { - return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) } volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter( volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) if newUnmounterErr != nil { - return nil, volumePlugin.GetPluginName(), volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) } - return func() error { + unmountVolumeFunc := func() (error, error) { // Execute unmount unmountErr := volumeUnmounter.TearDown() if unmountErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmountVolume.TearDown failed", unmountErr) + return volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr) } glog.Infof( @@ -648,37 +690,43 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( glog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error()) } - return nil - }, volumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmountVolumeFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "volume_unmount"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } func (og *operationGenerator) GenerateUnmountDeviceFunc( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, - mounter mount.Interface) (func() error, string, error) { + mounter mount.Interface) (volumetypes.GeneratedOperations, error) { // Get attacher plugin attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err) } volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { - return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err) } volumeAttacher, err := attachableVolumePlugin.NewAttacher() if err != nil { - return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err) } - return func() error { + unmountDeviceFunc := func() (error, error) { deviceMountPath, err := volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("GetDeviceMountPath failed", err) + return deviceToDetach.GenerateError("GetDeviceMountPath failed", err) } refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath) @@ -686,13 +734,13 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( if err == nil { err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs) } - return deviceToDetach.GenerateErrorDetailed("GetDeviceMountRefs check failed", err) + return deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err) } // Execute unmount unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath) if unmountDeviceErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmountDevice failed", unmountDeviceErr) + return deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr) } // Before logging that UnmountDevice succeeded and moving on, // use mounter.PathIsDevice to check if the path is a device, @@ -700,27 +748,33 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( // else on the system. Retry if it returns true. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, mounter) if deviceOpenedErr != nil { - return deviceOpenedErr + return nil, deviceOpenedErr } // The device is still in use elsewhere. Caller will log and retry. if deviceOpened { - return deviceToDetach.GenerateErrorDetailed( + return deviceToDetach.GenerateError( "UnmountDevice failed", fmt.Errorf("the device is in use when it was no longer expected to be in use")) } - glog.Infof(deviceToDetach.GenerateMsgDetailed("UnmountDevice succeeded", "")) + glog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) // Update actual state of world markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted( deviceToDetach.VolumeName) if markDeviceUnmountedErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) } - return nil - }, attachableVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmountDeviceFunc, + CompleteFunc: util.OperationCompleteHook(attachableVolumePlugin.GetPluginName(), "unmount_device"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } // GenerateMapVolumeFunc marks volume as mounted based on following steps. @@ -731,25 +785,27 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( // device map path. Once symbolic links are created, take fd lock by // loopback for the device to avoid silent volume replacement. This lock // will be realased once no one uses the device. -// If all steps are completed, the volume is marked as unmounted. +// If all steps are completed, the volume is marked as mounted. func (og *operationGenerator) GenerateMapVolumeFunc( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { // Get block volume mapper plugin var blockVolumeMapper volume.BlockVolumeMapper blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec) if err != nil { - return nil, "", volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err) } if blockVolumePlugin == nil { - return nil, "", volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } affinityErr := checkNodeAffinity(og, volumeToMount, blockVolumePlugin) if affinityErr != nil { - return nil, blockVolumePlugin.GetPluginName(), affinityErr + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NodeAffinity check failed", affinityErr) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) + return volumetypes.GeneratedOperations{}, detailedErr } blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( volumeToMount.VolumeSpec, @@ -758,7 +814,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( if newMapperErr != nil { eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return nil, blockVolumePlugin.GetPluginName(), detailedErr + return volumetypes.GeneratedOperations{}, detailedErr } // Get attacher, if possible @@ -769,7 +825,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeAttacher, _ = attachableVolumePlugin.NewAttacher() } - return func() error { + mapVolumeFunc := func() (error, error) { var devicePath string if volumeAttacher != nil { // Wait for attachable volumes to finish attaching @@ -779,7 +835,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.WaitForAttach failed", err) + return volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err) } glog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) @@ -789,23 +845,21 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeName) if markDeviceMappedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) + return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) } } // A plugin doesn't have attacher also needs to map device to global map path with SetUpDevice() pluginDevicePath, mapErr := blockVolumeMapper.SetUpDevice() if mapErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUp failed", mapErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MapVolume.SetUp failed", mapErr) } // Update devicePath for none attachable plugin case if len(devicePath) == 0 { if len(pluginDevicePath) != 0 { devicePath = pluginDevicePath } else { - return volumeToMount.GenerateErrorDetailed("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) + return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) } } // Set up global map path under the given plugin directory using symbolic link @@ -813,14 +867,12 @@ func (og *operationGenerator) GenerateMapVolumeFunc( blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.GetDeviceMountPath failed", err) + return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err) } mapErr = og.blkUtil.MapDevice(devicePath, globalMapPath, string(volumeToMount.Pod.UID)) if mapErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) } // Device mapping for global map path succeeded simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath)) @@ -833,9 +885,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( mapErr = og.blkUtil.MapDevice(devicePath, volumeMapPath, volName) if mapErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) } // Take filedescriptor lock to keep a block device opened. Otherwise, there is a case @@ -844,7 +894,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( // for the block device is required. _, err = og.blkUtil.AttachFileDevice(devicePath) if err != nil { - return volumeToMount.GenerateErrorDetailed("MapVolume.AttachFileDevice failed", err) + return volumeToMount.GenerateError("MapVolume.AttachFileDevice failed", err) } // Device mapping for pod device map path succeeded @@ -864,11 +914,23 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeGidValue) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) + return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) } - return nil - }, blockVolumePlugin.GetPluginName(), nil + return nil, nil + } + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: mapVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(blockVolumePlugin.GetPluginName(), "map_volume"), + }, nil } // GenerateUnmapVolumeFunc marks volume as unmonuted based on following steps. @@ -877,32 +939,32 @@ func (og *operationGenerator) GenerateMapVolumeFunc( // If all steps are completed, the volume is marked as unmounted. func (og *operationGenerator) GenerateUnmapVolumeFunc( volumeToUnmount MountedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { // Get block volume unmapper plugin var blockVolumeUnmapper volume.BlockVolumeUnmapper blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName) if err != nil { - return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err) } if blockVolumePlugin == nil { - return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) if newUnmapperErr != nil { - return nil, blockVolumePlugin.GetPluginName(), volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr) } - return func() error { + unmapVolumeFunc := func() (error, error) { // Try to unmap volumeName symlink under pod device map path dir // pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName} podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath() unmapDeviceErr := og.blkUtil.UnmapDevice(podDeviceUnmapPath, volName) if unmapDeviceErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmapVolume.UnmapDevice on pod device map path failed", unmapDeviceErr) + return volumeToUnmount.GenerateError("UnmapVolume.UnmapDevice on pod device map path failed", unmapDeviceErr) } // Try to unmap podUID symlink under global map path dir // plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID} @@ -910,12 +972,12 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( blockVolumeUnmapper.GetGlobalMapPath(volumeToUnmount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmapVolume.GetGlobalUnmapPath failed", err) + return volumeToUnmount.GenerateError("UnmapVolume.GetGlobalUnmapPath failed", err) } unmapDeviceErr = og.blkUtil.UnmapDevice(globalUnmapPath, string(volumeToUnmount.PodUID)) if unmapDeviceErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmapVolume.UnmapDevice on global map path failed", unmapDeviceErr) + return volumeToUnmount.GenerateError("UnmapVolume.UnmapDevice on global map path failed", unmapDeviceErr) } glog.Infof( @@ -936,8 +998,14 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( glog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error()) } - return nil - }, blockVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmapVolumeFunc, + CompleteFunc: util.OperationCompleteHook(blockVolumePlugin.GetPluginName(), "unmap_volume"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } // GenerateUnmapDeviceFunc marks device as unmounted based on following steps. @@ -953,56 +1021,56 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( func (og *operationGenerator) GenerateUnmapDeviceFunc( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, - mounter mount.Interface) (func() error, string, error) { + mounter mount.Interface) (volumetypes.GeneratedOperations, error) { // Get block volume mapper plugin var blockVolumeMapper volume.BlockVolumeMapper blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginBySpec(deviceToDetach.VolumeSpec) if err != nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err) } if blockVolumePlugin == nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( deviceToDetach.VolumeSpec, nil, /* Pod */ volume.VolumeOptions{}) if newMapperErr != nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewBlockVolumeMapper initialization failed", newMapperErr) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewBlockVolumeMapper initialization failed", newMapperErr) } blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( string(deviceToDetach.VolumeName), "" /* podUID */) if newUnmapperErr != nil { - return nil, blockVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr) } - return func() error { + unmapDeviceFunc := func() (error, error) { // Search under globalMapPath dir if all symbolic links from pods have been removed already. // If symbolick links are there, pods may still refer the volume. globalMapPath, err := blockVolumeMapper.GetGlobalMapPath(deviceToDetach.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.GetGlobalMapPath failed", err) + return deviceToDetach.GenerateError("UnmapDevice.GetGlobalMapPath failed", err) } refs, err := og.blkUtil.GetDeviceSymlinkRefs(deviceToDetach.DevicePath, globalMapPath) if err != nil { - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.GetDeviceSymlinkRefs check failed", err) + return deviceToDetach.GenerateError("UnmapDevice.GetDeviceSymlinkRefs check failed", err) } if len(refs) > 0 { err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs) - return deviceToDetach.GenerateErrorDetailed("UnmapDevice failed", err) + return deviceToDetach.GenerateError("UnmapDevice failed", err) } // Execute tear down device unmapErr := blockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath) if unmapErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.TearDownDevice failed", unmapErr) + return deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr) } // Plugin finished TearDownDevice(). Now globalMapPath dir and plugin's stored data @@ -1010,7 +1078,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath) if removeMapPathErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmapDevice failed", removeMapPathErr) + return deviceToDetach.GenerateError("UnmapDevice failed", removeMapPathErr) } // The block volume is not referenced from Pods. Release file descriptor lock. @@ -1021,7 +1089,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( } else { err = og.blkUtil.RemoveLoopDevice(loopPath) if err != nil { - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.AttachFileDevice failed", err) + return deviceToDetach.GenerateError("UnmapDevice.AttachFileDevice failed", err) } } @@ -1031,11 +1099,11 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( // else on the system. Retry if it returns true. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, mounter) if deviceOpenedErr != nil { - return deviceOpenedErr + return nil, deviceOpenedErr } // The device is still in use elsewhere. Caller will log and retry. if deviceOpened { - return deviceToDetach.GenerateErrorDetailed( + return deviceToDetach.GenerateError( "UnmapDevice failed", fmt.Errorf("the device is in use when it was no longer expected to be in use")) } @@ -1047,24 +1115,30 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( deviceToDetach.VolumeName) if markDeviceUnmountedErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) } - return nil - }, blockVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmapDeviceFunc, + CompleteFunc: util.OperationCompleteHook(blockVolumePlugin.GetPluginName(), "unmap_device"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( volumeToMount VolumeToMount, nodeName types.NodeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { - return nil, "", volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) } - return func() error { + verifyControllerAttachedVolumeFunc := func() (error, error) { if !volumeToMount.PluginIsAttachable { // If the volume does not implement the attacher interface, it is // assumed to be attached and the actual state of the world is @@ -1074,10 +1148,10 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) + return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) } - return nil + return nil, nil } if !volumeToMount.ReportedInUse { @@ -1087,19 +1161,19 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( // periodically by kubelet, so it may take as much as 10 seconds // before this clears. // Issue #28141 to enable on demand status updates. - return volumeToMount.GenerateErrorDetailed("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) + return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) } // Fetch current node object node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(nodeName), metav1.GetOptions{}) if fetchErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) + return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) } if node == nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed( + return volumeToMount.GenerateError( "VerifyControllerAttachedVolume failed", fmt.Errorf("Node object retrieved from API server is nil")) } @@ -1111,15 +1185,22 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( glog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath))) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) } - return nil + return nil, nil } } // Volume not attached, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("Volume not attached according to node status", nil) - }, volumePlugin.GetPluginName(), nil + return volumeToMount.GenerateError("Volume not attached according to node status", nil) + } + + return volumetypes.GeneratedOperations{ + OperationFunc: verifyControllerAttachedVolumeFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "verify_controller_attached_volume"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil + } func (og *operationGenerator) verifyVolumeIsSafeToDetach( @@ -1158,17 +1239,17 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach( func (og *operationGenerator) GenerateExpandVolumeFunc( pvcWithResizeRequest *expandcache.PVCWithResizeRequest, - resizeMap expandcache.VolumeResizeMap) (func() error, string, error) { + resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) { volumeSpec := volume.NewSpecFromPersistentVolume(pvcWithResizeRequest.PersistentVolume, false) volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) if err != nil { - return nil, "", fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err) + return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err) } - expandFunc := func() error { + expandVolumeFunc := func() (error, error) { newSize := pvcWithResizeRequest.ExpectedSize pvSize := pvcWithResizeRequest.PersistentVolume.Spec.Capacity[v1.ResourceStorage] if pvSize.Cmp(newSize) < 0 { @@ -1178,9 +1259,8 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( pvcWithResizeRequest.CurrentSize) if expandErr != nil { - glog.Errorf("Error expanding volume %q of plugin %s : %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, expandErr.Error()) - return expandErr + detailedErr := fmt.Errorf("Error expanding volume %q of plugin %s : %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr) + return detailedErr, detailedErr } glog.Infof("ExpandVolume succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) newSize = updatedSize @@ -1190,9 +1270,8 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( updateErr := resizeMap.UpdatePVSize(pvcWithResizeRequest, newSize) if updateErr != nil { - glog.V(4).Infof("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, updateErr.Error()) - return updateErr + detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr) + return detailedErr, detailedErr } glog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) } @@ -1205,24 +1284,32 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( err := resizeMap.MarkAsResized(pvcWithResizeRequest, newSize) if err != nil { - glog.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, err.Error()) - return err + detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err) + return detailedErr, detailedErr } } - return nil + return nil, nil } - return expandFunc, volumePlugin.GetPluginName(), nil + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: expandVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "expand_volume"), + }, nil } func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := volume.MountOptionFromSpec(volumeToMount.VolumeSpec) if len(mountOptions) > 0 && !plugin.SupportsMountOption() { - eventErr, detailedErr := volumeToMount.GenerateError("Mount options are not supported for this volume type", nil) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.UnsupportedMountOption, eventErr.Error()) - return detailedErr + return fmt.Errorf("Mount options are not supported for this volume type") } return nil } @@ -1238,14 +1325,11 @@ func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount, plug if pv != nil { nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels() if err != nil { - return volumeToMount.GenerateErrorDetailed("Error getting node labels", err) + return err } - err = util.CheckNodeAffinity(pv, nodeLabels) if err != nil { - eventErr, detailedErr := volumeToMount.GenerateError("Storage node affinity check failed", err) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return err } } return nil diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 9375ad6750c..9815545ff60 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -24,3 +24,11 @@ type UniquePodName types.UID // UniquePVCName defines the type to key pvc off type UniquePVCName types.UID + +// GeneratedOperations contains the operation that is created as well as +// supporting functions required for the operation executor +type GeneratedOperations struct { + OperationFunc func() (eventErr error, detailedErr error) + EventRecorderFunc func(*error) + CompleteFunc func(*error) +}