Use RaceFreeFakeWatcher in ObjectTracker

The FakeWatcher allows sends on the result channel after it's closed,
for example calling Stop() then Add() will panic. RaceFreeFakeWatcher
checks whether the watcher is stopped before attempting to send. It also
panics instead of blocking when the result channel is full.

Kubernetes-commit: b84ad8828b6ffe0dd289f69e395968eabb9fbeaa
This commit is contained in:
Grant Rodgers 2018-03-14 11:38:19 -07:00 committed by Kubernetes Publisher
parent d74b6802c2
commit 949db79a1d
2 changed files with 41 additions and 16 deletions

View File

@ -29,11 +29,6 @@ import (
restclient "k8s.io/client-go/rest"
)
// FakeWatchBufferSize is the max num of watch event can be buffered in the
// watch channel. Note that when watch event overflows or exceed this buffer
// size, manipulations via fake client may be blocked.
const FakeWatchBufferSize = 128
// ObjectTracker keeps track of objects. It is intended to be used to
// fake calls to a server by returning objects based on their kind,
// namespace and name.
@ -142,12 +137,11 @@ type tracker struct {
lock sync.RWMutex
objects map[schema.GroupVersionResource][]runtime.Object
// The value type of watchers is a map of which the key is either a namespace or
// all/non namespace aka "" and its value is list of fake watchers. Each of
// fake watcher holds a buffered channel of size "FakeWatchBufferSize" which
// is default to 128. Manipulations on resources will broadcast the notification
// events into the watchers' channel and note that too many unhandled event may
// potentially block the tracker.
watchers map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher
// all/non namespace aka "" and its value is list of fake watchers.
// Manipulations on resources will broadcast the notification events into the
// watchers' channel. Note that too many unhandled events (currently 100,
// see apimachinery/pkg/watch.DefaultChanSize) will cause a panic.
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
}
var _ ObjectTracker = &tracker{}
@ -159,7 +153,7 @@ func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracke
scheme: scheme,
decoder: decoder,
objects: make(map[schema.GroupVersionResource][]runtime.Object),
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher),
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
}
@ -206,10 +200,10 @@ func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Inter
t.lock.Lock()
defer t.lock.Unlock()
fakewatcher := watch.NewFakeWithChanSize(FakeWatchBufferSize, true)
fakewatcher := watch.NewRaceFreeFake()
if _, exists := t.watchers[gvr]; !exists {
t.watchers[gvr] = make(map[string][]*watch.FakeWatcher)
t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
}
t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
@ -293,8 +287,8 @@ func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns
return t.add(gvr, obj, ns, true)
}
func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.FakeWatcher {
watches := []*watch.FakeWatcher{}
func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if t.watchers[gvr] != nil {
if w := t.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)

View File

@ -190,3 +190,34 @@ func TestWatchCallMultipleInvocation(t *testing.T) {
}
wg.Wait()
}
func TestWatchAddAfterStop(t *testing.T) {
testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"}
testObj := getArbitraryResource(testResource, "test_name", "test_namespace")
accessor, err := meta.Accessor(testObj)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ns := accessor.GetNamespace()
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
o := NewObjectTracker(scheme, codecs.UniversalDecoder())
watch, err := o.Watch(testResource, ns)
if err != nil {
t.Errorf("watch creation failed: %v", err)
}
// When the watch is stopped it should ignore later events without panicking.
defer func() {
if r := recover(); r != nil {
t.Errorf("Watch panicked when it should have ignored create after stop: %v", r)
}
}()
watch.Stop()
err = o.Create(testResource, testObj, ns)
if err != nil {
t.Errorf("test resource creation failed: %v", err)
}
}