refactored mount, attach, resize operation's so that all failures generate events and event generation is more consistent.

refactored operation generator and operation executor to use more general generated functions for operations, completions, and events.
This commit is contained in:
David Zhu 2017-12-05 16:21:20 -08:00
parent a5d2a025b7
commit e3f8f64c17
8 changed files with 440 additions and 319 deletions

View File

@ -1250,7 +1250,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete")
err = deleter.Delete()
opComplete(err)
opComplete(&err)
if err != nil {
// Deleter failed
return false, err
@ -1373,7 +1373,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
volume, err = provisioner.Provision()
opComplete(err)
opComplete(&err)
if err != nil {
strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)

View File

@ -49,12 +49,12 @@ func registerMetrics() {
}
// OperationCompleteHook returns a hook to call when an operation is completed
func OperationCompleteHook(plugin, operationName string) func(error) {
func OperationCompleteHook(plugin, operationName string) func(*error) {
requestTime := time.Now()
opComplete := func(err error) {
opComplete := func(err *error) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
if err != nil {
if *err != nil {
storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc()
} else {
storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken)

View File

@ -55,7 +55,7 @@ type NestedPendingOperations interface {
// concatenation of volumeName and podName is removed from the list of
// executing operations allowing a new operation to be started with the
// volumeName without error.
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error, operationCompleteFunc func(error)) error
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error
// Wait blocks until all operations are completed. This is typically
// necessary during tests - the test should wait until all operations finish
@ -94,8 +94,7 @@ type operation struct {
func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName,
podName types.UniquePodName,
operationFunc func() error,
operationCompleteFunc func(error)) error {
generatedOperations types.GeneratedOperations) error {
grm.lock.Lock()
defer grm.lock.Unlock()
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
@ -128,15 +127,20 @@ func (grm *nestedPendingOperations) Run(
})
}
go func() (err error) {
go func() (eventErr, detailedErr error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &err)
defer operationCompleteFunc(err)
defer grm.operationComplete(volumeName, podName, &detailedErr)
if generatedOperations.CompleteFunc != nil {
defer generatedOperations.CompleteFunc(&detailedErr)
}
if generatedOperations.EventRecorderFunc != nil {
defer generatedOperations.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
defer k8sRuntime.RecoverFromPanic(&detailedErr)
return generatedOperations.OperationFunc()
}()
return nil

View File

@ -47,10 +47,10 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation := func() error { return nil }
operation := func() (error, error) { return nil, nil }
// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
// Assert
if err != nil {
@ -63,11 +63,11 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volume1Name := v1.UniqueVolumeName("volume1-name")
volume2Name := v1.UniqueVolumeName("volume2-name")
operation := func() error { return nil }
operation := func() (error, error) { return nil, nil }
// Act
err1 := grm.Run(volume1Name, "" /* operationSubName */, operation, func(error) {})
err2 := grm.Run(volume2Name, "" /* operationSubName */, operation, func(error) {})
err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
// Assert
if err1 != nil {
@ -85,11 +85,11 @@ func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1PodName := types.UniquePodName("operation1-podname")
operation2PodName := types.UniquePodName("operation2-podname")
operation := func() error { return nil }
operation := func() (error, error) { return nil, nil }
// Act
err1 := grm.Run(volumeName, operation1PodName, operation, func(error) {})
err2 := grm.Run(volumeName, operation2PodName, operation, func(error) {})
err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation})
err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation})
// Assert
if err1 != nil {
@ -105,10 +105,10 @@ func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation := func() error { return nil }
operation := func() (error, error) { return nil, nil }
// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
// Assert
if err != nil {
@ -122,7 +122,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -133,7 +133,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -154,7 +154,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -165,7 +165,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -185,7 +185,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -195,7 +195,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -215,7 +215,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -225,7 +225,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -246,14 +246,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
@ -271,14 +271,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {})
err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {})
err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
@ -296,14 +296,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T)
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {})
err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {})
err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
@ -320,14 +320,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
@ -344,7 +344,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -352,7 +352,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
operation3 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
@ -367,7 +367,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -388,7 +388,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
@ -396,7 +396,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
operation3 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {})
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
@ -411,7 +411,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3})
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
@ -471,7 +471,7 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
@ -500,7 +500,7 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {})
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
@ -522,28 +522,28 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
}
}
func generateCallbackFunc(done chan<- interface{}) func() error {
return func() error {
func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
return func() (error, error) {
done <- true
return nil
return nil, nil
}
}
func generateWaitFunc(done <-chan interface{}) func() error {
return func() error {
func generateWaitFunc(done <-chan interface{}) func() (error, error) {
return func() (error, error) {
<-done
return nil
return nil, nil
}
}
func generatePanicFunc() func() error {
return func() error {
func generatePanicFunc() func() (error, error) {
return func() (error, error) {
panic("testing panic")
}
}
func generateNoopFunc() func() error {
return func() error { return nil }
func generateNoopFunc() func() (error, error) {
return func() (error, error) { return nil, nil }
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {

View File

@ -571,30 +571,28 @@ func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName,
func (oe *operationExecutor) AttachVolume(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
attachFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_attach")
return oe.pendingOperations.Run(
volumeToAttach.VolumeName, "" /* podName */, attachFunc, opCompleteFunc)
volumeToAttach.VolumeName, "" /* podName */, generatedOperations)
}
func (oe *operationExecutor) DetachVolume(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
detachFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_detach")
return oe.pendingOperations.Run(
volumeToDetach.VolumeName, "" /* podName */, detachFunc, opCompleteFunc)
volumeToDetach.VolumeName, "" /* podName */, generatedOperations)
}
func (oe *operationExecutor) VerifyVolumesAreAttached(
@ -661,7 +659,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
}
for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
bulkVerifyVolumeFunc, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
pluginNodeVolumes,
pluginName,
volumeSpecMapByPlugin[pluginName],
@ -670,10 +668,9 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
}
opCompleteFunc := util.OperationCompleteHook(pluginName, "verify_volumes_are_attached")
// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
uniquePluginName := v1.UniqueVolumeName(pluginName)
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc, opCompleteFunc)
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations)
if err != nil {
glog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
}
@ -684,15 +681,14 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
attachedVolumes []AttachedVolume,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
volumesAreAttachedFunc, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook("<n/a>", "verify_volumes_are_attached_per_node")
// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc, opCompleteFunc)
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations)
}
func (oe *operationExecutor) MountVolume(
@ -700,7 +696,7 @@ func (oe *operationExecutor) MountVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) error {
mountFunc, plugin, err := oe.operationGenerator.GenerateMountVolumeFunc(
generatedOperations, err := oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
if err != nil {
return err
@ -715,16 +711,15 @@ func (oe *operationExecutor) MountVolume(
}
// TODO mount_device
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_mount")
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, mountFunc, opCompleteFunc)
volumeToMount.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) UnmountVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
unmountFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
if err != nil {
return err
@ -734,42 +729,40 @@ func (oe *operationExecutor) UnmountVolume(
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
opCompleteFunc := util.OperationCompleteHook(plugin, "volume_unmount")
return oe.pendingOperations.Run(
volumeToUnmount.VolumeName, podName, unmountFunc, opCompleteFunc)
volumeToUnmount.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) UnmountDevice(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) error {
unmountDeviceFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "unmount_device")
return oe.pendingOperations.Run(
deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc)
deviceToDetach.VolumeName, "" /* podName */, generatedOperations)
}
func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error {
expandFunc, pluginName, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap)
generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap)
if err != nil {
return err
}
uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey())
opCompleteFunc := util.OperationCompleteHook(pluginName, "expand_volume")
return oe.pendingOperations.Run(uniqueVolumeKey, "", expandFunc, opCompleteFunc)
return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations)
}
func (oe *operationExecutor) MapVolume(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
mapFunc, plugin, err := oe.operationGenerator.GenerateMapVolumeFunc(
generatedOperations, err := oe.operationGenerator.GenerateMapVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
if err != nil {
return err
@ -785,15 +778,14 @@ func (oe *operationExecutor) MapVolume(
podName = volumehelper.GetUniquePodName(volumeToMount.Pod)
}
opCompleteFunc := util.OperationCompleteHook(plugin, "map_volume")
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, mapFunc, opCompleteFunc)
volumeToMount.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) UnmapVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
unmapFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, actualStateOfWorld)
if err != nil {
return err
@ -803,16 +795,15 @@ func (oe *operationExecutor) UnmapVolume(
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_volume")
return oe.pendingOperations.Run(
volumeToUnmount.VolumeName, podName, unmapFunc, opCompleteFunc)
volumeToUnmount.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) UnmapDevice(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) error {
unmapDeviceFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateUnmapDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
if err != nil {
return err
@ -822,24 +813,22 @@ func (oe *operationExecutor) UnmapDevice(
// the same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName
opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_device")
return oe.pendingOperations.Run(
deviceToDetach.VolumeName, podName, unmapDeviceFunc, opCompleteFunc)
deviceToDetach.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount VolumeToMount,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
verifyControllerAttachedVolumeFunc, plugin, err :=
generatedOperations, err :=
oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
if err != nil {
return err
}
opCompleteFunc := util.OperationCompleteHook(plugin, "verify_controller_attached_volume")
return oe.pendingOperations.Run(
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc, opCompleteFunc)
volumeToMount.VolumeName, "" /* podName */, generatedOperations)
}
// VolumeStateHandler defines a set of operations for handling mount/unmount/detach/reconstruct volume-related operations

View File

@ -350,87 +350,123 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
}
}
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) {
return func() error {
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
}
func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) {
return func() error {
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) {
return func() error {
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest,
resizeMap expandcache.VolumeResizeMap) (func() error, string, error) {
return func() error {
resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
pluginNodeVolumes map[types.NodeName][]*volume.Spec,
pluginNane string,
volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (func() error, error) {
return func() error {
actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, string, error) {
return func() error {
func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) {
return func() error {
func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) {
return func() error {
func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, "", nil
return nil, nil
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {

File diff suppressed because it is too large Load Diff

View File

@ -24,3 +24,11 @@ type UniquePodName types.UID
// UniquePVCName defines the type to key pvc off
type UniquePVCName types.UID
// GeneratedOperations contains the operation that is created as well as
// supporting functions required for the operation executor
type GeneratedOperations struct {
OperationFunc func() (eventErr error, detailedErr error)
EventRecorderFunc func(*error)
CompleteFunc func(*error)
}