diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 4fe08ace32e..b6031d43ca3 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -68,6 +68,9 @@ type ExplicitKey string // The key uses the format / unless is empty, then // it's just . func MetaNamespaceKeyFunc(obj interface{}) (string, error) { + if key, ok := obj.(ExplicitKey); ok { + return string(key), nil + } meta, err := meta.Accessor(obj) if err != nil { return "", fmt.Errorf("object has no meta: %v", err) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 319f74ffb6e..4c8dda283ee 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" @@ -51,7 +52,11 @@ type ConfigFactory struct { // a means to list all services ServiceLister *cache.StoreToServiceLister - modeler scheduler.SystemModeler + // Close this to stop all reflectors + StopEverything chan struct{} + + scheduledPodPopulator *framework.Controller + modeler scheduler.SystemModeler } // Initializes the factory. @@ -59,13 +64,40 @@ func NewConfigFactory(client *client.Client) *ConfigFactory { c := &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), - ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ScheduledPodLister: &cache.StoreToPodLister{}, NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + StopEverything: make(chan struct{}), } modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister) c.modeler = modeler c.PodLister = modeler.PodLister() + + // On add/delete to the scheduled pods, remove from the assumed pods. + // We construct this here instead of in CreateFromKeys because + // ScheduledPodLister is something we provide to plug in functions that + // they may need to call. + c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer( + c.createAssignedPodLW(), + &api.Pod{}, + 0, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if pod, ok := obj.(*api.Pod); ok { + c.modeler.ForgetPod(pod) + } + }, + DeleteFunc: func(obj interface{}) { + switch t := obj.(type) { + case *api.Pod: + c.modeler.ForgetPod(t) + case cache.DeletedFinalStateUnknown: + c.modeler.ForgetPodByKey(t.Key) + } + }, + }, + ) + return c } @@ -109,21 +141,6 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler return f.CreateFromKeys(predicateKeys, priorityKeys) } -// ReflectorDeletionHook passes all operations through to Store, but calls -// OnDelete in a goroutine if there is a deletion. -type ReflectorDeletionHook struct { - cache.Store - OnDelete func(obj interface{}) -} - -func (r ReflectorDeletionHook) Delete(obj interface{}) error { - go func() { - defer util.HandleCrash() - r.OnDelete(obj) - }() - return r.Store.Delete(obj) -} - // Creates a scheduler from a set of registered fit predicate keys and priority keys. func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) { glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) @@ -144,39 +161,25 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe } // Watch and queue pods that need scheduling. - cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run() + cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything) - // Pass through all events to the scheduled pod store, but on a deletion, - // also remove from the assumed pods. - assumedPodDeleter := ReflectorDeletionHook{ - Store: f.ScheduledPodLister.Store, - OnDelete: func(obj interface{}) { - if pod, ok := obj.(*api.Pod); ok { - f.modeler.LockedAction(func() { - f.modeler.ForgetPod(pod) - }) - } - }, - } - - // Watch and cache all running pods. Scheduler needs to find all pods - // so it knows where it's safe to place a pod. Cache this locally. - cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run() + // Begin populating scheduled pods. + f.scheduledPodPopulator.Run(f.StopEverything) // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. if false { // Disable this code until minions support watches. Note when this code is enabled, // we need to make sure minion ListWatcher has proper FieldSelector. - cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run() + cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 10*time.Second).RunUntil(f.StopEverything) } else { - cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run() + cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).RunUntil(f.StopEverything) } // Watch and cache all service objects. Scheduler needs to find all pods // created by the same service, so that it can spread them correctly. // Cache this locally. - cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run() + cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything) r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -200,7 +203,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe glog.V(2).Infof("About to try and schedule pod %v", pod.Name) return pod }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + StopEverything: f.StopEverything, }, nil } diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go index 0e7839b579c..a80187477ea 100644 --- a/plugin/pkg/scheduler/modeler.go +++ b/plugin/pkg/scheduler/modeler.go @@ -57,8 +57,9 @@ func (a *actionLocker) LockedAction(do func()) { // FakeModeler implements the SystemModeler interface. type FakeModeler struct { - AssumePodFunc func(pod *api.Pod) - ForgetPodFunc func(pod *api.Pod) + AssumePodFunc func(pod *api.Pod) + ForgetPodFunc func(pod *api.Pod) + ForgetPodByKeyFunc func(key string) actionLocker } @@ -76,6 +77,13 @@ func (f *FakeModeler) ForgetPod(pod *api.Pod) { } } +// ForgetPodByKey calls the function variable if it is not nil. +func (f *FakeModeler) ForgetPodByKey(key string) { + if f.ForgetPodFunc != nil { + f.ForgetPodByKeyFunc(key) + } +} + // SimpleModeler implements the SystemModeler interface with a timed pod cache. type SimpleModeler struct { queuedPods ExtendedPodLister @@ -110,6 +118,10 @@ func (s *SimpleModeler) ForgetPod(pod *api.Pod) { s.assumedPods.Delete(pod) } +func (s *SimpleModeler) ForgetPodByKey(key string) { + s.assumedPods.Delete(cache.ExplicitKey(key)) +} + // Extract names for readable logging. func podNames(pods []api.Pod) []string { out := make([]string, len(pods)) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index bbee8012500..1ff417b97c5 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -51,6 +51,7 @@ type SystemModeler interface { // show the absence of the given pod if the pod is in the scheduled // pods list!) ForgetPod(pod *api.Pod) + ForgetPodByKey(key string) // For serializing calls to Assume/ForgetPod: imagine you want to add // a pod iff a bind succeeds, but also remove a pod if it is deleted. @@ -85,6 +86,9 @@ type Config struct { // Recorder is the EventRecorder to use Recorder record.EventRecorder + + // Close this to shut down the scheduler. + StopEverything chan struct{} } // New returns a new scheduler. @@ -98,7 +102,7 @@ func New(c *Config) *Scheduler { // Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { - go util.Forever(s.scheduleOne, 0) + go util.Until(s.scheduleOne, 0, s.config.StopEverything) } func (s *Scheduler) scheduleOne() {