mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Fix variable shadowing in exponential backoff when deleting volumes
Also fix pv_controller unit tests to behave more accurately in light of exponential backoffs
This commit is contained in:
parent
15059e6a5b
commit
7b423085fa
@ -461,7 +461,7 @@ func (r *volumeReactor) getChangeCount() int {
|
|||||||
// waitForIdle waits until all tests, controllers and other goroutines do their
|
// waitForIdle waits until all tests, controllers and other goroutines do their
|
||||||
// job and no new actions are registered for 10 milliseconds.
|
// job and no new actions are registered for 10 milliseconds.
|
||||||
func (r *volumeReactor) waitForIdle() {
|
func (r *volumeReactor) waitForIdle() {
|
||||||
r.ctrl.runningOperations.Wait()
|
r.ctrl.runningOperations.WaitForCompletion()
|
||||||
// Check every 10ms if the controller does something and stop if it's
|
// Check every 10ms if the controller does something and stop if it's
|
||||||
// idle.
|
// idle.
|
||||||
oldChanges := -1
|
oldChanges := -1
|
||||||
@ -489,7 +489,7 @@ func (r *volumeReactor) waitTest(test controllerTest) error {
|
|||||||
}
|
}
|
||||||
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
||||||
// Finish all operations that are in progress
|
// Finish all operations that are in progress
|
||||||
r.ctrl.runningOperations.Wait()
|
r.ctrl.runningOperations.WaitForCompletion()
|
||||||
|
|
||||||
// Return 'true' if the reactor reached the expected state
|
// Return 'true' if the reactor reached the expected state
|
||||||
err1 := r.checkClaims(test.expectedClaims)
|
err1 := r.checkClaims(test.expectedClaims)
|
||||||
@ -1038,6 +1038,8 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// waiting here cools down exponential backoff
|
||||||
|
time.Sleep(600 * time.Millisecond)
|
||||||
|
|
||||||
// There were some changes, process them
|
// There were some changes, process them
|
||||||
switch obj.(type) {
|
switch obj.(type) {
|
||||||
|
@ -1079,7 +1079,7 @@ func (ctrl *PersistentVolumeController) deleteVolumeOperation(arg interface{}) e
|
|||||||
} else {
|
} else {
|
||||||
// The plugin failed, mark the volume as Failed and send Warning
|
// The plugin failed, mark the volume as Failed and send Warning
|
||||||
// event
|
// event
|
||||||
if _, err = ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()); err != nil {
|
if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()); err != nil {
|
||||||
glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
|
glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
|
||||||
// Save failed, retry on the next deletion attempt
|
// Save failed, retry on the next deletion attempt
|
||||||
return err
|
return err
|
||||||
|
@ -57,11 +57,16 @@ type GoRoutineMap interface {
|
|||||||
// a new operation to be started with the same operation name without error.
|
// a new operation to be started with the same operation name without error.
|
||||||
Run(operationName string, operationFunc func() error) error
|
Run(operationName string, operationFunc func() error) error
|
||||||
|
|
||||||
// Wait blocks until all operations are completed. This is typically
|
// Wait blocks until operations map is empty. This is typically
|
||||||
// necessary during tests - the test should wait until all operations finish
|
// necessary during tests - the test should wait until all operations finish
|
||||||
// and evaluate results after that.
|
// and evaluate results after that.
|
||||||
Wait()
|
Wait()
|
||||||
|
|
||||||
|
// WaitForCompletion blocks until either all operations have successfully completed
|
||||||
|
// or have failed but are not pending. The test should wait until operations are either
|
||||||
|
// complete or have failed.
|
||||||
|
WaitForCompletion()
|
||||||
|
|
||||||
// IsOperationPending returns true if the operation is pending (currently
|
// IsOperationPending returns true if the operation is pending (currently
|
||||||
// running), otherwise returns false.
|
// running), otherwise returns false.
|
||||||
IsOperationPending(operationName string) bool
|
IsOperationPending(operationName string) bool
|
||||||
@ -179,6 +184,32 @@ func (grm *goRoutineMap) Wait() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (grm *goRoutineMap) WaitForCompletion() {
|
||||||
|
grm.lock.Lock()
|
||||||
|
defer grm.lock.Unlock()
|
||||||
|
|
||||||
|
for {
|
||||||
|
if len(grm.operations) == 0 || grm.nothingPending() {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
grm.cond.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if any operation is pending. Already assumes caller has the
|
||||||
|
// necessary locks
|
||||||
|
func (grm *goRoutineMap) nothingPending() bool {
|
||||||
|
nothingIsPending := true
|
||||||
|
for _, operation := range grm.operations {
|
||||||
|
if operation.operationPending {
|
||||||
|
nothingIsPending = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nothingIsPending
|
||||||
|
}
|
||||||
|
|
||||||
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
|
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
|
||||||
func NewAlreadyExistsError(operationName string) error {
|
func NewAlreadyExistsError(operationName string) error {
|
||||||
return alreadyExistsError{operationName}
|
return alreadyExistsError{operationName}
|
||||||
|
@ -448,6 +448,34 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) {
|
||||||
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
||||||
|
operationName := "operation-err"
|
||||||
|
|
||||||
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
|
operation1 := generateErrorFunc(operation1DoneCh)
|
||||||
|
err := grm.Run(operationName, operation1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
waitDoneCh := make(chan interface{}, 1)
|
||||||
|
go func() {
|
||||||
|
grm.WaitForCompletion()
|
||||||
|
waitDoneCh <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Finish the operation
|
||||||
|
operation1DoneCh <- true
|
||||||
|
|
||||||
|
// Assert that WaitForCompletion returns even if scheduled op had error
|
||||||
|
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func generateCallbackFunc(done chan<- interface{}) func() error {
|
func generateCallbackFunc(done chan<- interface{}) func() error {
|
||||||
return func() error {
|
return func() error {
|
||||||
done <- true
|
done <- true
|
||||||
@ -455,6 +483,13 @@ func generateCallbackFunc(done chan<- interface{}) func() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func generateErrorFunc(done <-chan interface{}) func() error {
|
||||||
|
return func() error {
|
||||||
|
<-done
|
||||||
|
return fmt.Errorf("Generic error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func generateWaitFunc(done <-chan interface{}) func() error {
|
func generateWaitFunc(done <-chan interface{}) func() error {
|
||||||
return func() error {
|
return func() error {
|
||||||
<-done
|
<-done
|
||||||
|
Loading…
Reference in New Issue
Block a user