Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-26 21:17:23 +00:00
Merge pull request #76239 from hormes/delivery_watch_events_nonblocking_first
Deliver watch events in a non-blocking way first.
This commit is contained in: b69cc78e58
@@ -276,6 +276,8 @@ type Cacher struct {
 	// watchersBuffer is a list of watchers potentially interested in currently
 	// dispatched event.
 	watchersBuffer []*cacheWatcher
+	// blockedWatchers is a list of watchers whose buffer is currently full.
+	blockedWatchers []*cacheWatcher
 	// watchersToStop is a list of watchers that were supposed to be stopped
 	// during current dispatching, but stopping was deferred to the end of
 	// dispatching that event to avoid race with closing channels in watchers.
@@ -789,13 +791,45 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
 	// Watchers stopped after startDispatching will be delayed to finishDispatching,
 
 	// Since add() can block, we explicitly add when cacher is unlocked.
+	// Dispatch the event in a non-blocking way first, so that faster watchers
+	// are not blocked by slower ones.
 	if event.Type == watch.Bookmark {
 		for _, watcher := range c.watchersBuffer {
 			watcher.nonblockingAdd(event)
 		}
 	} else {
+		c.blockedWatchers = c.blockedWatchers[:0]
 		for _, watcher := range c.watchersBuffer {
-			watcher.add(event, c.timer, c.dispatchTimeoutBudget)
+			if !watcher.nonblockingAdd(event) {
+				c.blockedWatchers = append(c.blockedWatchers, watcher)
+			}
+		}
+
+		if len(c.blockedWatchers) > 0 {
+			// dispatchEvent is called very often, so arrange
+			// to reuse timers instead of constantly allocating.
+			startTime := time.Now()
+			timeout := c.dispatchTimeoutBudget.takeAvailable()
+			c.timer.Reset(timeout)
+
+			// Make sure every watcher will try to send the event without blocking first,
+			// even if the timer has already expired.
+			timer := c.timer
+			for _, watcher := range c.blockedWatchers {
+				if !watcher.add(event, timer) {
+					// The timer fired; clear it by setting it to nil.
+					timer = nil
+				}
+			}
+
+			// Stop the timer if it has not fired.
+			if timer != nil && !timer.Stop() {
+				// Consume the triggered (but not yet received) timer event
+				// so that future reuse does not get a spurious timeout.
+				<-timer.C
+			}
+
+			c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
+		}
 	}
 }
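Taken together, this hunk splits dispatch into two phases: a non-blocking pass over all watchers, then a single budgeted, timer-bounded pass over only the ones whose buffers were full. Below is a minimal, self-contained sketch of that pattern; the watcher type, broadcast helper, and values are illustrative stand-ins, not the real Cacher API, and the real code additionally gives every blocked watcher one last non-blocking attempt after the timer fires and terminates unresponsive watchers rather than just dropping the event.

package main

import (
	"fmt"
	"time"
)

// watcher stands in for cacheWatcher: a consumer fed through a channel.
type watcher struct {
	ch chan int
}

// nonblockingSend mirrors cacheWatcher.nonblockingAdd: try exactly once,
// never block; a full buffer simply reports false.
func (w *watcher) nonblockingSend(v int) bool {
	select {
	case w.ch <- v:
		return true
	default:
		return false
	}
}

// broadcast mirrors the reworked dispatchEvent flow: a non-blocking pass
// over everyone first, then one shared timer bounds the total time spent
// on the watchers that were blocked.
func broadcast(watchers []*watcher, v int, timeout time.Duration) {
	var blocked []*watcher
	for _, w := range watchers {
		if !w.nonblockingSend(v) {
			blocked = append(blocked, w)
		}
	}
	if len(blocked) == 0 {
		return
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for _, w := range blocked {
		select {
		case w.ch <- v:
		case <-timer.C:
			// Shared budget exhausted: give up on the remaining slow
			// watchers (the real code would terminate them instead).
			fmt.Println("dropping event for slow watcher")
			return
		}
	}
}

func main() {
	fast := &watcher{ch: make(chan int, 10)}
	slow := &watcher{ch: make(chan int)} // unbuffered, nobody reading
	broadcast([]*watcher{slow, fast}, 42, 10*time.Millisecond)
	fmt.Println("fast watcher got:", <-fast.ch) // delivered despite the slow one
}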
@@ -1078,7 +1112,6 @@ func (c *cacheWatcher) stop() {
 }
 
 func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
-	// If we can't send it, don't block on it.
 	select {
 	case c.input <- event:
 		return true
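The non-blocking path hinges on Go's select with a default case, which this hunk elides just past its last context line. A tiny standalone illustration of that language feature (the channel and values here are hypothetical):

package main

import "fmt"

// trySend attempts a channel send without blocking: if the buffer is full
// (or an unbuffered channel has no ready receiver), the default case runs.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: buffer has room
	fmt.Println(trySend(ch, 2)) // false: buffer is full
}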
@@ -1087,28 +1120,14 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
 	}
 }
 
-func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
+// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
+func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 	// Try to send the event immediately, without blocking.
 	if c.nonblockingAdd(event) {
-		return
+		return true
 	}
 
-	// OK, block sending, but only for up to <timeout>.
-	// cacheWatcher.add is called very often, so arrange
-	// to reuse timers instead of constantly allocating.
-	startTime := time.Now()
-	timeout := budget.takeAvailable()
-
-	timer.Reset(timeout)
-
-	select {
-	case c.input <- event:
-		if !timer.Stop() {
-			// Consume triggered (but not yet received) timer event
-			// so that future reuse does not get a spurious timeout.
-			<-timer.C
-		}
-	case <-timer.C:
+	closeFunc := func() {
 		// This means that we couldn't send event to that watcher.
 		// Since we don't want to block on it infinitely,
 		// we simply terminate it.
@@ -1116,7 +1135,19 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
 		c.forget()
 	}
 
-	budget.returnUnused(timeout - time.Since(startTime))
+	if timer == nil {
+		closeFunc()
+		return false
+	}
+
+	// OK, block sending, but only until the timer fires.
+	select {
+	case c.input <- event:
+		return true
+	case <-timer.C:
+		closeFunc()
+		return false
+	}
 }
 
 func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
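Both dispatchEvent and add() now share the same timer-reuse discipline: a timer that has already fired must have its channel drained before the next Reset, or a later receive sees a stale tick. A standalone sketch of that discipline, assuming a single goroutine owns one long-lived time.Timer (waitOrTimeout is an invented name, not part of the package):

package main

import (
	"fmt"
	"time"
)

// waitOrTimeout reuses one timer across calls. After every use the timer
// must be left in a known state: stopped, with its channel drained.
func waitOrTimeout(t *time.Timer, ch <-chan int, d time.Duration) (int, bool) {
	t.Reset(d)
	select {
	case v := <-ch:
		// Stop returns false if the timer already fired; in that case a
		// value sits in t.C and must be consumed, or the next caller
		// would see a spurious, stale timeout.
		if !t.Stop() {
			<-t.C
		}
		return v, true
	case <-t.C:
		// The timer fired; its channel is now empty, safe to Reset later.
		return 0, false
	}
}

func main() {
	timer := time.NewTimer(time.Hour)
	if !timer.Stop() {
		<-timer.C
	}

	ch := make(chan int, 1)
	ch <- 7
	fmt.Println(waitOrTimeout(timer, ch, 10*time.Millisecond)) // 7 true
	fmt.Println(waitOrTimeout(timer, ch, 10*time.Millisecond)) // 0 false
}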
@@ -743,3 +743,75 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
 		wg.Wait()
 	}
 }
+
+func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _ := newTestCacher(backingStorage, 1000)
+	defer cacher.Stop()
+
+	// Wait until cacher is initialized.
+	cacher.ready.wait()
+
+	// Ensure there is some budget for slowing down processing.
+	cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
+
+	makePod := func(i int) *examplev1.Pod {
+		return &examplev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", 1000+i),
+				Namespace:       "ns",
+				ResourceVersion: fmt.Sprintf("%d", 1000+i),
+			},
+		}
+	}
+	if err := cacher.watchCache.Add(makePod(0)); err != nil {
+		t.Errorf("error: %v", err)
+	}
+
+	totalPods := 50
+
+	// Create a watcher that will be blocked.
+	w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
+	if err != nil {
+		t.Fatalf("Failed to create watch: %v", err)
+	}
+	defer w1.Stop()
+
+	// Create a fast watcher and ensure it will get all objects.
+	w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
+	if err != nil {
+		t.Fatalf("Failed to create watch: %v", err)
+	}
+	defer w2.Stop()
+
+	// Now push a ton of objects into the cache.
+	for i := 1; i < totalPods; i++ {
+		cacher.watchCache.Add(makePod(i))
+	}
+
+	shouldContinue := true
+	eventsCount := 0
+	for shouldContinue {
+		select {
+		case event, ok := <-w2.ResultChan():
+			if !ok {
+				shouldContinue = false
+				break
+			}
+			// Ensure there is some budget for the fast watcher after the slower one is blocked.
+			cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
+			if event.Type == watch.Added {
+				eventsCount++
+				if eventsCount == totalPods {
+					shouldContinue = false
+				}
+			}
+		case <-time.After(2 * time.Second):
+			shouldContinue = false
+			w2.Stop()
+		}
+	}
+	if eventsCount != totalPods {
+		t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount)
+	}
+}
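The test keeps topping up cacher.dispatchTimeoutBudget via returnUnused so the blocked watcher w1 cannot drain the shared budget that w2's events need. Roughly, takeAvailable hands out whatever budget has accumulated (without blocking for more) and returnUnused gives back what a dispatch did not spend, capped at a maximum. A hedged sketch of such a budget type follows; the fields, cap, and seeding are assumptions for illustration, not the package's actual timeBudget implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// timeBudget is an illustrative shared timeout budget: callers take
// whatever is available (never blocking for more) and return the portion
// they did not use, so other callers can spend it.
type timeBudget struct {
	mu        sync.Mutex
	budget    time.Duration
	maxBudget time.Duration
}

// takeAvailable hands out the entire accumulated budget at once.
func (b *timeBudget) takeAvailable() time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()
	d := b.budget
	b.budget = 0
	return d
}

// returnUnused puts unspent time back, capped at maxBudget.
func (b *timeBudget) returnUnused(unused time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if unused > 0 {
		b.budget += unused
		if b.budget > b.maxBudget {
			b.budget = b.maxBudget
		}
	}
}

func main() {
	b := &timeBudget{maxBudget: 100 * time.Millisecond}
	b.returnUnused(50 * time.Millisecond) // seed the budget, as the test does
	t := b.takeAvailable()
	fmt.Println(t) // 50ms available for this dispatch
	b.returnUnused(t - 30*time.Millisecond) // spent ~30ms, return the rest
	fmt.Println(b.takeAvailable()) // 20ms left for the next dispatch
}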