mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 02:34:03 +00:00
Merge pull request #24916 from mesosphere/jdef_fix_23822
Automatic merge from submit-queue MESOS: fix race condition in contrib/mesos/pkg/queue/delay attempt to fix #23822 /cc @ihmccreery
This commit is contained in:
commit
c538106450
@ -18,6 +18,7 @@ package queue
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -138,6 +139,29 @@ func (q *DelayQueue) Pop() interface{} {
|
|||||||
}, nil)
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func finishWaiting(cond *sync.Cond, waitFinished <-chan struct{}) {
|
||||||
|
runtime.Gosched()
|
||||||
|
select {
|
||||||
|
// avoid creating a timer if we can help it...
|
||||||
|
case <-waitFinished:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
const spinTimeout = 100 * time.Millisecond
|
||||||
|
t := time.NewTimer(spinTimeout)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
runtime.Gosched()
|
||||||
|
cond.Broadcast()
|
||||||
|
select {
|
||||||
|
case <-waitFinished:
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
t.Reset(spinTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// returns a non-nil value from the queue, or else nil if/when cancelled; if cancel
|
// returns a non-nil value from the queue, or else nil if/when cancelled; if cancel
|
||||||
// is nil then cancellation is disabled and this func must return a non-nil value.
|
// is nil then cancellation is disabled and this func must return a non-nil value.
|
||||||
func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{} {
|
func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{} {
|
||||||
@ -164,6 +188,7 @@ func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{}
|
|||||||
select {
|
select {
|
||||||
case <-cancel:
|
case <-cancel:
|
||||||
item.readd(item)
|
item.readd(item)
|
||||||
|
finishWaiting(&q.cond, ch)
|
||||||
return nil
|
return nil
|
||||||
case <-ch:
|
case <-ch:
|
||||||
// we may no longer have the earliest deadline, re-try
|
// we may no longer have the earliest deadline, re-try
|
||||||
@ -353,8 +378,7 @@ func (q *DelayFIFO) pop(cancel <-chan struct{}) interface{} {
|
|||||||
// we may not have the lock yet, so
|
// we may not have the lock yet, so
|
||||||
// broadcast to abort Wait, then
|
// broadcast to abort Wait, then
|
||||||
// return after lock re-acquisition
|
// return after lock re-acquisition
|
||||||
q.cond().Broadcast()
|
finishWaiting(q.cond(), signal)
|
||||||
<-signal
|
|
||||||
return nil
|
return nil
|
||||||
case <-signal:
|
case <-signal:
|
||||||
// we have the lock, re-check
|
// we have the lock, re-check
|
||||||
|
Loading…
Reference in New Issue
Block a user