mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #28153 from jsafrane/flaky-goroutinemap-wait
Automatic merge from submit-queue Fixed goroutinemap race on Wait() sync.WaitGroup produces data races when a GoroutineMap is empty and Wait() and Run() are called at the same time. From sync.WaitGroup: > Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Fixes #28128 Note that this issue affects only PersistentVolume unit tests. @saad-ali, PTAL
This commit is contained in:
commit
5b3b655710
@ -61,16 +61,18 @@ type GoRoutineMap interface {
|
||||
|
||||
// NewGoRoutineMap returns a new instance of GoRoutineMap.
|
||||
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
|
||||
return &goRoutineMap{
|
||||
g := &goRoutineMap{
|
||||
operations: make(map[string]operation),
|
||||
exponentialBackOffOnError: exponentialBackOffOnError,
|
||||
}
|
||||
g.cond = sync.NewCond(g)
|
||||
return g
|
||||
}
|
||||
|
||||
type goRoutineMap struct {
|
||||
operations map[string]operation
|
||||
exponentialBackOffOnError bool
|
||||
wg sync.WaitGroup
|
||||
cond *sync.Cond
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
@ -102,7 +104,6 @@ func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) e
|
||||
lastErrorTime: existingOp.lastErrorTime,
|
||||
durationBeforeRetry: existingOp.durationBeforeRetry,
|
||||
}
|
||||
grm.wg.Add(1)
|
||||
go func() (err error) {
|
||||
// Handle unhandled panics (very unlikely)
|
||||
defer k8sRuntime.HandleCrash()
|
||||
@ -117,7 +118,7 @@ func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) e
|
||||
}
|
||||
|
||||
func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
|
||||
defer grm.wg.Done()
|
||||
defer grm.cond.Signal()
|
||||
grm.Lock()
|
||||
defer grm.Unlock()
|
||||
|
||||
@ -157,7 +158,12 @@ func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
|
||||
}
|
||||
|
||||
func (grm *goRoutineMap) Wait() {
|
||||
grm.wg.Wait()
|
||||
grm.Lock()
|
||||
defer grm.Unlock()
|
||||
|
||||
for len(grm.operations) > 0 {
|
||||
grm.cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func recoverFromPanic(operationName string, err *error) {
|
||||
|
Loading…
Reference in New Issue
Block a user