Fixed goroutinemap race on Wait()

sync.WaitGroup produces data races when a GoroutineMap is empty and Wait() and
Run() are called at the same time.

From sync.WaitGroup:
   Note that calls with a positive delta that occur when the counter is zero
   must happen before a Wait.

Fixes #28128
This commit is contained in:
Jan Safranek 2016-06-28 12:04:45 +02:00
parent d5e60cefc4
commit 06082b1bdf

View File

@ -61,16 +61,18 @@ type GoRoutineMap interface {
// NewGoRoutineMap returns a new instance of GoRoutineMap. // NewGoRoutineMap returns a new instance of GoRoutineMap.
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap { func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
return &goRoutineMap{ g := &goRoutineMap{
operations: make(map[string]operation), operations: make(map[string]operation),
exponentialBackOffOnError: exponentialBackOffOnError, exponentialBackOffOnError: exponentialBackOffOnError,
} }
g.cond = sync.NewCond(g)
return g
} }
type goRoutineMap struct { type goRoutineMap struct {
operations map[string]operation operations map[string]operation
exponentialBackOffOnError bool exponentialBackOffOnError bool
wg sync.WaitGroup cond *sync.Cond
sync.Mutex sync.Mutex
} }
@ -102,7 +104,6 @@ func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) e
lastErrorTime: existingOp.lastErrorTime, lastErrorTime: existingOp.lastErrorTime,
durationBeforeRetry: existingOp.durationBeforeRetry, durationBeforeRetry: existingOp.durationBeforeRetry,
} }
grm.wg.Add(1)
go func() (err error) { go func() (err error) {
// Handle unhandled panics (very unlikely) // Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash() defer k8sRuntime.HandleCrash()
@ -117,7 +118,7 @@ func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) e
} }
func (grm *goRoutineMap) operationComplete(operationName string, err *error) { func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
defer grm.wg.Done() defer grm.cond.Signal()
grm.Lock() grm.Lock()
defer grm.Unlock() defer grm.Unlock()
@ -157,7 +158,12 @@ func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
} }
func (grm *goRoutineMap) Wait() { func (grm *goRoutineMap) Wait() {
grm.wg.Wait() grm.Lock()
defer grm.Unlock()
for len(grm.operations) > 0 {
grm.cond.Wait()
}
} }
func recoverFromPanic(operationName string, err *error) { func recoverFromPanic(operationName string, err *error) {