diff --git a/staging/src/k8s.io/client-go/testing/fixture.go b/staging/src/k8s.io/client-go/testing/fixture.go index ba8ee508f46..13192f92d16 100644 --- a/staging/src/k8s.io/client-go/testing/fixture.go +++ b/staging/src/k8s.io/client-go/testing/fixture.go @@ -29,11 +29,6 @@ import ( restclient "k8s.io/client-go/rest" ) -// FakeWatchBufferSize is the max num of watch event can be buffered in the -// watch channel. Note that when watch event overflows or exceed this buffer -// size, manipulations via fake client may be blocked. -const FakeWatchBufferSize = 128 - // ObjectTracker keeps track of objects. It is intended to be used to // fake calls to a server by returning objects based on their kind, // namespace and name. @@ -142,12 +137,11 @@ type tracker struct { lock sync.RWMutex objects map[schema.GroupVersionResource][]runtime.Object // The value type of watchers is a map of which the key is either a namespace or - // all/non namespace aka "" and its value is list of fake watchers. Each of - // fake watcher holds a buffered channel of size "FakeWatchBufferSize" which - // is default to 128. Manipulations on resources will broadcast the notification - // events into the watchers' channel and note that too many unhandled event may - // potentially block the tracker. - watchers map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher + // all/non namespace aka "" and its value is list of fake watchers. + // Manipulations on resources will broadcast the notification events into the + // watchers' channel. Note that too many unhandled events (currently 100, + // see apimachinery/pkg/watch.DefaultChanSize) will cause a panic. + watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher } var _ ObjectTracker = &tracker{} @@ -159,7 +153,7 @@ func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracke scheme: scheme, decoder: decoder, objects: make(map[schema.GroupVersionResource][]runtime.Object), - watchers: make(map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher), + watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), } } @@ -206,10 +200,10 @@ func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Inter t.lock.Lock() defer t.lock.Unlock() - fakewatcher := watch.NewFakeWithChanSize(FakeWatchBufferSize, true) + fakewatcher := watch.NewRaceFreeFake() if _, exists := t.watchers[gvr]; !exists { - t.watchers[gvr] = make(map[string][]*watch.FakeWatcher) + t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) } t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher) return fakewatcher, nil @@ -293,8 +287,8 @@ func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns return t.add(gvr, obj, ns, true) } -func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.FakeWatcher { - watches := []*watch.FakeWatcher{} +func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { + watches := []*watch.RaceFreeFakeWatcher{} if t.watchers[gvr] != nil { if w := t.watchers[gvr][ns]; w != nil { watches = append(watches, w...) diff --git a/staging/src/k8s.io/client-go/testing/fixture_test.go b/staging/src/k8s.io/client-go/testing/fixture_test.go index 967e0aefa93..5c912c2b291 100644 --- a/staging/src/k8s.io/client-go/testing/fixture_test.go +++ b/staging/src/k8s.io/client-go/testing/fixture_test.go @@ -190,3 +190,34 @@ func TestWatchCallMultipleInvocation(t *testing.T) { } wg.Wait() } + +func TestWatchAddAfterStop(t *testing.T) { + testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"} + testObj := getArbitraryResource(testResource, "test_name", "test_namespace") + accessor, err := meta.Accessor(testObj) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ns := accessor.GetNamespace() + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + o := NewObjectTracker(scheme, codecs.UniversalDecoder()) + watch, err := o.Watch(testResource, ns) + if err != nil { + t.Errorf("watch creation failed: %v", err) + } + + // When the watch is stopped it should ignore later events without panicking. + defer func() { + if r := recover(); r != nil { + t.Errorf("Watch panicked when it should have ignored create after stop: %v", r) + } + }() + + watch.Stop() + err = o.Create(testResource, testObj, ns) + if err != nil { + t.Errorf("test resource creation failed: %v", err) + } +}