mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #116037 from wojtek-t/move_cache_watcher
Split cacheWatcher into its own file
This commit is contained in:
commit
e8662a46dd
370
staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
Normal file
370
staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
Normal file
@ -0,0 +1,370 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2023 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cacher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
|
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// cacheWatcher implements watch.Interface
|
||||||
|
// this is not thread-safe
|
||||||
|
type cacheWatcher struct {
|
||||||
|
input chan *watchCacheEvent
|
||||||
|
result chan watch.Event
|
||||||
|
done chan struct{}
|
||||||
|
filter filterWithAttrsFunc
|
||||||
|
stopped bool
|
||||||
|
forget func(bool)
|
||||||
|
versioner storage.Versioner
|
||||||
|
// The watcher will be closed by server after the deadline,
|
||||||
|
// save it here to send bookmark events before that.
|
||||||
|
deadline time.Time
|
||||||
|
allowWatchBookmarks bool
|
||||||
|
groupResource schema.GroupResource
|
||||||
|
|
||||||
|
// human readable identifier that helps assigning cacheWatcher
|
||||||
|
// instance with request
|
||||||
|
identifier string
|
||||||
|
|
||||||
|
// drainInputBuffer indicates whether we should delay closing this watcher
|
||||||
|
// and send all event in the input buffer.
|
||||||
|
drainInputBuffer bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCacheWatcher(
|
||||||
|
chanSize int,
|
||||||
|
filter filterWithAttrsFunc,
|
||||||
|
forget func(bool),
|
||||||
|
versioner storage.Versioner,
|
||||||
|
deadline time.Time,
|
||||||
|
allowWatchBookmarks bool,
|
||||||
|
groupResource schema.GroupResource,
|
||||||
|
identifier string,
|
||||||
|
) *cacheWatcher {
|
||||||
|
return &cacheWatcher{
|
||||||
|
input: make(chan *watchCacheEvent, chanSize),
|
||||||
|
result: make(chan watch.Event, chanSize),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
filter: filter,
|
||||||
|
stopped: false,
|
||||||
|
forget: forget,
|
||||||
|
versioner: versioner,
|
||||||
|
deadline: deadline,
|
||||||
|
allowWatchBookmarks: allowWatchBookmarks,
|
||||||
|
groupResource: groupResource,
|
||||||
|
identifier: identifier,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements watch.Interface.
|
||||||
|
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
|
||||||
|
return c.result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements watch.Interface.
|
||||||
|
func (c *cacheWatcher) Stop() {
|
||||||
|
c.forget(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
|
||||||
|
func (c *cacheWatcher) stopLocked() {
|
||||||
|
if !c.stopped {
|
||||||
|
c.stopped = true
|
||||||
|
// stop without draining the input channel was requested.
|
||||||
|
if !c.drainInputBuffer {
|
||||||
|
close(c.done)
|
||||||
|
}
|
||||||
|
close(c.input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Even if the watcher was already stopped, if it previously was
|
||||||
|
// using draining mode and it's not using it now we need to
|
||||||
|
// close the done channel now. Otherwise we could leak the
|
||||||
|
// processing goroutine if it will be trying to put more objects
|
||||||
|
// into result channel, the channel will be full and there will
|
||||||
|
// already be noone on the processing the events on the receiving end.
|
||||||
|
if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
|
||||||
|
close(c.done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
|
||||||
|
select {
|
||||||
|
case c.input <- event:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
|
||||||
|
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
|
||||||
|
// Try to send the event immediately, without blocking.
|
||||||
|
if c.nonblockingAdd(event) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
closeFunc := func() {
|
||||||
|
// This means that we couldn't send event to that watcher.
|
||||||
|
// Since we don't want to block on it infinitely,
|
||||||
|
// we simply terminate it.
|
||||||
|
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
|
||||||
|
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
|
||||||
|
c.forget(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
if timer == nil {
|
||||||
|
closeFunc()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// OK, block sending, but only until timer fires.
|
||||||
|
select {
|
||||||
|
case c.input <- event:
|
||||||
|
return true
|
||||||
|
case <-timer.C:
|
||||||
|
closeFunc()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
|
||||||
|
// We try to send bookmarks:
|
||||||
|
//
|
||||||
|
// (a) right before the watcher timeout - for now we simply set it 2s before
|
||||||
|
// the deadline
|
||||||
|
//
|
||||||
|
// (b) roughly every minute
|
||||||
|
//
|
||||||
|
// (b) gives us periodicity if the watch breaks due to unexpected
|
||||||
|
// conditions, (a) ensures that on timeout the watcher is as close to
|
||||||
|
// now as possible - this covers 99% of cases.
|
||||||
|
|
||||||
|
heartbeatTime := now.Add(bookmarkFrequency)
|
||||||
|
if c.deadline.IsZero() {
|
||||||
|
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
|
||||||
|
// apiserver if properly configured. So this shoudln't happen in practice.
|
||||||
|
return heartbeatTime, true
|
||||||
|
}
|
||||||
|
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
|
||||||
|
heartbeatTime = pretimeoutTime
|
||||||
|
}
|
||||||
|
|
||||||
|
if heartbeatTime.Before(now) {
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
return heartbeatTime, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
|
||||||
|
// until we send all events residing in the input buffer.
|
||||||
|
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
|
||||||
|
c.drainInputBuffer = drain
|
||||||
|
}
|
||||||
|
|
||||||
|
// isDoneChannelClosed checks if c.done channel is closed
|
||||||
|
func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
|
||||||
|
select {
|
||||||
|
case <-c.done:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMutableObject(object runtime.Object) runtime.Object {
|
||||||
|
if _, ok := object.(*cachingObject); ok {
|
||||||
|
// It is safe to return without deep-copy, because the underlying
|
||||||
|
// object will lazily perform deep-copy on the first try to change
|
||||||
|
// any of its fields.
|
||||||
|
return object
|
||||||
|
}
|
||||||
|
return object.DeepCopyObject()
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
||||||
|
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
|
||||||
|
if event.Type == watch.Bookmark {
|
||||||
|
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
|
||||||
|
}
|
||||||
|
|
||||||
|
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
|
||||||
|
oldObjPasses := false
|
||||||
|
if event.PrevObject != nil {
|
||||||
|
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
|
||||||
|
}
|
||||||
|
if !curObjPasses && !oldObjPasses {
|
||||||
|
// Watcher is not interested in that object.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case curObjPasses && !oldObjPasses:
|
||||||
|
return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
|
||||||
|
case curObjPasses && oldObjPasses:
|
||||||
|
return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
|
||||||
|
case !curObjPasses && oldObjPasses:
|
||||||
|
// return a delete event with the previous object content, but with the event's resource version
|
||||||
|
oldObj := getMutableObject(event.PrevObject)
|
||||||
|
// We know that if oldObj is cachingObject (which can only be set via
|
||||||
|
// setCachingObjects), its resourceVersion is already set correctly and
|
||||||
|
// we don't need to update it. However, since cachingObject efficiently
|
||||||
|
// handles noop updates, we avoid this microoptimization here.
|
||||||
|
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
|
||||||
|
return &watch.Event{Type: watch.Deleted, Object: oldObj}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||||
|
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
||||||
|
watchEvent := c.convertToWatchEvent(event)
|
||||||
|
if watchEvent == nil {
|
||||||
|
// Watcher is not interested in that object.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We need to ensure that if we put event X to the c.result, all
|
||||||
|
// previous events were already put into it before, no matter whether
|
||||||
|
// c.done is close or not.
|
||||||
|
// Thus we cannot simply select from c.done and c.result and this
|
||||||
|
// would give us non-determinism.
|
||||||
|
// At the same time, we don't want to block infinitely on putting
|
||||||
|
// to c.result, when c.done is already closed.
|
||||||
|
//
|
||||||
|
// This ensures that with c.done already close, we at most once go
|
||||||
|
// into the next select after this. With that, no matter which
|
||||||
|
// statement we choose there, we will deliver only consecutive
|
||||||
|
// events.
|
||||||
|
select {
|
||||||
|
case <-c.done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.result <- *watchEvent:
|
||||||
|
case <-c.done:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
|
||||||
|
defer utilruntime.HandleCrash()
|
||||||
|
defer close(c.result)
|
||||||
|
defer c.Stop()
|
||||||
|
|
||||||
|
// Check how long we are processing initEvents.
|
||||||
|
// As long as these are not processed, we are not processing
|
||||||
|
// any incoming events, so if it takes long, we may actually
|
||||||
|
// block all watchers for some time.
|
||||||
|
// TODO: From the logs it seems that there happens processing
|
||||||
|
// times even up to 1s which is very long. However, this doesn't
|
||||||
|
// depend that much on the number of initEvents. E.g. from the
|
||||||
|
// 2000-node Kubemark run we have logs like this, e.g.:
|
||||||
|
// ... processing 13862 initEvents took 66.808689ms
|
||||||
|
// ... processing 14040 initEvents took 993.532539ms
|
||||||
|
// We should understand what is blocking us in those cases (e.g.
|
||||||
|
// is it lack of CPU, network, or sth else) and potentially
|
||||||
|
// consider increase size of result buffer in those cases.
|
||||||
|
const initProcessThreshold = 500 * time.Millisecond
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
|
initEventCount := 0
|
||||||
|
for {
|
||||||
|
event, err := cacheInterval.Next()
|
||||||
|
if err != nil {
|
||||||
|
// An error indicates that the cache interval
|
||||||
|
// has been invalidated and can no longer serve
|
||||||
|
// events.
|
||||||
|
//
|
||||||
|
// Initially we considered sending an "out-of-history"
|
||||||
|
// Error event in this case, but because historically
|
||||||
|
// such events weren't sent out of the watchCache, we
|
||||||
|
// decided not to. This is still ok, because on watch
|
||||||
|
// closure, the watcher will try to re-instantiate the
|
||||||
|
// watch and then will get an explicit "out-of-history"
|
||||||
|
// window. There is potential for optimization, but for
|
||||||
|
// now, in order to be on the safe side and not break
|
||||||
|
// custom clients, the cost of it is something that we
|
||||||
|
// are fully accepting.
|
||||||
|
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if event == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
c.sendWatchCacheEvent(event)
|
||||||
|
// With some events already sent, update resourceVersion so that
|
||||||
|
// events that were buffered and not yet processed won't be delivered
|
||||||
|
// to this watcher second time causing going back in time.
|
||||||
|
resourceVersion = event.ResourceVersion
|
||||||
|
initEventCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
if initEventCount > 0 {
|
||||||
|
metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
|
||||||
|
}
|
||||||
|
processingTime := time.Since(startTime)
|
||||||
|
if processingTime > initProcessThreshold {
|
||||||
|
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.process(ctx, resourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
|
||||||
|
// At this point we already start processing incoming watch events.
|
||||||
|
// However, the init event can still be processed because their serialization
|
||||||
|
// and sending to the client happens asynchrnously.
|
||||||
|
// TODO: As describe in the KEP, we would like to estimate that by delaying
|
||||||
|
// the initialization signal proportionally to the number of events to
|
||||||
|
// process, but we're leaving this to the tuning phase.
|
||||||
|
utilflowcontrol.WatchInitialized(ctx)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-c.input:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// only send events newer than resourceVersion
|
||||||
|
if event.ResourceVersion > resourceVersion {
|
||||||
|
c.sendWatchCacheEvent(event)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,414 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2023 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cacher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
testingclock "k8s.io/utils/clock/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
||||||
|
// the writes to cacheWatcher.result channel is blocked.
|
||||||
|
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||||
|
var lock sync.RWMutex
|
||||||
|
var w *cacheWatcher
|
||||||
|
count := 0
|
||||||
|
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||||
|
forget := func(drainWatcher bool) {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
count++
|
||||||
|
// forget() has to stop the watcher, as only stopping the watcher
|
||||||
|
// triggers stopping the process() goroutine which we are in the
|
||||||
|
// end waiting for in this test.
|
||||||
|
w.setDrainInputBufferLocked(drainWatcher)
|
||||||
|
w.stopLocked()
|
||||||
|
}
|
||||||
|
initEvents := []*watchCacheEvent{
|
||||||
|
{Object: &v1.Pod{}},
|
||||||
|
{Object: &v1.Pod{}},
|
||||||
|
}
|
||||||
|
// set the size of the buffer of w.result to 0, so that the writes to
|
||||||
|
// w.result is blocked.
|
||||||
|
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
|
||||||
|
w.Stop()
|
||||||
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
|
lock.RLock()
|
||||||
|
defer lock.RUnlock()
|
||||||
|
return count == 2, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheWatcherHandlesFiltering(t *testing.T) {
|
||||||
|
filter := func(_ string, _ labels.Set, field fields.Set) bool {
|
||||||
|
return field["spec.nodeName"] == "host"
|
||||||
|
}
|
||||||
|
forget := func(bool) {}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
events []*watchCacheEvent
|
||||||
|
expected []watch.Event
|
||||||
|
}{
|
||||||
|
// properly handle starting with the filter, then being deleted, then re-added
|
||||||
|
{
|
||||||
|
events: []*watchCacheEvent{
|
||||||
|
{
|
||||||
|
Type: watch.Added,
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
ResourceVersion: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
ResourceVersion: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
ResourceVersion: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: []watch.Event{
|
||||||
|
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
|
||||||
|
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
|
||||||
|
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// properly handle ignoring changes prior to the filter, then getting added, then deleted
|
||||||
|
{
|
||||||
|
events: []*watchCacheEvent{
|
||||||
|
{
|
||||||
|
Type: watch.Added,
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
ResourceVersion: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
ResourceVersion: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
ResourceVersion: 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
ResourceVersion: 4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
ResourceVersion: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: watch.Modified,
|
||||||
|
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
|
||||||
|
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
|
||||||
|
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||||
|
ResourceVersion: 6,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: []watch.Event{
|
||||||
|
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
|
||||||
|
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
|
||||||
|
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
TestCase:
|
||||||
|
for i, testCase := range testCases {
|
||||||
|
// set the size of the buffer of w.result to 0, so that the writes to
|
||||||
|
// w.result is blocked.
|
||||||
|
for j := range testCase.events {
|
||||||
|
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
|
||||||
|
|
||||||
|
ch := w.ResultChan()
|
||||||
|
for j, event := range testCase.expected {
|
||||||
|
e := <-ch
|
||||||
|
if !reflect.DeepEqual(event, e) {
|
||||||
|
t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
|
||||||
|
break TestCase
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case obj, ok := <-ch:
|
||||||
|
t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
|
||||||
|
break TestCase
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
w.setDrainInputBufferLocked(false)
|
||||||
|
w.stopLocked()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
||||||
|
var w *cacheWatcher
|
||||||
|
done := make(chan struct{})
|
||||||
|
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||||
|
forget := func(drainWatcher bool) {
|
||||||
|
w.setDrainInputBufferLocked(drainWatcher)
|
||||||
|
w.stopLocked()
|
||||||
|
done <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
maxRetriesToProduceTheRaceCondition := 1000
|
||||||
|
// Simulating the timer is fired and stopped concurrently by set time
|
||||||
|
// timeout to zero and run the Stop goroutine concurrently.
|
||||||
|
// May sure that the watch will not be blocked on Stop.
|
||||||
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
|
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
go w.Stop()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("stop is blocked when the timer is fired concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
deadline := time.Now().Add(time.Hour)
|
||||||
|
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
||||||
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
|
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
||||||
|
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
||||||
|
defer cancel()
|
||||||
|
go w.processInterval(ctx, intervalFromEvents(nil), 0)
|
||||||
|
select {
|
||||||
|
case <-w.ResultChan():
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("expected received a event on ResultChan")
|
||||||
|
}
|
||||||
|
w.setDrainInputBufferLocked(false)
|
||||||
|
w.stopLocked()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
// Wait until cacher is initialized.
|
||||||
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create watch: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
watchClosed := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(watchClosed)
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified, watch.Deleted:
|
||||||
|
// ok
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected event %#v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
cacher.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-watchClosed:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("timed out waiting for watch to close")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTimeBucketWatchersBasic(t *testing.T) {
|
||||||
|
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
forget := func(bool) {}
|
||||||
|
|
||||||
|
newWatcher := func(deadline time.Time) *cacheWatcher {
|
||||||
|
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
clock := testingclock.NewFakeClock(time.Now())
|
||||||
|
watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
|
||||||
|
now := clock.Now()
|
||||||
|
watchers.addWatcher(newWatcher(now.Add(10 * time.Second)))
|
||||||
|
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
|
||||||
|
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
|
||||||
|
|
||||||
|
if len(watchers.watchersBuckets) != 2 {
|
||||||
|
t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets)
|
||||||
|
}
|
||||||
|
watchers0 := watchers.popExpiredWatchers()
|
||||||
|
if len(watchers0) != 0 {
|
||||||
|
t.Errorf("unexpected bucket size: %#v", watchers0)
|
||||||
|
}
|
||||||
|
|
||||||
|
clock.Step(10 * time.Second)
|
||||||
|
watchers1 := watchers.popExpiredWatchers()
|
||||||
|
if len(watchers1) != 1 || len(watchers1[0]) != 1 {
|
||||||
|
t.Errorf("unexpected bucket size: %v", watchers1)
|
||||||
|
}
|
||||||
|
watchers1 = watchers.popExpiredWatchers()
|
||||||
|
if len(watchers1) != 0 {
|
||||||
|
t.Errorf("unexpected bucket size: %#v", watchers1)
|
||||||
|
}
|
||||||
|
|
||||||
|
clock.Step(12 * time.Second)
|
||||||
|
watchers2 := watchers.popExpiredWatchers()
|
||||||
|
if len(watchers2) != 1 || len(watchers2[0]) != 2 {
|
||||||
|
t.Errorf("unexpected bucket size: %#v", watchers2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeWatchCacheEvent(rv uint64) *watchCacheEvent {
|
||||||
|
return &watchCacheEvent{
|
||||||
|
Type: watch.Added,
|
||||||
|
Object: &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("pod-%d", rv),
|
||||||
|
ResourceVersion: fmt.Sprintf("%d", rv),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ResourceVersion: rv,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
|
||||||
|
func TestCacheWatcherDraining(t *testing.T) {
|
||||||
|
var lock sync.RWMutex
|
||||||
|
var w *cacheWatcher
|
||||||
|
count := 0
|
||||||
|
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||||
|
forget := func(drainWatcher bool) {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
count++
|
||||||
|
w.setDrainInputBufferLocked(drainWatcher)
|
||||||
|
w.stopLocked()
|
||||||
|
}
|
||||||
|
initEvents := []*watchCacheEvent{
|
||||||
|
makeWatchCacheEvent(5),
|
||||||
|
makeWatchCacheEvent(6),
|
||||||
|
}
|
||||||
|
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
||||||
|
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
||||||
|
t.Fatal("failed adding an even to the watcher")
|
||||||
|
}
|
||||||
|
forget(true) // drain the watcher
|
||||||
|
|
||||||
|
eventCount := 0
|
||||||
|
for range w.ResultChan() {
|
||||||
|
eventCount++
|
||||||
|
}
|
||||||
|
if eventCount != 3 {
|
||||||
|
t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount)
|
||||||
|
}
|
||||||
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
|
lock.RLock()
|
||||||
|
defer lock.RUnlock()
|
||||||
|
return count == 2, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("expected forget() to be called twice, because processInterval should call Stop(): %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCacheWatcherDrainingRequestedButNotDrained verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
|
||||||
|
// but the client never actually get any data
|
||||||
|
func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
|
||||||
|
var lock sync.RWMutex
|
||||||
|
var w *cacheWatcher
|
||||||
|
count := 0
|
||||||
|
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||||
|
forget := func(drainWatcher bool) {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
count++
|
||||||
|
w.setDrainInputBufferLocked(drainWatcher)
|
||||||
|
w.stopLocked()
|
||||||
|
}
|
||||||
|
initEvents := []*watchCacheEvent{
|
||||||
|
makeWatchCacheEvent(5),
|
||||||
|
makeWatchCacheEvent(6),
|
||||||
|
}
|
||||||
|
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
|
||||||
|
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
||||||
|
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
||||||
|
t.Fatal("failed adding an even to the watcher")
|
||||||
|
}
|
||||||
|
forget(true) // drain the watcher
|
||||||
|
w.Stop() // client disconnected, timeout expired or ctx was actually closed
|
||||||
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
|
lock.RLock()
|
||||||
|
defer lock.RUnlock()
|
||||||
|
return count == 3, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err)
|
||||||
|
}
|
||||||
|
}
|
@ -34,7 +34,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/pkg/audit"
|
||||||
@ -42,9 +41,9 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/component-base/tracing"
|
"k8s.io/component-base/tracing"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/utils/clock"
|
"k8s.io/utils/clock"
|
||||||
)
|
)
|
||||||
@ -1198,340 +1197,3 @@ func (c *errWatcher) ResultChan() <-chan watch.Event {
|
|||||||
func (c *errWatcher) Stop() {
|
func (c *errWatcher) Stop() {
|
||||||
// no-op
|
// no-op
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacheWatcher implements watch.Interface
|
|
||||||
// this is not thread-safe
|
|
||||||
type cacheWatcher struct {
|
|
||||||
input chan *watchCacheEvent
|
|
||||||
result chan watch.Event
|
|
||||||
done chan struct{}
|
|
||||||
filter filterWithAttrsFunc
|
|
||||||
stopped bool
|
|
||||||
forget func(bool)
|
|
||||||
versioner storage.Versioner
|
|
||||||
// The watcher will be closed by server after the deadline,
|
|
||||||
// save it here to send bookmark events before that.
|
|
||||||
deadline time.Time
|
|
||||||
allowWatchBookmarks bool
|
|
||||||
groupResource schema.GroupResource
|
|
||||||
|
|
||||||
// human readable identifier that helps assigning cacheWatcher
|
|
||||||
// instance with request
|
|
||||||
identifier string
|
|
||||||
|
|
||||||
// drainInputBuffer indicates whether we should delay closing this watcher
|
|
||||||
// and send all event in the input buffer.
|
|
||||||
drainInputBuffer bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCacheWatcher(
|
|
||||||
chanSize int,
|
|
||||||
filter filterWithAttrsFunc,
|
|
||||||
forget func(bool),
|
|
||||||
versioner storage.Versioner,
|
|
||||||
deadline time.Time,
|
|
||||||
allowWatchBookmarks bool,
|
|
||||||
groupResource schema.GroupResource,
|
|
||||||
identifier string,
|
|
||||||
) *cacheWatcher {
|
|
||||||
return &cacheWatcher{
|
|
||||||
input: make(chan *watchCacheEvent, chanSize),
|
|
||||||
result: make(chan watch.Event, chanSize),
|
|
||||||
done: make(chan struct{}),
|
|
||||||
filter: filter,
|
|
||||||
stopped: false,
|
|
||||||
forget: forget,
|
|
||||||
versioner: versioner,
|
|
||||||
deadline: deadline,
|
|
||||||
allowWatchBookmarks: allowWatchBookmarks,
|
|
||||||
groupResource: groupResource,
|
|
||||||
identifier: identifier,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements watch.Interface.
|
|
||||||
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
|
|
||||||
return c.result
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements watch.Interface.
|
|
||||||
func (c *cacheWatcher) Stop() {
|
|
||||||
c.forget(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
|
|
||||||
func (c *cacheWatcher) stopLocked() {
|
|
||||||
if !c.stopped {
|
|
||||||
c.stopped = true
|
|
||||||
// stop without draining the input channel was requested.
|
|
||||||
if !c.drainInputBuffer {
|
|
||||||
close(c.done)
|
|
||||||
}
|
|
||||||
close(c.input)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Even if the watcher was already stopped, if it previously was
|
|
||||||
// using draining mode and it's not using it now we need to
|
|
||||||
// close the done channel now. Otherwise we could leak the
|
|
||||||
// processing goroutine if it will be trying to put more objects
|
|
||||||
// into result channel, the channel will be full and there will
|
|
||||||
// already be noone on the processing the events on the receiving end.
|
|
||||||
if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
|
|
||||||
close(c.done)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
|
|
||||||
select {
|
|
||||||
case c.input <- event:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
|
|
||||||
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
|
|
||||||
// Try to send the event immediately, without blocking.
|
|
||||||
if c.nonblockingAdd(event) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
closeFunc := func() {
|
|
||||||
// This means that we couldn't send event to that watcher.
|
|
||||||
// Since we don't want to block on it infinitely,
|
|
||||||
// we simply terminate it.
|
|
||||||
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
|
|
||||||
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
|
|
||||||
c.forget(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
if timer == nil {
|
|
||||||
closeFunc()
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// OK, block sending, but only until timer fires.
|
|
||||||
select {
|
|
||||||
case c.input <- event:
|
|
||||||
return true
|
|
||||||
case <-timer.C:
|
|
||||||
closeFunc()
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
|
|
||||||
// We try to send bookmarks:
|
|
||||||
//
|
|
||||||
// (a) right before the watcher timeout - for now we simply set it 2s before
|
|
||||||
// the deadline
|
|
||||||
//
|
|
||||||
// (b) roughly every minute
|
|
||||||
//
|
|
||||||
// (b) gives us periodicity if the watch breaks due to unexpected
|
|
||||||
// conditions, (a) ensures that on timeout the watcher is as close to
|
|
||||||
// now as possible - this covers 99% of cases.
|
|
||||||
|
|
||||||
heartbeatTime := now.Add(bookmarkFrequency)
|
|
||||||
if c.deadline.IsZero() {
|
|
||||||
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
|
|
||||||
// apiserver if properly configured. So this shoudln't happen in practice.
|
|
||||||
return heartbeatTime, true
|
|
||||||
}
|
|
||||||
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
|
|
||||||
heartbeatTime = pretimeoutTime
|
|
||||||
}
|
|
||||||
|
|
||||||
if heartbeatTime.Before(now) {
|
|
||||||
return time.Time{}, false
|
|
||||||
}
|
|
||||||
return heartbeatTime, true
|
|
||||||
}
|
|
||||||
|
|
||||||
// setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
|
|
||||||
// until we send all events residing in the input buffer.
|
|
||||||
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
|
|
||||||
c.drainInputBuffer = drain
|
|
||||||
}
|
|
||||||
|
|
||||||
// isDoneChannelClosed checks if c.done channel is closed
|
|
||||||
func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
|
|
||||||
select {
|
|
||||||
case <-c.done:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func getMutableObject(object runtime.Object) runtime.Object {
|
|
||||||
if _, ok := object.(*cachingObject); ok {
|
|
||||||
// It is safe to return without deep-copy, because the underlying
|
|
||||||
// object will lazily perform deep-copy on the first try to change
|
|
||||||
// any of its fields.
|
|
||||||
return object
|
|
||||||
}
|
|
||||||
return object.DeepCopyObject()
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
|
||||||
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
|
|
||||||
if event.Type == watch.Bookmark {
|
|
||||||
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
|
|
||||||
}
|
|
||||||
|
|
||||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
|
|
||||||
oldObjPasses := false
|
|
||||||
if event.PrevObject != nil {
|
|
||||||
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
|
|
||||||
}
|
|
||||||
if !curObjPasses && !oldObjPasses {
|
|
||||||
// Watcher is not interested in that object.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
switch {
|
|
||||||
case curObjPasses && !oldObjPasses:
|
|
||||||
return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
|
|
||||||
case curObjPasses && oldObjPasses:
|
|
||||||
return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
|
|
||||||
case !curObjPasses && oldObjPasses:
|
|
||||||
// return a delete event with the previous object content, but with the event's resource version
|
|
||||||
oldObj := getMutableObject(event.PrevObject)
|
|
||||||
// We know that if oldObj is cachingObject (which can only be set via
|
|
||||||
// setCachingObjects), its resourceVersion is already set correctly and
|
|
||||||
// we don't need to update it. However, since cachingObject efficiently
|
|
||||||
// handles noop updates, we avoid this microoptimization here.
|
|
||||||
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
|
|
||||||
return &watch.Event{Type: watch.Deleted, Object: oldObj}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
|
||||||
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
|
||||||
watchEvent := c.convertToWatchEvent(event)
|
|
||||||
if watchEvent == nil {
|
|
||||||
// Watcher is not interested in that object.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// We need to ensure that if we put event X to the c.result, all
|
|
||||||
// previous events were already put into it before, no matter whether
|
|
||||||
// c.done is close or not.
|
|
||||||
// Thus we cannot simply select from c.done and c.result and this
|
|
||||||
// would give us non-determinism.
|
|
||||||
// At the same time, we don't want to block infinitely on putting
|
|
||||||
// to c.result, when c.done is already closed.
|
|
||||||
//
|
|
||||||
// This ensures that with c.done already close, we at most once go
|
|
||||||
// into the next select after this. With that, no matter which
|
|
||||||
// statement we choose there, we will deliver only consecutive
|
|
||||||
// events.
|
|
||||||
select {
|
|
||||||
case <-c.done:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case c.result <- *watchEvent:
|
|
||||||
case <-c.done:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
|
|
||||||
defer utilruntime.HandleCrash()
|
|
||||||
defer close(c.result)
|
|
||||||
defer c.Stop()
|
|
||||||
|
|
||||||
// Check how long we are processing initEvents.
|
|
||||||
// As long as these are not processed, we are not processing
|
|
||||||
// any incoming events, so if it takes long, we may actually
|
|
||||||
// block all watchers for some time.
|
|
||||||
// TODO: From the logs it seems that there happens processing
|
|
||||||
// times even up to 1s which is very long. However, this doesn't
|
|
||||||
// depend that much on the number of initEvents. E.g. from the
|
|
||||||
// 2000-node Kubemark run we have logs like this, e.g.:
|
|
||||||
// ... processing 13862 initEvents took 66.808689ms
|
|
||||||
// ... processing 14040 initEvents took 993.532539ms
|
|
||||||
// We should understand what is blocking us in those cases (e.g.
|
|
||||||
// is it lack of CPU, network, or sth else) and potentially
|
|
||||||
// consider increase size of result buffer in those cases.
|
|
||||||
const initProcessThreshold = 500 * time.Millisecond
|
|
||||||
startTime := time.Now()
|
|
||||||
|
|
||||||
initEventCount := 0
|
|
||||||
for {
|
|
||||||
event, err := cacheInterval.Next()
|
|
||||||
if err != nil {
|
|
||||||
// An error indicates that the cache interval
|
|
||||||
// has been invalidated and can no longer serve
|
|
||||||
// events.
|
|
||||||
//
|
|
||||||
// Initially we considered sending an "out-of-history"
|
|
||||||
// Error event in this case, but because historically
|
|
||||||
// such events weren't sent out of the watchCache, we
|
|
||||||
// decided not to. This is still ok, because on watch
|
|
||||||
// closure, the watcher will try to re-instantiate the
|
|
||||||
// watch and then will get an explicit "out-of-history"
|
|
||||||
// window. There is potential for optimization, but for
|
|
||||||
// now, in order to be on the safe side and not break
|
|
||||||
// custom clients, the cost of it is something that we
|
|
||||||
// are fully accepting.
|
|
||||||
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if event == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
c.sendWatchCacheEvent(event)
|
|
||||||
// With some events already sent, update resourceVersion so that
|
|
||||||
// events that were buffered and not yet processed won't be delivered
|
|
||||||
// to this watcher second time causing going back in time.
|
|
||||||
resourceVersion = event.ResourceVersion
|
|
||||||
initEventCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
if initEventCount > 0 {
|
|
||||||
metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
|
|
||||||
}
|
|
||||||
processingTime := time.Since(startTime)
|
|
||||||
if processingTime > initProcessThreshold {
|
|
||||||
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.process(ctx, resourceVersion)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
|
|
||||||
// At this point we already start processing incoming watch events.
|
|
||||||
// However, the init event can still be processed because their serialization
|
|
||||||
// and sending to the client happens asynchrnously.
|
|
||||||
// TODO: As describe in the KEP, we would like to estimate that by delaying
|
|
||||||
// the initialization signal proportionally to the number of events to
|
|
||||||
// process, but we're leaving this to the tuning phase.
|
|
||||||
utilflowcontrol.WatchInitialized(ctx)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event, ok := <-c.input:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// only send events newer than resourceVersion
|
|
||||||
if event.ResourceVersion > resourceVersion {
|
|
||||||
c.sendWatchCacheEvent(event)
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -36,7 +35,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
"k8s.io/apimachinery/pkg/util/diff"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
@ -45,174 +43,8 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
"k8s.io/utils/clock"
|
"k8s.io/utils/clock"
|
||||||
testingclock "k8s.io/utils/clock/testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
|
||||||
// the writes to cacheWatcher.result channel is blocked.
|
|
||||||
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|
||||||
var lock sync.RWMutex
|
|
||||||
var w *cacheWatcher
|
|
||||||
count := 0
|
|
||||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
|
||||||
forget := func(drainWatcher bool) {
|
|
||||||
lock.Lock()
|
|
||||||
defer lock.Unlock()
|
|
||||||
count++
|
|
||||||
// forget() has to stop the watcher, as only stopping the watcher
|
|
||||||
// triggers stopping the process() goroutine which we are in the
|
|
||||||
// end waiting for in this test.
|
|
||||||
w.setDrainInputBufferLocked(drainWatcher)
|
|
||||||
w.stopLocked()
|
|
||||||
}
|
|
||||||
initEvents := []*watchCacheEvent{
|
|
||||||
{Object: &v1.Pod{}},
|
|
||||||
{Object: &v1.Pod{}},
|
|
||||||
}
|
|
||||||
// set the size of the buffer of w.result to 0, so that the writes to
|
|
||||||
// w.result is blocked.
|
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
|
|
||||||
w.Stop()
|
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
|
||||||
lock.RLock()
|
|
||||||
defer lock.RUnlock()
|
|
||||||
return count == 2, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacheWatcherHandlesFiltering(t *testing.T) {
|
|
||||||
filter := func(_ string, _ labels.Set, field fields.Set) bool {
|
|
||||||
return field["spec.nodeName"] == "host"
|
|
||||||
}
|
|
||||||
forget := func(bool) {}
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
events []*watchCacheEvent
|
|
||||||
expected []watch.Event
|
|
||||||
}{
|
|
||||||
// properly handle starting with the filter, then being deleted, then re-added
|
|
||||||
{
|
|
||||||
events: []*watchCacheEvent{
|
|
||||||
{
|
|
||||||
Type: watch.Added,
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
ResourceVersion: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
ResourceVersion: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
ResourceVersion: 3,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expected: []watch.Event{
|
|
||||||
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
|
|
||||||
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
|
|
||||||
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// properly handle ignoring changes prior to the filter, then getting added, then deleted
|
|
||||||
{
|
|
||||||
events: []*watchCacheEvent{
|
|
||||||
{
|
|
||||||
Type: watch.Added,
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
ResourceVersion: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
ResourceVersion: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
ResourceVersion: 3,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
ResourceVersion: 4,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
ResourceVersion: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: watch.Modified,
|
|
||||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
|
|
||||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
|
|
||||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
|
||||||
ResourceVersion: 6,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expected: []watch.Event{
|
|
||||||
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
|
|
||||||
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
|
|
||||||
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
TestCase:
|
|
||||||
for i, testCase := range testCases {
|
|
||||||
// set the size of the buffer of w.result to 0, so that the writes to
|
|
||||||
// w.result is blocked.
|
|
||||||
for j := range testCase.events {
|
|
||||||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
|
|
||||||
|
|
||||||
ch := w.ResultChan()
|
|
||||||
for j, event := range testCase.expected {
|
|
||||||
e := <-ch
|
|
||||||
if !reflect.DeepEqual(event, e) {
|
|
||||||
t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
|
|
||||||
break TestCase
|
|
||||||
}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case obj, ok := <-ch:
|
|
||||||
t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
|
|
||||||
break TestCase
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
w.setDrainInputBufferLocked(false)
|
|
||||||
w.stopLocked()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type testVersioner struct{}
|
type testVersioner struct{}
|
||||||
|
|
||||||
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
|
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
|
||||||
@ -567,89 +399,6 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|
||||||
var w *cacheWatcher
|
|
||||||
done := make(chan struct{})
|
|
||||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
|
||||||
forget := func(drainWatcher bool) {
|
|
||||||
w.setDrainInputBufferLocked(drainWatcher)
|
|
||||||
w.stopLocked()
|
|
||||||
done <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
maxRetriesToProduceTheRaceCondition := 1000
|
|
||||||
// Simulating the timer is fired and stopped concurrently by set time
|
|
||||||
// timeout to zero and run the Stop goroutine concurrently.
|
|
||||||
// May sure that the watch will not be blocked on Stop.
|
|
||||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
go w.Stop()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatal("stop is blocked when the timer is fired concurrently")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
deadline := time.Now().Add(time.Hour)
|
|
||||||
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
|
||||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
|
||||||
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
|
||||||
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
|
||||||
defer cancel()
|
|
||||||
go w.processInterval(ctx, intervalFromEvents(nil), 0)
|
|
||||||
select {
|
|
||||||
case <-w.ResultChan():
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatal("expected received a event on ResultChan")
|
|
||||||
}
|
|
||||||
w.setDrainInputBufferLocked(false)
|
|
||||||
w.stopLocked()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
|
|
||||||
backingStorage := &dummyStorage{}
|
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Couldn't create cacher: %v", err)
|
|
||||||
}
|
|
||||||
defer cacher.Stop()
|
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
|
||||||
if err := cacher.ready.wait(); err != nil {
|
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create watch: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
watchClosed := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(watchClosed)
|
|
||||||
for event := range w.ResultChan() {
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added, watch.Modified, watch.Deleted:
|
|
||||||
// ok
|
|
||||||
default:
|
|
||||||
t.Errorf("unexpected event %#v", event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
cacher.Stop()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-watchClosed:
|
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
|
||||||
t.Errorf("timed out waiting for watch to close")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacheDontAcceptRequestsStopped(t *testing.T) {
|
func TestCacheDontAcceptRequestsStopped(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
@ -712,48 +461,6 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTimeBucketWatchersBasic(t *testing.T) {
|
|
||||||
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
forget := func(bool) {}
|
|
||||||
|
|
||||||
newWatcher := func(deadline time.Time) *cacheWatcher {
|
|
||||||
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
clock := testingclock.NewFakeClock(time.Now())
|
|
||||||
watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
|
|
||||||
now := clock.Now()
|
|
||||||
watchers.addWatcher(newWatcher(now.Add(10 * time.Second)))
|
|
||||||
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
|
|
||||||
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
|
|
||||||
|
|
||||||
if len(watchers.watchersBuckets) != 2 {
|
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets)
|
|
||||||
}
|
|
||||||
watchers0 := watchers.popExpiredWatchers()
|
|
||||||
if len(watchers0) != 0 {
|
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers0)
|
|
||||||
}
|
|
||||||
|
|
||||||
clock.Step(10 * time.Second)
|
|
||||||
watchers1 := watchers.popExpiredWatchers()
|
|
||||||
if len(watchers1) != 1 || len(watchers1[0]) != 1 {
|
|
||||||
t.Errorf("unexpected bucket size: %v", watchers1)
|
|
||||||
}
|
|
||||||
watchers1 = watchers.popExpiredWatchers()
|
|
||||||
if len(watchers1) != 0 {
|
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers1)
|
|
||||||
}
|
|
||||||
|
|
||||||
clock.Step(12 * time.Second)
|
|
||||||
watchers2 := watchers.popExpiredWatchers()
|
|
||||||
if len(watchers2) != 1 || len(watchers2[0]) != 2 {
|
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
@ -1630,90 +1337,3 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
|
|||||||
t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received)
|
t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeWatchCacheEvent(rv uint64) *watchCacheEvent {
|
|
||||||
return &watchCacheEvent{
|
|
||||||
Type: watch.Added,
|
|
||||||
Object: &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: fmt.Sprintf("pod-%d", rv),
|
|
||||||
ResourceVersion: fmt.Sprintf("%d", rv),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
ResourceVersion: rv,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
|
|
||||||
func TestCacheWatcherDraining(t *testing.T) {
|
|
||||||
var lock sync.RWMutex
|
|
||||||
var w *cacheWatcher
|
|
||||||
count := 0
|
|
||||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
|
||||||
forget := func(drainWatcher bool) {
|
|
||||||
lock.Lock()
|
|
||||||
defer lock.Unlock()
|
|
||||||
count++
|
|
||||||
w.setDrainInputBufferLocked(drainWatcher)
|
|
||||||
w.stopLocked()
|
|
||||||
}
|
|
||||||
initEvents := []*watchCacheEvent{
|
|
||||||
makeWatchCacheEvent(5),
|
|
||||||
makeWatchCacheEvent(6),
|
|
||||||
}
|
|
||||||
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
|
||||||
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
|
||||||
t.Fatal("failed adding an even to the watcher")
|
|
||||||
}
|
|
||||||
forget(true) // drain the watcher
|
|
||||||
|
|
||||||
eventCount := 0
|
|
||||||
for range w.ResultChan() {
|
|
||||||
eventCount++
|
|
||||||
}
|
|
||||||
if eventCount != 3 {
|
|
||||||
t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount)
|
|
||||||
}
|
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
|
||||||
lock.RLock()
|
|
||||||
defer lock.RUnlock()
|
|
||||||
return count == 2, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatalf("expected forget() to be called twice, because processInterval should call Stop(): %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestCacheWatcherDrainingRequestedButNotDrained verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
|
|
||||||
// but the client never actually get any data
|
|
||||||
func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
|
|
||||||
var lock sync.RWMutex
|
|
||||||
var w *cacheWatcher
|
|
||||||
count := 0
|
|
||||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
|
||||||
forget := func(drainWatcher bool) {
|
|
||||||
lock.Lock()
|
|
||||||
defer lock.Unlock()
|
|
||||||
count++
|
|
||||||
w.setDrainInputBufferLocked(drainWatcher)
|
|
||||||
w.stopLocked()
|
|
||||||
}
|
|
||||||
initEvents := []*watchCacheEvent{
|
|
||||||
makeWatchCacheEvent(5),
|
|
||||||
makeWatchCacheEvent(6),
|
|
||||||
}
|
|
||||||
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
|
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
|
||||||
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
|
||||||
t.Fatal("failed adding an even to the watcher")
|
|
||||||
}
|
|
||||||
forget(true) // drain the watcher
|
|
||||||
w.Stop() // client disconnected, timeout expired or ctx was actually closed
|
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
|
||||||
lock.RLock()
|
|
||||||
defer lock.RUnlock()
|
|
||||||
return count == 3, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user