From 880f922bb673e4e9010cd848fe76e9c5b0d2bd1f Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 7 Apr 2015 16:00:37 -0700 Subject: [PATCH] Add easy setup for simple controller Also add tests; coverage up to 86.7% --- pkg/client/cache/store.go | 4 + pkg/controller/framework/controller.go | 123 ++++++++ pkg/controller/framework/controller_test.go | 276 +++++++++++++++++- .../framework/fake_controller_source.go | 37 ++- 4 files changed, 433 insertions(+), 7 deletions(-) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index f42d6e178e1..4fe08ace32e 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -59,6 +59,10 @@ func (k KeyError) Error() string { return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err) } +// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for +// the object but not the object itself. +type ExplicitKey string + // MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make // keys for API objects which implement meta.Interface. // The key uses the format / unless is empty, then diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index 1868e0361e1..a55e09dd6fd 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -102,3 +102,126 @@ func (c *Controller) processLoop() { } } } + +// ResourceEventHandler can handle notifications for events that happen to a +// resource. The events are informational only, so you can't return an +// error. +// * OnAdd is called when an object is added. +// * OnUpdate is called when an object is modified. Note that oldObj is the +// last known state of the object-- it is possible that several changes +// were combined together, so you can't use this to see every single +// change. OnUpdate is also called when a re-list happens, and it will +// get called even if nothing changed. This is useful for periodically +// evaluating or syncing something. +// * OnDelete will get the final state of the item if it is known, otherwise +// it will get an object of type cache.DeletedFinalStateUnknown. +type ResourceEventHandler interface { + OnAdd(obj interface{}) + OnUpdate(oldObj, newObj interface{}) + OnDelete(obj interface{}) +} + +// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or +// as few of the notification functions as you want while still implementing +// ResourceEventHandler. +type ResourceEventHandlerFuncs struct { + AddFunc func(obj interface{}) + UpdateFunc func(oldObj, newObj interface{}) + DeleteFunc func(obj interface{}) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { + if r.AddFunc != nil { + r.AddFunc(obj) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + +// DeletionHandlingMetaNamespaceKeyFunc checks for +// cache.DeletedFinalStateUnknown objects before calling +// cache.MetaNamespaceKeyFunc. +func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return cache.MetaNamespaceKeyFunc(obj) +} + +// NewInformer returns a cache.Store and a controller for populating the store +// while also providing event notifications. You should only used the returned +// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event +// notifications to be faulty. +// +// Parameters: +// * lw is list and watch functions for the source of the resource you want to +// be informed of. +// * objType is an object of the type that you expect to receieve. +// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate +// calls, even if nothing changed). Otherwise, re-list will be delayed as +// long as possible (until the upstream source closes the watch or times out, +// or you stop the controller). +// * h is the object you want notifications sent to. +// +func NewInformer( + lw cache.ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, +) (cache.Store, *Controller) { + // This will hold the client state, as we know it. + clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) + + // This will hold incoming changes. Note how we pass clientState in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas. + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: func(obj interface{}) error { + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(d.Object); err == nil && exists { + if err := clientState.Update(d.Object); err != nil { + return err + } + h.OnUpdate(old, d.Object) + } else { + if err := clientState.Add(d.Object); err != nil { + return err + } + h.OnAdd(d.Object) + } + case cache.Deleted: + if err := clientState.Delete(d.Object); err != nil { + return err + } + h.OnDelete(d.Object) + } + } + return nil + }, + } + return clientState, New(cfg) +} diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index 1ec94ea71e5..bbcf8719956 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -18,15 +18,18 @@ package framework_test import ( "fmt" + "math/rand" "sync" + "testing" "time" - // "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/google/gofuzz" ) func Example() { @@ -34,7 +37,7 @@ func Example() { source := framework.NewFakeControllerSource() // This will hold the downstream state, as we know it. - downstream := cache.NewStore(cache.MetaNamespaceKeyFunc) + downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc) // This will hold incoming changes. Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set @@ -113,3 +116,272 @@ func Example() { // b-controller // c-framework } + +func ExampleInformer() { + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + // Let's do threadsafe output to get predictable test results. + outputSetLock := sync.Mutex{} + outputSet := util.StringSet{} + + // Make a controller that immediately deletes anything added to it, and + // logs anything deleted. + _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*100, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + source.Delete(obj.(runtime.Object)) + }, + DeleteFunc: func(obj interface{}) { + key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + key = "oops something went wrong with the key" + } + + // Record some output when items are deleted. + outputSetLock.Lock() + defer outputSetLock.Unlock() + outputSet.Insert(key) + }, + }, + ) + + // Run the controller and run it until we close stop. + stop := make(chan struct{}) + controller.Run(stop) + + // Let's add a few objects to the source. + for _, name := range []string{"a-hello", "b-controller", "c-framework"} { + // Note that these pods are not valid-- the fake source doesn't + // call validation or perform any other checking. + source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}}) + } + + // Let's wait for the controller to process the things we just added. + time.Sleep(500 * time.Millisecond) + close(stop) + + outputSetLock.Lock() + for _, key := range outputSet.List() { + fmt.Println(key) + } + // Output: + // a-hello + // b-controller + // c-framework +} + +func TestHammerController(t *testing.T) { + // This test executes a bunch of requests through the fake source and + // controller framework to make sure there's no locking/threading + // errors. If an error happens, it should hang forever or trigger the + // race detector. + + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + // Let's do threadsafe output to get predictable test results. + outputSetLock := sync.Mutex{} + // map of key to operations done on the key + outputSet := map[string][]string{} + + recordFunc := func(eventType string, obj interface{}) { + key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + t.Errorf("something wrong with key: %v", err) + key = "oops something went wrong with the key" + } + + // Record some output when items are deleted. + outputSetLock.Lock() + defer outputSetLock.Unlock() + outputSet[key] = append(outputSet[key], eventType) + } + + // Make a controller which just logs all the changes it gets. + _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*100, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { recordFunc("add", obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) }, + DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) }, + }, + ) + + // Run the controller and run it until we close stop. + stop := make(chan struct{}) + go controller.Run(stop) + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // Let's add a few objects to the source. + currentNames := util.StringSet{} + rs := rand.NewSource(rand.Int63()) + f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs) + r := rand.New(rs) // Mustn't use r and f concurrently! + for i := 0; i < 750; i++ { + var name string + var isNew bool + if currentNames.Len() == 0 || r.Intn(3) == 1 { + f.Fuzz(&name) + isNew = true + } else { + l := currentNames.List() + name = l[r.Intn(len(l))] + } + + pod := &api.Pod{} + f.Fuzz(pod) + pod.ObjectMeta.Name = name + pod.ObjectMeta.Namespace = "default" + // Add, update, or delete randomly. + // Note that these pods are not valid-- the fake source doesn't + // call validation or perform any other checking. + if isNew { + currentNames.Insert(name) + source.Add(pod) + continue + } + switch r.Intn(2) { + case 0: + currentNames.Insert(name) + source.Modify(pod) + case 1: + currentNames.Delete(name) + source.Delete(pod) + } + } + }() + } + wg.Wait() + + // Let's wait for the controller to finish processing the things we just added. + time.Sleep(100 * time.Millisecond) + close(stop) + + outputSetLock.Lock() + t.Logf("got: %#v", outputSet) +} + +func TestUpdate(t *testing.T) { + // This test is going to exercise the various paths that result in a + // call to update. + + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + const ( + FROM = "from" + ADD_MISSED = "missed the add event" + TO = "to" + ) + + // These are the transitions we expect to see; because this is + // asynchronous, there are a lot of valid possibilities. + type pair struct{ from, to string } + allowedTransitions := map[pair]bool{ + pair{FROM, TO}: true, + pair{FROM, ADD_MISSED}: true, + pair{ADD_MISSED, TO}: true, + + // Because a resync can happen when we've already observed one + // of the above but before the item is deleted. + pair{TO, TO}: true, + // Because a resync could happen before we observe an update. + pair{FROM, FROM}: true, + } + + var testDoneWG sync.WaitGroup + + // Make a controller that immediately deletes anything added to it, and + // logs anything deleted. + _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*1, + framework.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + o, n := oldObj.(*api.Pod), newObj.(*api.Pod) + from, to := o.Labels["check"], n.Labels["check"] + if !allowedTransitions[pair{from, to}] { + t.Errorf("observed transition %q -> %q for %v", from, to, n.Name) + } + source.Delete(n) + }, + DeleteFunc: func(obj interface{}) { + testDoneWG.Done() + }, + }, + ) + + // Run the controller and run it until we close stop. + stop := make(chan struct{}) + go controller.Run(stop) + + pod := func(name, check string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Labels: map[string]string{"check": check}, + }, + } + } + + var wg sync.WaitGroup + tests := []func(string){ + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "a-" + name + source.Add(pod(name, FROM)) + source.Modify(pod(name, TO)) + }, + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "b-" + name + source.Add(pod(name, FROM)) + source.ModifyDropWatch(pod(name, TO)) + }, + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "c-" + name + source.AddDropWatch(pod(name, FROM)) + source.Modify(pod(name, ADD_MISSED)) + source.Modify(pod(name, TO)) + }, + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "d-" + name + source.Add(pod(name, FROM)) + }, + } + + // run every test a few times, in parallel + fuzzer := fuzz.New() + for i := 0; i < 20; i++ { + for _, f := range tests { + wg.Add(1) + var name string + for len(name) < 10 { + fuzzer.Fuzz(&name) + } + go f(name) + } + } + wg.Wait() + + // Let's wait for the controller to process the things we just added. + testDoneWG.Wait() + close(stop) +} diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go index d7b863ad394..3bba08aae28 100644 --- a/pkg/controller/framework/fake_controller_source.go +++ b/pkg/controller/framework/fake_controller_source.go @@ -18,6 +18,7 @@ package framework import ( "errors" + "math/rand" "strconv" "sync" @@ -51,26 +52,49 @@ type nnu struct { // Add adds an object to the set and sends an add event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Add(obj runtime.Object) { - f.change(watch.Event{watch.Added, obj}) + f.Change(watch.Event{watch.Added, obj}, 1) } // Modify updates an object in the set and sends a modified event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Modify(obj runtime.Object) { - f.change(watch.Event{watch.Modified, obj}) + f.Change(watch.Event{watch.Modified, obj}, 1) } // Delete deletes an object from the set and sends a delete event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Delete(lastValue runtime.Object) { - f.change(watch.Event{watch.Deleted, lastValue}) + f.Change(watch.Event{watch.Deleted, lastValue}, 1) +} + +// AddDropWatch adds an object to the set but forgets to send an add event to +// watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) { + f.Change(watch.Event{watch.Added, obj}, 0) +} + +// ModifyDropWatch updates an object in the set but forgets to send a modify +// event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) { + f.Change(watch.Event{watch.Modified, obj}, 0) +} + +// DeleteDropWatch deletes an object from the set but forgets to send a delete +// event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) { + f.Change(watch.Event{watch.Deleted, lastValue}, 0) } func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu { return nnu{meta.Namespace, meta.Name, meta.UID} } -func (f *FakeControllerSource) change(e watch.Event) { +// Change records the given event (setting the object's resource version) and +// sends a watch event with the specified probability. +func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { f.lock.Lock() defer f.lock.Unlock() @@ -89,7 +113,10 @@ func (f *FakeControllerSource) change(e watch.Event) { case watch.Deleted: delete(f.items, key) } - f.broadcaster.Action(e.Type, e.Object) + + if rand.Float64() < watchProbability { + f.broadcaster.Action(e.Type, e.Object) + } } // List returns a list object, with its resource version set.