mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Proper fix for non-receiving watchers
This commit is contained in:
parent
129f5d8b95
commit
01699ef320
@ -18,6 +18,7 @@ go_library(
|
||||
"errors.go",
|
||||
"interfaces.go",
|
||||
"selection_predicate.go",
|
||||
"time_budget.go",
|
||||
"util.go",
|
||||
"watch_cache.go",
|
||||
],
|
||||
@ -50,6 +51,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"selection_predicate_test.go",
|
||||
"time_budget_test.go",
|
||||
"util_test.go",
|
||||
"watch_cache_test.go",
|
||||
],
|
||||
|
@ -177,6 +177,10 @@ type Cacher struct {
|
||||
watcherIdx int
|
||||
watchers indexedWatchers
|
||||
|
||||
// Defines a time budget that can be spend on waiting for not-ready watchers
|
||||
// while dispatching event before shutting them down.
|
||||
dispatchTimeoutBudget *timeBudget
|
||||
|
||||
// Handling graceful termination.
|
||||
stopLock sync.RWMutex
|
||||
stopped bool
|
||||
@ -199,6 +203,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||
}
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
cacher := &Cacher{
|
||||
ready: newReady(),
|
||||
storage: config.Storage,
|
||||
@ -213,18 +218,18 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||
valueWatchers: make(map[string]watchersMap),
|
||||
},
|
||||
// TODO: Figure out the correct value for the buffer size.
|
||||
incoming: make(chan watchCacheEvent, 100),
|
||||
incoming: make(chan watchCacheEvent, 100),
|
||||
dispatchTimeoutBudget: newTimeBudget(stopCh),
|
||||
// We need to (potentially) stop both:
|
||||
// - wait.Until go-routine
|
||||
// - reflector.ListAndWatch
|
||||
// and there are no guarantees on the order that they will stop.
|
||||
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
|
||||
stopCh: make(chan struct{}),
|
||||
stopCh: stopCh,
|
||||
}
|
||||
watchCache.SetOnEvent(cacher.processEvent)
|
||||
go cacher.dispatchEvents()
|
||||
|
||||
stopCh := cacher.stopCh
|
||||
cacher.stopWg.Add(1)
|
||||
go func() {
|
||||
defer cacher.stopWg.Done()
|
||||
@ -577,23 +582,17 @@ func (c *Cacher) dispatchEvents() {
|
||||
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||
triggerValues, supported := c.triggerValues(event)
|
||||
|
||||
// TODO: For now we assume we have a given <timeout> budget for dispatching
|
||||
// a single event. We should consider changing to the approach with:
|
||||
// - budget has upper bound at <max_timeout>
|
||||
// - we add <portion> to current timeout every second
|
||||
timeout := time.Duration(250) * time.Millisecond
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// Iterate over "allWatchers" no matter what the trigger function is.
|
||||
for _, watcher := range c.watchers.allWatchers {
|
||||
watcher.add(event, &timeout)
|
||||
watcher.add(event, c.dispatchTimeoutBudget)
|
||||
}
|
||||
if supported {
|
||||
// Iterate over watchers interested in the given values of the trigger.
|
||||
for _, triggerValue := range triggerValues {
|
||||
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
|
||||
watcher.add(event, &timeout)
|
||||
watcher.add(event, c.dispatchTimeoutBudget)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -606,7 +605,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||
// Iterate over watchers interested in exact values for all values.
|
||||
for _, watchers := range c.watchers.valueWatchers {
|
||||
for _, watcher := range watchers {
|
||||
watcher.add(event, &timeout)
|
||||
watcher.add(event, c.dispatchTimeoutBudget)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -790,7 +789,7 @@ func (c *cacheWatcher) stop() {
|
||||
|
||||
var timerPool sync.Pool
|
||||
|
||||
func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
|
||||
func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
|
||||
// Try to send the event immediately, without blocking.
|
||||
select {
|
||||
case c.input <- *event:
|
||||
@ -802,12 +801,13 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
|
||||
// cacheWatcher.add is called very often, so arrange
|
||||
// to reuse timers instead of constantly allocating.
|
||||
startTime := time.Now()
|
||||
timeout := budget.takeAvailable()
|
||||
|
||||
t, ok := timerPool.Get().(*time.Timer)
|
||||
if ok {
|
||||
t.Reset(*timeout)
|
||||
t.Reset(timeout)
|
||||
} else {
|
||||
t = time.NewTimer(*timeout)
|
||||
t = time.NewTimer(timeout)
|
||||
}
|
||||
defer timerPool.Put(t)
|
||||
|
||||
@ -827,9 +827,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
|
||||
c.stop()
|
||||
}
|
||||
|
||||
if *timeout = *timeout - time.Since(startTime); *timeout < 0 {
|
||||
*timeout = 0
|
||||
}
|
||||
budget.returnUnused(timeout - time.Since(startTime))
|
||||
}
|
||||
|
||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||
|
95
pkg/storage/time_budget.go
Normal file
95
pkg/storage/time_budget.go
Normal file
@ -0,0 +1,95 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
refreshPerSecond = 50 * time.Millisecond
|
||||
maxBudget = 250 * time.Millisecond
|
||||
)
|
||||
|
||||
// timeBudget implements a budget of time that you can use and is
|
||||
// periodically being refreshed. The pattern to use it is:
|
||||
// budget := newTimeBudget(...)
|
||||
// ...
|
||||
// timeout := budget.takeAvailable()
|
||||
// // Now you can spend at most timeout on doing stuff
|
||||
// ...
|
||||
// // If you didn't use all timeout, return what you didn't use
|
||||
// budget.returnUnused(<unused part of timeout>)
|
||||
//
|
||||
// NOTE: It's not recommended to be used concurrently from multiple threads -
|
||||
// if first user takes the whole timeout, the second one will get 0 timeout
|
||||
// even though the first one may return something later.
|
||||
type timeBudget struct {
|
||||
sync.Mutex
|
||||
budget time.Duration
|
||||
|
||||
refresh time.Duration
|
||||
maxBudget time.Duration
|
||||
}
|
||||
|
||||
func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
|
||||
result := &timeBudget{
|
||||
budget: time.Duration(0),
|
||||
refresh: refreshPerSecond,
|
||||
maxBudget: maxBudget,
|
||||
}
|
||||
go result.periodicallyRefresh(stopCh)
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
t.Lock()
|
||||
if t.budget = t.budget + t.refresh; t.budget > t.maxBudget {
|
||||
t.budget = t.maxBudget
|
||||
}
|
||||
t.Unlock()
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *timeBudget) takeAvailable() time.Duration {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
result := t.budget
|
||||
t.budget = time.Duration(0)
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *timeBudget) returnUnused(unused time.Duration) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
if unused < 0 {
|
||||
// We used more than allowed.
|
||||
return
|
||||
}
|
||||
if t.budget = t.budget + unused; t.budget > t.maxBudget {
|
||||
t.budget = t.maxBudget
|
||||
}
|
||||
}
|
53
pkg/storage/time_budget_test.go
Normal file
53
pkg/storage/time_budget_test.go
Normal file
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTimeBudget(t *testing.T) {
|
||||
budget := &timeBudget{
|
||||
budget: time.Duration(0),
|
||||
maxBudget: time.Duration(200),
|
||||
}
|
||||
if res := budget.takeAvailable(); res != time.Duration(0) {
|
||||
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
|
||||
}
|
||||
budget.budget = time.Duration(100)
|
||||
if res := budget.takeAvailable(); res != time.Duration(100) {
|
||||
t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
|
||||
}
|
||||
if res := budget.takeAvailable(); res != time.Duration(0) {
|
||||
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
|
||||
}
|
||||
budget.returnUnused(time.Duration(50))
|
||||
if res := budget.takeAvailable(); res != time.Duration(50) {
|
||||
t.Errorf("Expected: %v, got: %v", time.Duration(50), res)
|
||||
}
|
||||
budget.budget = time.Duration(100)
|
||||
budget.returnUnused(-time.Duration(50))
|
||||
if res := budget.takeAvailable(); res != time.Duration(100) {
|
||||
t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
|
||||
}
|
||||
// test overflow.
|
||||
budget.returnUnused(time.Duration(500))
|
||||
if res := budget.takeAvailable(); res != time.Duration(200) {
|
||||
t.Errorf("Expected: %v, got: %v", time.Duration(200), res)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user