fix(fakeclient): write event to watch channel on add/update/delete

fix races with watch call

add test for non-namespace resource watch

add matching for all-namespace-watch

fix delete namespace watch & restrict test

fix multiple invocation on same resource & namespace

add descriptive doc for tracker.watchers

Kubernetes-commit: f57cc0b22d282bc8fe68faf91529e7175bc3918a
This commit is contained in:
yue9944882
2017-12-21 16:50:16 +08:00
committed by Kubernetes Publisher
parent f031b2af14
commit af8ed43b01
3 changed files with 268 additions and 3 deletions

View File

@@ -29,6 +29,11 @@ import (
restclient "k8s.io/client-go/rest"
)
// FakeWatchBufferSize is the max num of watch event can be buffered in the
// watch channel. Note that when watch event overflows or exceed this buffer
// size, manipulations via fake client may be blocked.
const FakeWatchBufferSize = 128
// ObjectTracker keeps track of objects. It is intended to be used to
// fake calls to a server by returning objects based on their kind,
// namespace and name.
@@ -54,6 +59,10 @@ type ObjectTracker interface {
// didn't exist in the tracker prior to deletion, Delete returns
// no error.
Delete(gvr schema.GroupVersionResource, ns, name string) error
// Watch watches objects from the tracker. Watch returns a channel
// which will push added / modified / deleted object.
Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error)
}
// ObjectScheme abstracts the implementation of common operations on objects.
@@ -132,6 +141,13 @@ type tracker struct {
decoder runtime.Decoder
lock sync.RWMutex
objects map[schema.GroupVersionResource][]runtime.Object
// The value type of watchers is a map of which the key is either a namespace or
// all/non namespace aka "" and its value is list of fake watchers. Each of
// fake watcher holds a buffered channel of size "FakeWatchBufferSize" which
// is default to 128. Manipulations on resources will broadcast the notification
// events into the watchers' channel and note that too many unhandled event may
// potentially block the tracker.
watchers map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher
}
var _ ObjectTracker = &tracker{}
@@ -140,9 +156,10 @@ var _ ObjectTracker = &tracker{}
// of objects for the fake clientset. Mostly useful for unit tests.
func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
return &tracker{
scheme: scheme,
decoder: decoder,
objects: make(map[schema.GroupVersionResource][]runtime.Object),
scheme: scheme,
decoder: decoder,
objects: make(map[schema.GroupVersionResource][]runtime.Object),
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher),
}
}
@@ -185,6 +202,19 @@ func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionK
return list.DeepCopyObject(), nil
}
func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
t.lock.Lock()
defer t.lock.Unlock()
fakewatcher := watch.NewFakeWithChanSize(FakeWatchBufferSize, true)
if _, exists := t.watchers[gvr]; !exists {
t.watchers[gvr] = make(map[string][]*watch.FakeWatcher)
}
t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}
func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
@@ -263,6 +293,19 @@ func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns
return t.add(gvr, obj, ns, true)
}
func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.FakeWatcher {
watches := []*watch.FakeWatcher{}
if t.watchers[gvr] != nil {
if w := t.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
}
if w := t.watchers[gvr][""]; w != nil {
watches = append(watches, w...)
}
}
return watches
}
func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
t.lock.Lock()
defer t.lock.Unlock()
@@ -296,6 +339,9 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st
}
if oldMeta.GetNamespace() == newMeta.GetNamespace() && oldMeta.GetName() == newMeta.GetName() {
if replaceExisting {
for _, w := range t.getWatches(gvr, ns) {
w.Modify(obj)
}
t.objects[gvr][i] = obj
return nil
}
@@ -310,6 +356,10 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st
t.objects[gvr] = append(t.objects[gvr], obj)
for _, w := range t.getWatches(gvr, ns) {
w.Add(obj)
}
return nil
}
@@ -342,7 +392,11 @@ func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error
return err
}
if objMeta.GetNamespace() == ns && objMeta.GetName() == name {
obj := t.objects[gvr][i]
t.objects[gvr] = append(t.objects[gvr][:i], t.objects[gvr][i+1:]...)
for _, w := range t.getWatches(gvr, ns) {
w.Delete(obj)
}
found = true
break
}