Merge pull request #111460 from wojtek-t/forget_watcher

Cacher: add support for draining watchers

commit 4f74844299
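In brief, this change threads a drainWatcher flag through the watcher teardown path: the forget callback and forgetWatcher gain a bool parameter, cacheWatcher records it in a new drainInputBuffer field, and stopLocked keeps the done channel open while draining so that events still sitting in the input buffer can be delivered before the watcher shuts down. Two new tests cover the drained and requested-but-not-drained paths.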
```diff
@@ -47,7 +47,7 @@ import (
 )
 
 var (
-	emptyFunc = func() {}
+	emptyFunc = func(bool) {}
 )
 
 const (
```
```diff
@@ -147,6 +147,10 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool
 }
 
 func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
+	// Note that we don't have to call the setDrainInputBufferLocked method on the watchers
+	// because we take advantage of the default value - stop immediately.
+	// Also, watchers that have already had their draining strategy set
+	// are no longer available (they were removed from the allWatchers and valueWatchers maps).
 	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
 		klog.Warningf("Terminating all watchers from cacher %v", objectType)
 	}
```
```diff
@@ -183,6 +187,10 @@ func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *
 // adds a watcher to the bucket, if the deadline is before the start, it will be
 // added to the first one.
 func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
+	// Note that the returned time can be before t.createTime,
+	// especially in cases when the nextBookmarkTime method
+	// gives us the zero value of type Time,
+	// so bucketID can hold a negative value.
 	nextTime, ok := w.nextBookmarkTime(t.clock.Now(), t.bookmarkFrequency)
 	if !ok {
 		return false
```
```diff
@@ -521,7 +529,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	c.Lock()
 	defer c.Unlock()
 	// Update watcher.forget function once we can compute it.
-	watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
+	watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
 	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
 
 	// Add it to the queue only when the client supports watch bookmarks.
```
```diff
@@ -1032,11 +1040,13 @@ func (c *Cacher) Stop() {
 	c.stopWg.Wait()
 }
 
-func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func() {
-	return func() {
+func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, triggerSupported bool) func(bool) {
+	return func(drainWatcher bool) {
 		c.Lock()
 		defer c.Unlock()
 
+		w.setDrainInputBufferLocked(drainWatcher)
+
 		// It's possible that the watcher is already not in the structure (e.g. in case of
 		// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
 		// on a watcher multiple times.
```
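The hunk above shows the heart of the API change: the forget closure now captures the watcher itself so it can record the drain decision before deregistering it. Below is a minimal, self-contained sketch of that pattern, not the Kubernetes implementation; the registry and watcher types and their fields are hypothetical stand-ins for Cacher and cacheWatcher.

```go
package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	id     int
	drain  bool       // mirrors drainInputBuffer: flush pending events before closing
	forget func(bool) // set by the registry once the watcher is registered
}

type registry struct {
	mu       sync.Mutex
	watchers map[int]*watcher
}

// register stores the watcher and wires up its forget closure, analogous to
// Cacher.Watch assigning watcher.forget = forgetWatcher(c, watcher, ...).
func (r *registry) register(w *watcher) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.watchers[w.id] = w
	w.forget = func(drainWatcher bool) {
		r.mu.Lock()
		defer r.mu.Unlock()
		w.drain = drainWatcher // analogous to setDrainInputBufferLocked
		delete(r.watchers, w.id)
	}
}

func main() {
	r := &registry{watchers: map[int]*watcher{}}
	w := &watcher{id: 1}
	r.register(w)
	w.forget(true)                         // deregister and request draining
	fmt.Println(w.drain, len(r.watchers))  // true 0
}
```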
```diff
@@ -1160,7 +1170,7 @@ type cacheWatcher struct {
 	done      chan struct{}
 	filter    filterWithAttrsFunc
 	stopped   bool
-	forget    func()
+	forget    func(bool)
 	versioner storage.Versioner
 	// The watcher will be closed by server after the deadline,
 	// save it here to send bookmark events before that.
```
```diff
@@ -1172,9 +1182,13 @@ type cacheWatcher struct {
 	// human readable identifier that helps assigning cacheWatcher
 	// instance with request
 	identifier string
+
+	// drainInputBuffer indicates whether we should delay closing this watcher
+	// and send all events in the input buffer.
+	drainInputBuffer bool
 }
 
-func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
+func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
 	return &cacheWatcher{
 		input:  make(chan *watchCacheEvent, chanSize),
 		result: make(chan watch.Event, chanSize),
```
```diff
@@ -1197,16 +1211,29 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {
 
 // Implements watch.Interface.
 func (c *cacheWatcher) Stop() {
-	c.forget()
+	c.forget(false)
 }
 
 // we rely on the fact that stopLocked is actually protected by Cacher.Lock()
 func (c *cacheWatcher) stopLocked() {
 	if !c.stopped {
 		c.stopped = true
-		close(c.done)
+		// stopping without draining the input channel was requested.
+		if !c.drainInputBuffer {
+			close(c.done)
+		}
 		close(c.input)
 	}
+
+	// Even if the watcher was already stopped, if it previously was
+	// using draining mode and it's not using it now, we need to
+	// close the done channel now. Otherwise we could leak the
+	// processing goroutine if it tries to put more objects into
+	// the result channel when the channel is full and there is
+	// already no one processing the events on the receiving end.
+	if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
+		close(c.done)
+	}
 }
 
 func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
```
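The hunk above encodes the drain-vs-stop contract: close(input) always ends the producer side, while done is closed immediately only when draining is off; in drain mode the consumer finishes the buffered events and exits naturally once input is closed. Here is a standalone sketch of that two-channel pattern; the channel names echo cacheWatcher's input/done/result, but the types and logic are simplified assumptions, not the real implementation.

```go
package main

import "fmt"

// process mimics the consumer goroutine: it forwards buffered events until
// input is closed (drain mode) or done is closed (immediate stop).
func process(input <-chan int, done <-chan struct{}, out chan<- int) {
	defer close(out)
	for ev := range input { // exits once input is closed and fully drained
		select {
		case out <- ev:
		case <-done: // abort immediately if the watcher was hard-stopped
			return
		}
	}
}

func main() {
	input := make(chan int, 3)
	done := make(chan struct{})
	out := make(chan int, 3)
	go process(input, done, out)

	input <- 1
	input <- 2
	close(input) // drain mode: leave done open so buffered events flush

	for ev := range out {
		fmt.Println("delivered:", ev)
	}
}
```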
```diff
@@ -1231,7 +1258,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 		// we simply terminate it.
 		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result))
 		metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
-		c.forget()
+		c.forget(false)
 	}
 
 	if timer == nil {
```
```diff
@@ -1277,6 +1304,22 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
 	return heartbeatTime, true
 }
 
+// setDrainInputBufferLocked, when called with true, indicates that we should delay
+// closing this watcher until we send all events residing in the input buffer.
+func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
+	c.drainInputBuffer = drain
+}
+
+// isDoneChannelClosedLocked checks if the c.done channel is closed
+func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
+	select {
+	case <-c.done:
+		return true
+	default:
+	}
+	return false
+}
+
 func getMutableObject(object runtime.Object) runtime.Object {
 	if _, ok := object.(*cachingObject); ok {
 		// It is safe to return without deep-copy, because the underlying
```
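isDoneChannelClosedLocked relies on a standard Go idiom: a non-blocking receive on a channel that nothing ever sends on, so a successful receive can only mean the channel was closed. A minimal illustration of the idiom in isolation:

```go
package main

import "fmt"

// isClosed reports whether ch is closed. This is only valid for channels
// that are never sent to, such as done/quit signal channels.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true // receive succeeds only because ch is closed
	default:
		return false // would block, so ch is still open
	}
}

func main() {
	done := make(chan struct{})
	fmt.Println(isClosed(done)) // false
	close(done)
	fmt.Println(isClosed(done)) // true
}
```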
The remaining hunks update the cacher watcher tests to match the new forget(bool) signature and to exercise draining:

```diff
@@ -58,13 +58,14 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	var w *cacheWatcher
 	count := 0
 	filter := func(string, labels.Set, fields.Set) bool { return true }
-	forget := func() {
+	forget := func(drainWatcher bool) {
 		lock.Lock()
 		defer lock.Unlock()
 		count++
 		// forget() has to stop the watcher, as only stopping the watcher
 		// triggers stopping the process() goroutine which we are in the
 		// end waiting for in this test.
+		w.setDrainInputBufferLocked(drainWatcher)
 		w.stopLocked()
 	}
 	initEvents := []*watchCacheEvent{
```
```diff
@@ -89,7 +90,7 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
 	filter := func(_ string, _ labels.Set, field fields.Set) bool {
 		return field["spec.nodeName"] == "host"
 	}
-	forget := func() {}
+	forget := func(bool) {}
 
 	testCases := []struct {
 		events []*watchCacheEvent
```
```diff
@@ -210,6 +211,7 @@ TestCase:
 			break TestCase
 		default:
 		}
+		w.setDrainInputBufferLocked(false)
 		w.stopLocked()
 	}
 }
```
```diff
@@ -524,7 +526,8 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 	var w *cacheWatcher
 	done := make(chan struct{})
 	filter := func(string, labels.Set, fields.Set) bool { return true }
-	forget := func() {
+	forget := func(drainWatcher bool) {
+		w.setDrainInputBufferLocked(drainWatcher)
 		w.stopLocked()
 		done <- struct{}{}
 	}
```
```diff
@@ -556,6 +559,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 		case <-time.After(time.Second):
 			t.Fatal("expected to receive an event on ResultChan")
 		}
+		w.setDrainInputBufferLocked(false)
 		w.stopLocked()
 	}
 }
```
```diff
@@ -667,7 +671,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
 	filter := func(_ string, _ labels.Set, _ fields.Set) bool {
 		return true
 	}
-	forget := func() {}
+	forget := func(bool) {}
 
 	newWatcher := func(deadline time.Time) *cacheWatcher {
 		return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "")
```
```diff
@@ -1581,3 +1585,90 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
 		t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received)
 	}
 }
+
+func makeWatchCacheEvent(rv uint64) *watchCacheEvent {
+	return &watchCacheEvent{
+		Type: watch.Added,
+		Object: &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", rv),
+				ResourceVersion: fmt.Sprintf("%d", rv),
+			},
+		},
+		ResourceVersion: rv,
+	}
+}
+
+// TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
+func TestCacheWatcherDraining(t *testing.T) {
+	var lock sync.RWMutex
+	var w *cacheWatcher
+	count := 0
+	filter := func(string, labels.Set, fields.Set) bool { return true }
+	forget := func(drainWatcher bool) {
+		lock.Lock()
+		defer lock.Unlock()
+		count++
+		w.setDrainInputBufferLocked(drainWatcher)
+		w.stopLocked()
+	}
+	initEvents := []*watchCacheEvent{
+		makeWatchCacheEvent(5),
+		makeWatchCacheEvent(6),
+	}
+	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
+	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
+	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
+		t.Fatal("failed adding an event to the watcher")
+	}
+	forget(true) // drain the watcher
+
+	eventCount := 0
+	for range w.ResultChan() {
+		eventCount++
+	}
+	if eventCount != 3 {
+		t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount)
+	}
+	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
+		lock.RLock()
+		defer lock.RUnlock()
+		return count == 2, nil
+	}); err != nil {
+		t.Fatalf("expected forget() to be called twice, because processInterval should call Stop(): %v", err)
+	}
+}
+
+// TestCacheWatcherDrainingRequestedButNotDrained verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
+// but the client never actually gets any data
+func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
+	var lock sync.RWMutex
+	var w *cacheWatcher
+	count := 0
+	filter := func(string, labels.Set, fields.Set) bool { return true }
+	forget := func(drainWatcher bool) {
+		lock.Lock()
+		defer lock.Unlock()
+		count++
+		w.setDrainInputBufferLocked(drainWatcher)
+		w.stopLocked()
+	}
+	initEvents := []*watchCacheEvent{
+		makeWatchCacheEvent(5),
+		makeWatchCacheEvent(6),
+	}
+	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
+	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
+	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
+		t.Fatal("failed adding an event to the watcher")
+	}
+	forget(true) // drain the watcher
+	w.Stop()     // client disconnected, timeout expired or ctx was actually closed
+	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
+		lock.RLock()
+		defer lock.RUnlock()
+		return count == 3, nil
+	}); err != nil {
+		t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err)
+	}
+}
```
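A note on the expected counts in the tests above: TestCacheWatcherDraining expects three events (the two initial events plus the one added afterwards) because draining keeps the result channel open until the input buffer is flushed, and, as its failure message states, expects forget to be called twice: once by the explicit forget(true) and once when processInterval calls Stop() after exhausting the input. The second test issues an additional explicit w.Stop() after requesting draining, which accounts for the third call it waits for.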