From 065a8fa4546e6a229f8965a083dfed512468706c Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 7 Apr 2015 13:45:51 -0700 Subject: [PATCH 1/5] add informational output to test --- cmd/integration/integration.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index fdff8ee62f0..ecfa1ad1a84 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -928,6 +928,7 @@ func runSchedulerNoPhantomPodsTest(client *client.Client) { } // Delete a pod to free up room. + glog.Infof("Deleting pod %v", bar.Name) err = client.Pods(api.NamespaceDefault).Delete(bar.Name) if err != nil { glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err) From 880f922bb673e4e9010cd848fe76e9c5b0d2bd1f Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 7 Apr 2015 16:00:37 -0700 Subject: [PATCH 2/5] Add easy setup for simple controller Also add tests; coverage up to 86.7% --- pkg/client/cache/store.go | 4 + pkg/controller/framework/controller.go | 123 ++++++++ pkg/controller/framework/controller_test.go | 276 +++++++++++++++++- .../framework/fake_controller_source.go | 37 ++- 4 files changed, 433 insertions(+), 7 deletions(-) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index f42d6e178e1..4fe08ace32e 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -59,6 +59,10 @@ func (k KeyError) Error() string { return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err) } +// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for +// the object but not the object itself. +type ExplicitKey string + // MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make // keys for API objects which implement meta.Interface. 
// The key uses the format / unless is empty, then diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index 1868e0361e1..a55e09dd6fd 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -102,3 +102,126 @@ func (c *Controller) processLoop() { } } } + +// ResourceEventHandler can handle notifications for events that happen to a +// resource. The events are informational only, so you can't return an +// error. +// * OnAdd is called when an object is added. +// * OnUpdate is called when an object is modified. Note that oldObj is the +// last known state of the object-- it is possible that several changes +// were combined together, so you can't use this to see every single +// change. OnUpdate is also called when a re-list happens, and it will +// get called even if nothing changed. This is useful for periodically +// evaluating or syncing something. +// * OnDelete will get the final state of the item if it is known, otherwise +// it will get an object of type cache.DeletedFinalStateUnknown. +type ResourceEventHandler interface { + OnAdd(obj interface{}) + OnUpdate(oldObj, newObj interface{}) + OnDelete(obj interface{}) +} + +// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or +// as few of the notification functions as you want while still implementing +// ResourceEventHandler. +type ResourceEventHandlerFuncs struct { + AddFunc func(obj interface{}) + UpdateFunc func(oldObj, newObj interface{}) + DeleteFunc func(obj interface{}) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { + if r.AddFunc != nil { + r.AddFunc(obj) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. 
+func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + +// DeletionHandlingMetaNamespaceKeyFunc checks for +// cache.DeletedFinalStateUnknown objects before calling +// cache.MetaNamespaceKeyFunc. +func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return cache.MetaNamespaceKeyFunc(obj) +} + +// NewInformer returns a cache.Store and a controller for populating the store +// while also providing event notifications. You should only use the returned +// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event +// notifications to be faulty. +// +// Parameters: +// * lw is list and watch functions for the source of the resource you want to +// be informed of. +// * objType is an object of the type that you expect to receive. +// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate +// calls, even if nothing changed). Otherwise, re-list will be delayed as +// long as possible (until the upstream source closes the watch or times out, +// or you stop the controller). +// * h is the object you want notifications sent to. +// +func NewInformer( + lw cache.ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, +) (cache.Store, *Controller) { + // This will hold the client state, as we know it. + clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) + + // This will hold incoming changes. Note how we pass clientState in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas.
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: func(obj interface{}) error { + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(d.Object); err == nil && exists { + if err := clientState.Update(d.Object); err != nil { + return err + } + h.OnUpdate(old, d.Object) + } else { + if err := clientState.Add(d.Object); err != nil { + return err + } + h.OnAdd(d.Object) + } + case cache.Deleted: + if err := clientState.Delete(d.Object); err != nil { + return err + } + h.OnDelete(d.Object) + } + } + return nil + }, + } + return clientState, New(cfg) +} diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index 1ec94ea71e5..bbcf8719956 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -18,15 +18,18 @@ package framework_test import ( "fmt" + "math/rand" "sync" + "testing" "time" - // "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/google/gofuzz" ) func Example() { @@ -34,7 +37,7 @@ func Example() { source := framework.NewFakeControllerSource() // This will hold the downstream state, as we know it. - downstream := cache.NewStore(cache.MetaNamespaceKeyFunc) + downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc) // This will hold incoming changes. 
Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set @@ -113,3 +116,272 @@ func Example() { // b-controller // c-framework } + +func ExampleInformer() { + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + // Let's do threadsafe output to get predictable test results. + outputSetLock := sync.Mutex{} + outputSet := util.StringSet{} + + // Make a controller that immediately deletes anything added to it, and + // logs anything deleted. + _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*100, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + source.Delete(obj.(runtime.Object)) + }, + DeleteFunc: func(obj interface{}) { + key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + key = "oops something went wrong with the key" + } + + // Record some output when items are deleted. + outputSetLock.Lock() + defer outputSetLock.Unlock() + outputSet.Insert(key) + }, + }, + ) + + // Run the controller and run it until we close stop. + stop := make(chan struct{}) + controller.Run(stop) + + // Let's add a few objects to the source. + for _, name := range []string{"a-hello", "b-controller", "c-framework"} { + // Note that these pods are not valid-- the fake source doesn't + // call validation or perform any other checking. + source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}}) + } + + // Let's wait for the controller to process the things we just added. + time.Sleep(500 * time.Millisecond) + close(stop) + + outputSetLock.Lock() + for _, key := range outputSet.List() { + fmt.Println(key) + } + // Output: + // a-hello + // b-controller + // c-framework +} + +func TestHammerController(t *testing.T) { + // This test executes a bunch of requests through the fake source and + // controller framework to make sure there's no locking/threading + // errors. 
If an error happens, it should hang forever or trigger the + // race detector. + + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + // Let's do threadsafe output to get predictable test results. + outputSetLock := sync.Mutex{} + // map of key to operations done on the key + outputSet := map[string][]string{} + + recordFunc := func(eventType string, obj interface{}) { + key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + t.Errorf("something wrong with key: %v", err) + key = "oops something went wrong with the key" + } + + // Record some output when items are deleted. + outputSetLock.Lock() + defer outputSetLock.Unlock() + outputSet[key] = append(outputSet[key], eventType) + } + + // Make a controller which just logs all the changes it gets. + _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*100, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { recordFunc("add", obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) }, + DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) }, + }, + ) + + // Run the controller and run it until we close stop. + stop := make(chan struct{}) + go controller.Run(stop) + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // Let's add a few objects to the source. + currentNames := util.StringSet{} + rs := rand.NewSource(rand.Int63()) + f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs) + r := rand.New(rs) // Mustn't use r and f concurrently! + for i := 0; i < 750; i++ { + var name string + var isNew bool + if currentNames.Len() == 0 || r.Intn(3) == 1 { + f.Fuzz(&name) + isNew = true + } else { + l := currentNames.List() + name = l[r.Intn(len(l))] + } + + pod := &api.Pod{} + f.Fuzz(pod) + pod.ObjectMeta.Name = name + pod.ObjectMeta.Namespace = "default" + // Add, update, or delete randomly. 
+ // Note that these pods are not valid-- the fake source doesn't + // call validation or perform any other checking. + if isNew { + currentNames.Insert(name) + source.Add(pod) + continue + } + switch r.Intn(2) { + case 0: + currentNames.Insert(name) + source.Modify(pod) + case 1: + currentNames.Delete(name) + source.Delete(pod) + } + } + }() + } + wg.Wait() + + // Let's wait for the controller to finish processing the things we just added. + time.Sleep(100 * time.Millisecond) + close(stop) + + outputSetLock.Lock() + t.Logf("got: %#v", outputSet) +} + +func TestUpdate(t *testing.T) { + // This test is going to exercise the various paths that result in a + // call to update. + + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + const ( + FROM = "from" + ADD_MISSED = "missed the add event" + TO = "to" + ) + + // These are the transitions we expect to see; because this is + // asynchronous, there are a lot of valid possibilities. + type pair struct{ from, to string } + allowedTransitions := map[pair]bool{ + pair{FROM, TO}: true, + pair{FROM, ADD_MISSED}: true, + pair{ADD_MISSED, TO}: true, + + // Because a resync can happen when we've already observed one + // of the above but before the item is deleted. + pair{TO, TO}: true, + // Because a resync could happen before we observe an update. + pair{FROM, FROM}: true, + } + + var testDoneWG sync.WaitGroup + + // Make a controller that immediately deletes anything added to it, and + // logs anything deleted. 
+ _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*1, + framework.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + o, n := oldObj.(*api.Pod), newObj.(*api.Pod) + from, to := o.Labels["check"], n.Labels["check"] + if !allowedTransitions[pair{from, to}] { + t.Errorf("observed transition %q -> %q for %v", from, to, n.Name) + } + source.Delete(n) + }, + DeleteFunc: func(obj interface{}) { + testDoneWG.Done() + }, + }, + ) + + // Run the controller and run it until we close stop. + stop := make(chan struct{}) + go controller.Run(stop) + + pod := func(name, check string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Labels: map[string]string{"check": check}, + }, + } + } + + var wg sync.WaitGroup + tests := []func(string){ + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "a-" + name + source.Add(pod(name, FROM)) + source.Modify(pod(name, TO)) + }, + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "b-" + name + source.Add(pod(name, FROM)) + source.ModifyDropWatch(pod(name, TO)) + }, + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "c-" + name + source.AddDropWatch(pod(name, FROM)) + source.Modify(pod(name, ADD_MISSED)) + source.Modify(pod(name, TO)) + }, + func(name string) { + defer wg.Done() + testDoneWG.Add(1) + name = "d-" + name + source.Add(pod(name, FROM)) + }, + } + + // run every test a few times, in parallel + fuzzer := fuzz.New() + for i := 0; i < 20; i++ { + for _, f := range tests { + wg.Add(1) + var name string + for len(name) < 10 { + fuzzer.Fuzz(&name) + } + go f(name) + } + } + wg.Wait() + + // Let's wait for the controller to process the things we just added. 
+ testDoneWG.Wait() + close(stop) +} diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go index d7b863ad394..3bba08aae28 100644 --- a/pkg/controller/framework/fake_controller_source.go +++ b/pkg/controller/framework/fake_controller_source.go @@ -18,6 +18,7 @@ package framework import ( "errors" + "math/rand" "strconv" "sync" @@ -51,26 +52,49 @@ type nnu struct { // Add adds an object to the set and sends an add event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Add(obj runtime.Object) { - f.change(watch.Event{watch.Added, obj}) + f.Change(watch.Event{watch.Added, obj}, 1) } // Modify updates an object in the set and sends a modified event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Modify(obj runtime.Object) { - f.change(watch.Event{watch.Modified, obj}) + f.Change(watch.Event{watch.Modified, obj}, 1) } // Delete deletes an object from the set and sends a delete event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Delete(lastValue runtime.Object) { - f.change(watch.Event{watch.Deleted, lastValue}) + f.Change(watch.Event{watch.Deleted, lastValue}, 1) +} + +// AddDropWatch adds an object to the set but forgets to send an add event to +// watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) { + f.Change(watch.Event{watch.Added, obj}, 0) +} + +// ModifyDropWatch updates an object in the set but forgets to send a modify +// event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) { + f.Change(watch.Event{watch.Modified, obj}, 0) +} + +// DeleteDropWatch deletes an object from the set but forgets to send a delete +// event to watchers. +// obj's ResourceVersion is set. 
+func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) { + f.Change(watch.Event{watch.Deleted, lastValue}, 0) } func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu { return nnu{meta.Namespace, meta.Name, meta.UID} } -func (f *FakeControllerSource) change(e watch.Event) { +// Change records the given event (setting the object's resource version) and +// sends a watch event with the specified probability. +func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { f.lock.Lock() defer f.lock.Unlock() @@ -89,7 +113,10 @@ func (f *FakeControllerSource) change(e watch.Event) { case watch.Deleted: delete(f.items, key) } - f.broadcaster.Action(e.Type, e.Object) + + if rand.Float64() < watchProbability { + f.broadcaster.Action(e.Type, e.Object) + } } // List returns a list object, with its resource version set. From 5f7715f0e9f9d5daaa6074220dbdcc7a4e0ce7c7 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 7 Apr 2015 16:44:08 -0700 Subject: [PATCH 3/5] Make scheduler not miss deletion events even in the case of a resync. --- pkg/client/cache/store.go | 3 + plugin/pkg/scheduler/factory/factory.go | 80 +++++++++++++------------ plugin/pkg/scheduler/modeler.go | 16 ++++- plugin/pkg/scheduler/scheduler.go | 6 +- 4 files changed, 64 insertions(+), 41 deletions(-) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 4fe08ace32e..b6031d43ca3 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -68,6 +68,9 @@ type ExplicitKey string // The key uses the format / unless is empty, then // it's just . 
func MetaNamespaceKeyFunc(obj interface{}) (string, error) { + if key, ok := obj.(ExplicitKey); ok { + return string(key), nil + } meta, err := meta.Accessor(obj) if err != nil { return "", fmt.Errorf("object has no meta: %v", err) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 319f74ffb6e..4c8dda283ee 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" @@ -51,7 +52,11 @@ type ConfigFactory struct { // a means to list all services ServiceLister *cache.StoreToServiceLister - modeler scheduler.SystemModeler + // Close this to stop all reflectors + StopEverything chan struct{} + + scheduledPodPopulator *framework.Controller + modeler scheduler.SystemModeler } // Initializes the factory. @@ -59,13 +64,40 @@ func NewConfigFactory(client *client.Client) *ConfigFactory { c := &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), - ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ScheduledPodLister: &cache.StoreToPodLister{}, NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + StopEverything: make(chan struct{}), } modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister) c.modeler = modeler c.PodLister = modeler.PodLister() + + // On add/delete to the scheduled pods, remove from the assumed pods. 
+ // We construct this here instead of in CreateFromKeys because + // ScheduledPodLister is something we provide to plug in functions that + // they may need to call. + c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer( + c.createAssignedPodLW(), + &api.Pod{}, + 0, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if pod, ok := obj.(*api.Pod); ok { + c.modeler.ForgetPod(pod) + } + }, + DeleteFunc: func(obj interface{}) { + switch t := obj.(type) { + case *api.Pod: + c.modeler.ForgetPod(t) + case cache.DeletedFinalStateUnknown: + c.modeler.ForgetPodByKey(t.Key) + } + }, + }, + ) + return c } @@ -109,21 +141,6 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler return f.CreateFromKeys(predicateKeys, priorityKeys) } -// ReflectorDeletionHook passes all operations through to Store, but calls -// OnDelete in a goroutine if there is a deletion. -type ReflectorDeletionHook struct { - cache.Store - OnDelete func(obj interface{}) -} - -func (r ReflectorDeletionHook) Delete(obj interface{}) error { - go func() { - defer util.HandleCrash() - r.OnDelete(obj) - }() - return r.Store.Delete(obj) -} - // Creates a scheduler from a set of registered fit predicate keys and priority keys. func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) { glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) @@ -144,39 +161,25 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe } // Watch and queue pods that need scheduling. - cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run() + cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything) - // Pass through all events to the scheduled pod store, but on a deletion, - // also remove from the assumed pods. 
- assumedPodDeleter := ReflectorDeletionHook{ - Store: f.ScheduledPodLister.Store, - OnDelete: func(obj interface{}) { - if pod, ok := obj.(*api.Pod); ok { - f.modeler.LockedAction(func() { - f.modeler.ForgetPod(pod) - }) - } - }, - } - - // Watch and cache all running pods. Scheduler needs to find all pods - // so it knows where it's safe to place a pod. Cache this locally. - cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run() + // Begin populating scheduled pods. + f.scheduledPodPopulator.Run(f.StopEverything) // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. if false { // Disable this code until minions support watches. Note when this code is enabled, // we need to make sure minion ListWatcher has proper FieldSelector. - cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run() + cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 10*time.Second).RunUntil(f.StopEverything) } else { - cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run() + cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).RunUntil(f.StopEverything) } // Watch and cache all service objects. Scheduler needs to find all pods // created by the same service, so that it can spread them correctly. // Cache this locally. 
+ cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything) r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -200,7 +203,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe glog.V(2).Infof("About to try and schedule pod %v", pod.Name) return pod }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + StopEverything: f.StopEverything, }, nil } diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go index 0e7839b579c..a80187477ea 100644 --- a/plugin/pkg/scheduler/modeler.go +++ b/plugin/pkg/scheduler/modeler.go @@ -57,8 +57,9 @@ func (a *actionLocker) LockedAction(do func()) { // FakeModeler implements the SystemModeler interface. type FakeModeler struct { - AssumePodFunc func(pod *api.Pod) - ForgetPodFunc func(pod *api.Pod) + AssumePodFunc func(pod *api.Pod) + ForgetPodFunc func(pod *api.Pod) + ForgetPodByKeyFunc func(key string) actionLocker } @@ -76,6 +77,13 @@ func (f *FakeModeler) ForgetPod(pod *api.Pod) { } } +// ForgetPodByKey calls ForgetPodByKeyFunc if it is not nil. +func (f *FakeModeler) ForgetPodByKey(key string) { + if f.ForgetPodByKeyFunc != nil { + f.ForgetPodByKeyFunc(key) + } +} + // SimpleModeler implements the SystemModeler interface with a timed pod cache. type SimpleModeler struct { queuedPods ExtendedPodLister @@ -110,6 +118,10 @@ func (s *SimpleModeler) ForgetPod(pod *api.Pod) { s.assumedPods.Delete(pod) } +func (s *SimpleModeler) ForgetPodByKey(key string) { + s.assumedPods.Delete(cache.ExplicitKey(key)) +} + // Extract names for readable logging.
func podNames(pods []api.Pod) []string { out := make([]string, len(pods)) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index bbee8012500..1ff417b97c5 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -51,6 +51,7 @@ type SystemModeler interface { // show the absence of the given pod if the pod is in the scheduled // pods list!) ForgetPod(pod *api.Pod) + ForgetPodByKey(key string) // For serializing calls to Assume/ForgetPod: imagine you want to add // a pod iff a bind succeeds, but also remove a pod if it is deleted. @@ -85,6 +86,9 @@ type Config struct { // Recorder is the EventRecorder to use Recorder record.EventRecorder + + // Close this to shut down the scheduler. + StopEverything chan struct{} } // New returns a new scheduler. @@ -98,7 +102,7 @@ func New(c *Config) *Scheduler { // Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { - go util.Forever(s.scheduleOne, 0) + go util.Until(s.scheduleOne, 0, s.config.StopEverything) } func (s *Scheduler) scheduleOne() { From 6835318d1e98aeb3c2f27b6f0f1e30a964996836 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 7 Apr 2015 17:40:30 -0700 Subject: [PATCH 4/5] switch to require 'go controller.Run()' --- pkg/controller/framework/controller.go | 5 +++-- pkg/controller/framework/controller_test.go | 4 ++-- plugin/pkg/scheduler/factory/factory.go | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index a55e09dd6fd..b732d92b7a7 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -74,8 +74,9 @@ func New(c *Config) *Controller { // Run begins processing items, and will continue until a value is sent down stopCh. // It's an error to call Run more than once. -// Run does not block. +// Run blocks; call via go. 
func (c *Controller) Run(stopCh <-chan struct{}) { + defer util.HandleCrash() cache.NewReflector( c.config.ListerWatcher, c.config.ObjectType, @@ -83,7 +84,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { c.config.FullResyncPeriod, ).RunUntil(stopCh) - go util.Until(c.processLoop, time.Second, stopCh) + util.Until(c.processLoop, time.Second, stopCh) } // processLoop drains the work queue. diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index bbcf8719956..db9e1c747d7 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -94,7 +94,7 @@ func Example() { // Create the controller and run it until we close stop. stop := make(chan struct{}) - framework.New(cfg).Run(stop) + go framework.New(cfg).Run(stop) // Let's add a few objects to the source. for _, name := range []string{"a-hello", "b-controller", "c-framework"} { @@ -151,7 +151,7 @@ func ExampleInformer() { // Run the controller and run it until we close stop. stop := make(chan struct{}) - controller.Run(stop) + go controller.Run(stop) // Let's add a few objects to the source. for _, name := range []string{"a-hello", "b-controller", "c-framework"} { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 4c8dda283ee..d0084d13576 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -164,7 +164,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything) // Begin populating scheduled pods. - f.scheduledPodPopulator.Run(f.StopEverything) + go f.scheduledPodPopulator.Run(f.StopEverything) // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. 
From 395d69641ebcbd0fb466629d6aa461a45e57d854 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 10 Apr 2015 13:11:26 -0700 Subject: [PATCH 5/5] fix race --- pkg/controller/framework/controller_test.go | 35 ++++++++----------- .../framework/fake_controller_source.go | 27 ++++++++++++-- pkg/conversion/deep_copy.go | 23 +++++++----- 3 files changed, 52 insertions(+), 33 deletions(-) diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index db9e1c747d7..924a8c8289f 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -218,8 +218,9 @@ func TestHammerController(t *testing.T) { go controller.Run(stop) wg := sync.WaitGroup{} - for i := 0; i < 10; i++ { - wg.Add(1) + const threads = 3 + wg.Add(threads) + for i := 0; i < threads; i++ { go func() { defer wg.Done() // Let's add a few objects to the source. @@ -227,7 +228,7 @@ func TestHammerController(t *testing.T) { rs := rand.NewSource(rand.Int63()) f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs) r := rand.New(rs) // Mustn't use r and f concurrently! 
- for i := 0; i < 750; i++ { + for i := 0; i < 100; i++ { var name string var isNew bool if currentNames.Len() == 0 || r.Intn(3) == 1 { @@ -335,48 +336,40 @@ func TestUpdate(t *testing.T) { } } - var wg sync.WaitGroup tests := []func(string){ func(name string) { - defer wg.Done() - testDoneWG.Add(1) name = "a-" + name source.Add(pod(name, FROM)) source.Modify(pod(name, TO)) }, func(name string) { - defer wg.Done() - testDoneWG.Add(1) name = "b-" + name source.Add(pod(name, FROM)) source.ModifyDropWatch(pod(name, TO)) }, func(name string) { - defer wg.Done() - testDoneWG.Add(1) name = "c-" + name source.AddDropWatch(pod(name, FROM)) source.Modify(pod(name, ADD_MISSED)) source.Modify(pod(name, TO)) }, func(name string) { - defer wg.Done() - testDoneWG.Add(1) name = "d-" + name source.Add(pod(name, FROM)) }, } // run every test a few times, in parallel - fuzzer := fuzz.New() - for i := 0; i < 20; i++ { - for _, f := range tests { - wg.Add(1) - var name string - for len(name) < 10 { - fuzzer.Fuzz(&name) - } - go f(name) + const threads = 3 + var wg sync.WaitGroup + wg.Add(threads * len(tests)) + testDoneWG.Add(threads * len(tests)) + for i := 0; i < threads; i++ { + for j, f := range tests { + go func(name string, f func(string)) { + defer wg.Done() + f(name) + }(fmt.Sprintf("%v-%v", i, j), f) } } wg.Wait() diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go index 3bba08aae28..68d797ee641 100644 --- a/pkg/controller/framework/fake_controller_source.go +++ b/pkg/controller/framework/fake_controller_source.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -125,8 +126,15 @@ func (f *FakeControllerSource) List() (runtime.Object, error) { defer 
f.lock.RUnlock() list := make([]runtime.Object, 0, len(f.items)) for _, obj := range f.items { - // TODO: should copy obj first - list = append(list, obj) + // Must make a copy to allow clients to modify the object. + // Otherwise, if they make a change and write it back, they + // will inadvertently change our canonical copy (in + // addition to racing with other clients). + objCopy, err := conversion.DeepCopy(obj) + if err != nil { + return nil, err + } + list = append(list, objCopy.(runtime.Object)) } listObj := &api.List{} if err := runtime.SetList(listObj, list); err != nil { @@ -151,7 +159,20 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, e return nil, err } if rc < len(f.changes) { - return f.broadcaster.WatchWithPrefix(f.changes[rc:]), nil + changes := []watch.Event{} + for _, c := range f.changes[rc:] { + // Must make a copy to allow clients to modify the + // object. Otherwise, if they make a change and write + // it back, they will inadvertently change our + // canonical copy (in addition to racing with other + // clients). + objCopy, err := conversion.DeepCopy(c.Object) + if err != nil { + return nil, err + } + changes = append(changes, watch.Event{c.Type, objCopy.(runtime.Object)}) + } + return f.broadcaster.WatchWithPrefix(changes), nil } else if rc > len(f.changes) { return nil, errors.New("resource version in the future not supported by this fake") } diff --git a/pkg/conversion/deep_copy.go b/pkg/conversion/deep_copy.go index ee4be1f6436..ba73f068f69 100644 --- a/pkg/conversion/deep_copy.go +++ b/pkg/conversion/deep_copy.go @@ -17,22 +17,27 @@ limitations under the License. package conversion import ( + "bytes" + "encoding/gob" "reflect" ) -var deepCopier = NewConverter() - // DeepCopy makes a deep copy of source. Won't work for any private fields! // For nil slices, will return 0-length slices. These are equivilent in // basically every way except for the way that reflect.DeepEqual checks.
func DeepCopy(source interface{}) (interface{}, error) { - src := reflect.ValueOf(source) - v := reflect.New(src.Type()).Elem() - s := &scope{ - converter: deepCopier, - } - if err := deepCopier.convert(src, v, s); err != nil { + v := reflect.New(reflect.TypeOf(source)) + + buff := &bytes.Buffer{} + enc := gob.NewEncoder(buff) + dec := gob.NewDecoder(buff) + err := enc.Encode(source) + if err != nil { return nil, err } - return v.Interface(), nil + err = dec.Decode(v.Interface()) + if err != nil { + return nil, err + } + return v.Elem().Interface(), nil }