From eb2f5153b378dd235d9f91233098fc543562fa63 Mon Sep 17 00:00:00 2001
From: Marek Grabowski
Date: Wed, 2 Mar 2016 15:38:56 +0100
Subject: [PATCH] Revert "Scheduler: replace system modeler with scheduler
 cache and do O(1) lookup for resource req"

---
 .../algorithm/priorities/priorities.go        |  30 ++-
 .../algorithm/priorities/priorities_test.go   |   1 +
 plugin/pkg/scheduler/extender_test.go         |   3 +-
 plugin/pkg/scheduler/factory/factory.go       |  68 ++----
 plugin/pkg/scheduler/generic_scheduler.go     |  18 +-
 .../pkg/scheduler/generic_scheduler_test.go   |   3 +-
 plugin/pkg/scheduler/modeler.go               | 197 ++++++++++++++++++
 plugin/pkg/scheduler/modeler_test.go          | 111 ++++++++++
 plugin/pkg/scheduler/scheduler.go             |  56 +++--
 plugin/pkg/scheduler/scheduler_test.go        |  65 +++---
 plugin/pkg/scheduler/testing/fake_cache.go    |  48 -----
 plugin/pkg/scheduler/testing/pods_to_cache.go |  52 -----
 12 files changed, 433 insertions(+), 219 deletions(-)
 create mode 100644 plugin/pkg/scheduler/modeler.go
 create mode 100644 plugin/pkg/scheduler/modeler_test.go
 delete mode 100644 plugin/pkg/scheduler/testing/fake_cache.go
 delete mode 100644 plugin/pkg/scheduler/testing/pods_to_cache.go

diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
index 9f94bcb0d43..b6fb21cb2e1 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
@@ -44,12 +44,19 @@ func calculateScore(requested int64, capacity int64, node string) int {
 
 // Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
-func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-	totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-	totalMemory := nodeInfo.NonZeroRequest().Memory
+func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
+	totalMilliCPU := int64(0)
+	totalMemory := int64(0)
 	capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
 	capacityMemory := node.Status.Allocatable.Memory().Value()
 
+	for _, existingPod := range pods {
+		for _, container := range existingPod.Spec.Containers {
+			cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
+			totalMilliCPU += cpu
+			totalMemory += memory
+		}
+	}
 	// Add the resources requested by the current pod being scheduled.
 	// This also helps differentiate between differently sized, but empty, nodes.
 	for _, container := range pod.Spec.Containers {
@@ -86,7 +93,7 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca
 	list := schedulerapi.HostPriorityList{}
 	for _, node := range nodes.Items {
-		list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
+		list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods()))
 	}
 	return list, nil
 }
@@ -220,15 +227,22 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
 	list := schedulerapi.HostPriorityList{}
 	for _, node := range nodes.Items {
-		list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
+		list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods()))
 	}
 	return list, nil
 }
 
-func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-	totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-	totalMemory := nodeInfo.NonZeroRequest().Memory
+func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
+	totalMilliCPU := int64(0)
+	totalMemory := int64(0)
 	score := int(0)
+	for _, existingPod := range pods {
+		for _, container := range existingPod.Spec.Containers {
+			cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
+			totalMilliCPU += cpu
+			totalMemory += memory
+		}
+	}
 	// Add the resources requested by the current pod being scheduled.
 	// This also helps differentiate between differently sized, but empty, nodes.
 	for _, container := range pod.Spec.Containers {
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
index b3d8c320521..4ea1122e62a 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
@@ -140,6 +140,7 @@ func TestZeroRequest(t *testing.T) {
 		list, err := scheduler.PrioritizeNodes(
 			test.pod,
 			nodeNameToInfo,
+			algorithm.FakePodLister(test.pods),
 			// This should match the configuration in defaultPriorities() in
 			// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
 			// to test what's actually in production.
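The priorities.go hunks above restore a per-pass walk over every container of every pod on a node, in place of the reverted scheduler cache's precomputed NonZeroRequest aggregate. A minimal self-contained sketch of that summation pattern follows; the types and default constants are illustrative stand-ins, not the actual priorityutil API (the real defaults live alongside GetNonzeroRequests in the priorityutil package):

package main

import "fmt"

// Illustrative defaults: GetNonzeroRequests substitutes a default request
// for containers that set none, so "empty" pods still carry weight in the
// score. The concrete values here are assumptions for the sketch.
const (
	defaultMilliCPURequest int64 = 100
	defaultMemoryRequest   int64 = 200 * 1024 * 1024
)

type container struct{ milliCPU, memory int64 } // zero means "unset"
type pod struct{ containers []container }

// nonzeroRequests mirrors the role of priorityutil.GetNonzeroRequests:
// return the container's requests, falling back to defaults when unset.
func nonzeroRequests(c container) (cpu, mem int64) {
	cpu, mem = c.milliCPU, c.memory
	if cpu == 0 {
		cpu = defaultMilliCPURequest
	}
	if mem == 0 {
		mem = defaultMemoryRequest
	}
	return cpu, mem
}

// nodeOccupancy is the O(pods x containers) walk the revert reintroduces;
// the reverted cache had maintained this sum incrementally, which made the
// per-node lookup O(1).
func nodeOccupancy(pods []pod) (totalMilliCPU, totalMemory int64) {
	for _, p := range pods {
		for _, c := range p.containers {
			cpu, mem := nonzeroRequests(c)
			totalMilliCPU += cpu
			totalMemory += mem
		}
	}
	return totalMilliCPU, totalMemory
}

func main() {
	pods := []pod{
		{containers: []container{{milliCPU: 250, memory: 1 << 30}}},
		{containers: []container{{}}}, // no requests set; defaults apply
	}
	cpu, mem := nodeOccupancy(pods)
	fmt.Printf("occupancy: %dm CPU, %d bytes memory\n", cpu, mem)
}

Both calculateResourceOccupancy and calculateBalancedResourceAllocation then score the node by comparing these totals, plus the incoming pod's own requests, against allocatable capacity.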
diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go
index 2c04e6d8f0f..ad869c9c473 100644
--- a/plugin/pkg/scheduler/extender_test.go
+++ b/plugin/pkg/scheduler/extender_test.go
@@ -25,7 +25,6 @@ import (
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-	schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )
 
 type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@@ -286,7 +285,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 		for ii := range test.extenders {
 			extenders = append(extenders, &test.extenders[ii])
 		}
-		scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders, random)
+		scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random)
 		machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
 		if test.expectsErr {
 			if err == nil {
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index e03981dd4c4..3c5c7a7e701 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -42,7 +42,6 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/apis/extensions"
-	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
 const (
@@ -75,7 +74,7 @@ type ConfigFactory struct {
 	StopEverything chan struct{}
 
 	scheduledPodPopulator *framework.Controller
-	schedulerCache        schedulercache.Cache
+	modeler               scheduler.SystemModeler
 
 	// SchedulerName of a scheduler is used to select which pods will be
 	// processed by this scheduler, based on pods's annotation key:
@@ -85,9 +84,6 @@
 // Initializes the factory.
 func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
-	stopEverything := make(chan struct{})
-	schedulerCache := schedulercache.New(30*time.Second, stopEverything)
-
 	c := &ConfigFactory{
 		Client:   client,
 		PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
@@ -99,12 +95,12 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
 		ServiceLister:    &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
 		ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
 		ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
-		schedulerCache:   schedulerCache,
-		StopEverything:   stopEverything,
+		StopEverything:   make(chan struct{}),
 		SchedulerName:    schedulerName,
 	}
-
-	c.PodLister = schedulerCache
+	modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
+	c.modeler = modeler
+	c.PodLister = modeler.PodLister()
 
 	// On add/delete to the scheduled pods, remove from the assumed pods.
 	// We construct this here instead of in CreateFromKeys because
@@ -116,49 +112,21 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
 		0,
 		framework.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
-				pod, ok := obj.(*api.Pod)
-				if !ok {
-					glog.Errorf("cannot convert to *api.Pod")
-					return
-				}
-				if err := schedulerCache.AddPod(pod); err != nil {
-					glog.Errorf("scheduler cache AddPod failed: %v", err)
-				}
-			},
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				oldPod, ok := oldObj.(*api.Pod)
-				if !ok {
-					glog.Errorf("cannot convert to *api.Pod")
-					return
-				}
-				newPod, ok := newObj.(*api.Pod)
-				if !ok {
-					glog.Errorf("cannot convert to *api.Pod")
-					return
-				}
-				if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil {
-					glog.Errorf("scheduler cache UpdatePod failed: %v", err)
+				if pod, ok := obj.(*api.Pod); ok {
+					c.modeler.LockedAction(func() {
+						c.modeler.ForgetPod(pod)
+					})
 				}
 			},
 			DeleteFunc: func(obj interface{}) {
-				var pod *api.Pod
-				switch t := obj.(type) {
-				case *api.Pod:
-					pod = t
-				case cache.DeletedFinalStateUnknown:
-					var ok bool
-					pod, ok = t.Obj.(*api.Pod)
-					if !ok {
-						glog.Errorf("cannot convert to *api.Pod")
-						return
+				c.modeler.LockedAction(func() {
+					switch t := obj.(type) {
+					case *api.Pod:
+						c.modeler.ForgetPod(t)
+					case cache.DeletedFinalStateUnknown:
+						c.modeler.ForgetPodByKey(t.Key)
 					}
-				default:
-					glog.Errorf("cannot convert to *api.Pod")
-					return
-				}
-				if err := schedulerCache.RemovePod(pod); err != nil {
-					glog.Errorf("scheduler cache RemovePod failed: %v", err)
-				}
+				})
 			},
 		},
 	)
@@ -273,7 +241,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
-	algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)
+	algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)
 
 	podBackoff := podBackoff{
 		perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -284,7 +252,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 	}
 
 	return &scheduler.Config{
-		SchedulerCache: f.schedulerCache,
+		Modeler: f.modeler,
 		// The scheduler only needs to consider schedulable nodes.
 		NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
 		Algorithm:  algo,
diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go
index b895d25ca6e..e1c8a7df918 100644
--- a/plugin/pkg/scheduler/generic_scheduler.go
+++ b/plugin/pkg/scheduler/generic_scheduler.go
@@ -25,6 +25,7 @@ import (
 	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -54,10 +55,10 @@ func (f *FitError) Error() string {
 }
 
 type genericScheduler struct {
-	cache        schedulercache.Cache
 	predicates   map[string]algorithm.FitPredicate
 	prioritizers []algorithm.PriorityConfig
 	extenders    []algorithm.SchedulerExtender
+	pods         algorithm.PodLister
 	random       *rand.Rand
 	randomLock   sync.Mutex
 }
@@ -74,12 +75,13 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
 		return "", ErrNoNodesAvailable
 	}
 
-	// Used for all fit and priority funcs.
-	nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
+	// TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute.
+	// But at least we're now only doing it in one place
+	pods, err := g.pods.List(labels.Everything())
 	if err != nil {
 		return "", err
 	}
-
+	nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
 	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
 	if err != nil {
 		return "", err
@@ -92,7 +94,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
 		}
 	}
 
-	priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
+	priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
 	if err != nil {
 		return "", err
 	}
@@ -186,7 +188,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
 // Each priority function can also have its own weight
 // The node scores returned by the priority function are multiplied by the weights to get weighted scores
 // All scores are finally combined (added) to get the total weighted scores of all nodes
-func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
+func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
 	result := schedulerapi.HostPriorityList{}
 
 	// If no priority configs are provided, then the EqualPriority function is applied
@@ -286,12 +288,12 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf
 	return result, nil
 }
 
-func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, random *rand.Rand) algorithm.ScheduleAlgorithm {
+func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {
 	return &genericScheduler{
-		cache:        cache,
 		predicates:   predicates,
 		prioritizers: prioritizers,
 		extenders:    extenders,
+		pods:         pods,
 		random:       random,
 	}
 }
diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go
index 222f3fac1b5..de63abf8647 100644
--- a/plugin/pkg/scheduler/generic_scheduler_test.go
+++ b/plugin/pkg/scheduler/generic_scheduler_test.go
@@ -28,7 +28,6 @@ import (
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-	schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )
 
 func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@@ -257,7 +256,7 @@ func TestGenericScheduler(t *testing.T) {
 
 	for _, test := range tests {
 		random := rand.New(rand.NewSource(0))
-		scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random)
+		scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random)
 		machine, err :=
			scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
 		if test.expectsErr {
 			if err == nil {
diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go
new file mode 100644
index 00000000000..3d91396c610
--- /dev/null
+++ b/plugin/pkg/scheduler/modeler.go
@@ -0,0 +1,197 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
+
+	"github.com/golang/glog"
+)
+
+var (
+	_ = SystemModeler(&FakeModeler{})
+	_ = SystemModeler(&SimpleModeler{})
+)
+
+// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's
+// existence in addition to listing the pods.
+type ExtendedPodLister interface {
+	algorithm.PodLister
+	Exists(pod *api.Pod) (bool, error)
+}
+
+// actionLocker implements lockedAction (so the fake and SimpleModeler can both
+// use it)
+type actionLocker struct {
+	sync.Mutex
+}
+
+// LockedAction serializes calls of whatever is passed as 'do'.
+func (a *actionLocker) LockedAction(do func()) {
+	a.Lock()
+	defer a.Unlock()
+	do()
+}
+
+// FakeModeler implements the SystemModeler interface.
+type FakeModeler struct {
+	AssumePodFunc      func(pod *api.Pod)
+	ForgetPodFunc      func(pod *api.Pod)
+	ForgetPodByKeyFunc func(key string)
+	actionLocker
+}
+
+// AssumePod calls the function variable if it is not nil.
+func (f *FakeModeler) AssumePod(pod *api.Pod) {
+	if f.AssumePodFunc != nil {
+		f.AssumePodFunc(pod)
+	}
+}
+
+// ForgetPod calls the function variable if it is not nil.
+func (f *FakeModeler) ForgetPod(pod *api.Pod) {
+	if f.ForgetPodFunc != nil {
+		f.ForgetPodFunc(pod)
+	}
+}
+
+// ForgetPodByKey calls the function variable if it is not nil.
+func (f *FakeModeler) ForgetPodByKey(key string) {
+	if f.ForgetPodByKeyFunc != nil {
+		f.ForgetPodByKeyFunc(key)
+	}
+}
+
+// SimpleModeler implements the SystemModeler interface with a timed pod cache.
+type SimpleModeler struct {
+	queuedPods    ExtendedPodLister
+	scheduledPods ExtendedPodLister
+
+	// assumedPods holds the pods that we think we've scheduled, but that
+	// haven't yet shown up in the scheduledPods variable.
+	// TODO: periodically clear this.
+	assumedPods *cache.StoreToPodLister
+
+	actionLocker
+}
+
+// NewSimpleModeler returns a new SimpleModeler.
+// queuedPods: a PodLister that will return pods that have not been scheduled yet.
+// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled.
+func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler {
+	return &SimpleModeler{
+		queuedPods:    queuedPods,
+		scheduledPods: scheduledPods,
+		assumedPods: &cache.StoreToPodLister{
+			Store: cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second),
+		},
+	}
+}
+
+func (s *SimpleModeler) AssumePod(pod *api.Pod) {
+	s.assumedPods.Add(pod)
+}
+
+func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
+	s.assumedPods.Delete(pod)
+}
+
+func (s *SimpleModeler) ForgetPodByKey(key string) {
+	s.assumedPods.Delete(cache.ExplicitKey(key))
+}
+
+// Extract names for readable logging.
+func podNames(pods []*api.Pod) []string {
+	out := make([]string, len(pods))
+	for i := range pods {
+		out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID)
+	}
+	return out
+}
+
+func (s *SimpleModeler) listPods(selector labels.Selector) (pods []*api.Pod, err error) {
+	assumed, err := s.assumedPods.List(selector)
+	if err != nil {
+		return nil, err
+	}
+	// Since the assumed list will be short, just check every one.
+	// Goal here is to stop making assumptions about a pod once it shows
+	// up in one of these other lists.
+	for _, pod := range assumed {
+		qExist, err := s.queuedPods.Exists(pod)
+		if err != nil {
+			return nil, err
+		}
+		if qExist {
+			s.assumedPods.Store.Delete(pod)
+			continue
+		}
+		sExist, err := s.scheduledPods.Exists(pod)
+		if err != nil {
+			return nil, err
+		}
+		if sExist {
+			s.assumedPods.Store.Delete(pod)
+			continue
+		}
+	}
+
+	scheduled, err := s.scheduledPods.List(selector)
+	if err != nil {
+		return nil, err
+	}
+	// Listing purges the ttl cache and re-gets, in case we deleted any entries.
+	assumed, err = s.assumedPods.List(selector)
+	if err != nil {
+		return nil, err
+	}
+	if len(assumed) == 0 {
+		return scheduled, nil
+	}
+	glog.V(2).Infof(
+		"listing pods: [%v] assumed to exist in addition to %v known pods.",
+		strings.Join(podNames(assumed), ","),
+		len(scheduled),
+	)
+	return append(scheduled, assumed...), nil
+}
+
+// PodLister returns a PodLister that will list pods that we think we have scheduled in
+// addition to pods that we know have been scheduled.
+func (s *SimpleModeler) PodLister() algorithm.PodLister {
+	return simpleModelerPods{s}
+}
+
+// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister.
+type simpleModelerPods struct {
+	simpleModeler *SimpleModeler
+}
+
+// List returns pods known and assumed to exist.
+func (s simpleModelerPods) List(selector labels.Selector) (pods []*api.Pod, err error) {
+	s.simpleModeler.LockedAction(
+		func() { pods, err = s.simpleModeler.listPods(selector) })
+	return
+}
diff --git a/plugin/pkg/scheduler/modeler_test.go b/plugin/pkg/scheduler/modeler_test.go
new file mode 100644
index 00000000000..ab9ff937681
--- /dev/null
+++ b/plugin/pkg/scheduler/modeler_test.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"testing"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/labels"
+)
+
+type nn struct {
+	namespace, name string
+}
+
+type names []nn
+
+func (ids names) list() []*api.Pod {
+	out := make([]*api.Pod, 0, len(ids))
+	for _, id := range ids {
+		out = append(out, &api.Pod{
+			ObjectMeta: api.ObjectMeta{
+				Namespace: id.namespace,
+				Name:      id.name,
+			},
+		})
+	}
+	return out
+}
+
+func (ids names) has(pod *api.Pod) bool {
+	for _, id := range ids {
+		if pod.Namespace == id.namespace && pod.Name == id.name {
+			return true
+		}
+	}
+	return false
+}
+
+func TestModeler(t *testing.T) {
+	table := []struct {
+		queuedPods    []*api.Pod
+		scheduledPods []*api.Pod
+		assumedPods   []*api.Pod
+		expectPods    names
+	}{
+		{
+			queuedPods:    names{}.list(),
+			scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
+			assumedPods:   names{{"default", "foo"}}.list(),
+			expectPods:    names{{"default", "foo"}, {"custom", "foo"}},
+		}, {
+			queuedPods:    names{}.list(),
+			scheduledPods: names{{"default", "foo"}}.list(),
+			assumedPods:   names{{"default", "foo"}, {"custom", "foo"}}.list(),
+			expectPods:    names{{"default", "foo"}, {"custom", "foo"}},
+		}, {
+			queuedPods:    names{{"custom", "foo"}}.list(),
+			scheduledPods: names{{"default", "foo"}}.list(),
+			assumedPods:   names{{"default", "foo"}, {"custom", "foo"}}.list(),
+			expectPods:    names{{"default", "foo"}},
+		},
+	}
+
+	for _, item := range table {
+		q := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
+		for _, pod := range item.queuedPods {
+			q.Store.Add(pod)
+		}
+		s := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
+		for _, pod := range item.scheduledPods {
+			s.Store.Add(pod)
+		}
+		m := NewSimpleModeler(q, s)
+		for _, pod := range item.assumedPods {
+			m.AssumePod(pod)
+		}
+
+		list, err := m.PodLister().List(labels.Everything())
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+
+		found := 0
+		for _, pod := range list {
+			if item.expectPods.has(pod) {
+				found++
+			} else {
+				t.Errorf("found unexpected pod %#v", pod)
+			}
+		}
+		if e, a := item.expectPods, found; len(e) != a {
+			t.Errorf("Expected pods:\n%+v\nFound pods:\n%s\n", podNames(e.list()), podNames(list))
+		}
+	}
+}
diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go
index b28833817a7..124d8240b5e 100644
--- a/plugin/pkg/scheduler/scheduler.go
+++ b/plugin/pkg/scheduler/scheduler.go
@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
-	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 
 	"github.com/golang/glog"
 )
@@ -37,6 +36,32 @@ type Binder interface {
 	Bind(binding *api.Binding) error
 }
 
+// SystemModeler can help the scheduler produce a model of the system that
+// anticipates reality. For example, if the scheduler has pods A and B both
+// using hostPort 80, when it binds A to machine M it should not bind B
+// to machine M in the time when it hasn't observed the binding of A
+// take effect yet.
+//
+// Since the model is only an optimization, it's expected to handle
+// any errors itself without sending them back to the scheduler.
+type SystemModeler interface {
+	// AssumePod assumes that the given pod exists in the system.
+	// The assumption should last until the system confirms the
+	// assumption or disconfirms it.
+	AssumePod(pod *api.Pod)
+	// ForgetPod removes a pod assumption.
+	// (It won't make the model show the absence of the given pod if the
+	// pod is in the scheduled pods list!)
+	ForgetPod(pod *api.Pod)
+	ForgetPodByKey(key string)
+
+	// For serializing calls to Assume/ForgetPod: imagine you want to add
+	// a pod if and only if a bind succeeds, but also remove a pod if it is deleted.
+	// TODO: if SystemModeler begins modeling things other than pods, this
+	// should probably be parameterized or specialized for pods.
+	LockedAction(f func())
+}
+
 // Scheduler watches for new unscheduled pods. It attempts to find
 // nodes that they fit on and writes bindings back to the api server.
 type Scheduler struct {
@@ -44,12 +69,12 @@
 }
 
 type Config struct {
-	// It is expected that changes made via SchedulerCache will be observed
+	// It is expected that changes made via modeler will be observed
 	// by NodeLister and Algorithm.
-	SchedulerCache schedulercache.Cache
-	NodeLister     algorithm.NodeLister
-	Algorithm      algorithm.ScheduleAlgorithm
-	Binder         Binder
+	Modeler    SystemModeler
+	NodeLister algorithm.NodeLister
+	Algorithm  algorithm.ScheduleAlgorithm
+	Binder     Binder
 
 	// NextPod should be a function that blocks until the next pod
 	// is available. We don't use a channel for this, because scheduling
@@ -104,25 +129,24 @@ func (s *Scheduler) scheduleOne() {
 		},
 	}
 
-	bindAction := func() bool {
+	// We want to add the pod to the model if and only if the bind succeeds,
+	// but we don't want to race with any deletions, which happen asynchronously.
+	s.config.Modeler.LockedAction(func() {
 		bindingStart := time.Now()
 		err := s.config.Binder.Bind(b)
 		if err != nil {
 			glog.V(1).Infof("Failed to bind pod: %+v", err)
 			s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err)
 			s.config.Error(pod, err)
-			return false
+			return
 		}
 		metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
 		s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
-		return true
-	}
-
-	assumed := *pod
-	assumed.Spec.NodeName = dest
-	// We want to assume the pod if and only if the bind succeeds,
-	// but we don't want to race with any deletions, which happen asynchronously.
-	s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, bindAction)
+		// tell the model to assume that this binding took effect.
+		assumed := *pod
+		assumed.Spec.NodeName = dest
+		s.config.Modeler.AssumePod(&assumed)
+	})
 
 	metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 }
diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go
index ee66a6c73f0..9ce9a6ab52d 100644
--- a/plugin/pkg/scheduler/scheduler_test.go
+++ b/plugin/pkg/scheduler/scheduler_test.go
@@ -23,18 +23,13 @@ import (
 	"testing"
 	"time"
 
-	"sync"
-
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/record"
-	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
-	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-	schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )
 
 type fakeBinder struct {
@@ -114,8 +109,8 @@ func TestScheduler(t *testing.T) {
 		var gotAssumedPod *api.Pod
 		var gotBinding *api.Binding
 		c := &Config{
-			SchedulerCache: &schedulertesting.FakeCache{
-				AssumeFunc: func(pod *api.Pod) {
+			Modeler: &FakeModeler{
+				AssumePodFunc: func(pod *api.Pod) {
 					gotAssumedPod = pod
 				},
 			},
@@ -166,30 +161,42 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	eventBroadcaster := record.NewBroadcaster()
 	defer eventBroadcaster.StartLogging(t.Logf).Stop()
 
-	// Setup stores to test pod's workflow:
-	// - queuedPodStore: pods queued before processing
-	// - scheduledPodStore: pods that has a scheduling decision
+	// Setup modeler so we control the contents of all 3 stores: assumed,
+	// scheduled and queued
 	scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
+	scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore}
+
 	queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
+	queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore}
+
+	modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
+
+	// Create a fake clock used to timestamp entries and calculate ttl. Nothing
+	// will expire till we flip to something older than the ttl, at which point
+	// all entries inserted with fakeTime will expire.
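Before the test setup continues below, a self-contained sketch of the fake-clock/TTL interaction the test relies on may help: nothing expires until the clock is stepped past the ttl. The store and clock types here are simplified stand-ins for the cache package's expiration store and util.NewFakeClock, not their real APIs:

package main

import (
	"fmt"
	"time"
)

// fakeClock is a hand-advanced clock, in the spirit of util.NewFakeClock.
type fakeClock struct{ now time.Time }

func (c *fakeClock) Now() time.Time       { return c.now }
func (c *fakeClock) Step(d time.Duration) { c.now = c.now.Add(d) }

type entry struct {
	pod   string
	stamp time.Time // clock time at insertion
}

// ttlStore keeps entries until the injected clock says they are older than
// ttl; reads lazily purge expired entries, much as SimpleModeler's List does.
type ttlStore struct {
	ttl   time.Duration
	clock *fakeClock
	items map[string]entry
}

func (s *ttlStore) Add(key, pod string) {
	s.items[key] = entry{pod: pod, stamp: s.clock.Now()}
}

func (s *ttlStore) GetByKey(key string) (string, bool) {
	e, ok := s.items[key]
	if !ok || s.clock.Now().Sub(e.stamp) > s.ttl {
		delete(s.items, key)
		return "", false
	}
	return e.pod, true
}

func main() {
	clock := &fakeClock{now: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)}
	store := &ttlStore{ttl: 30 * time.Second, clock: clock, items: map[string]entry{}}

	store.Add("foo", "foo:8080")
	_, ok := store.GetByKey("foo")
	fmt.Println(ok) // true: the assumed pod is still within its ttl

	clock.Step(30*time.Second + 1) // the test's fakeClock.Step(ttl + 1)
	_, ok = store.GetByKey("foo")
	fmt.Println(ok) // false: stepping past the ttl expired the assumption
}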
+	ttl := 30 * time.Second
+	fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
+	fakeClock := util.NewFakeClock(fakeTime)
+	ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
+	assumedPodsStore := cache.NewFakeExpirationStore(
+		cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
+	modeler.assumedPods = &cache.StoreToPodLister{Store: assumedPodsStore}
 
 	// Port is the easiest way to cause a fit predicate failure
 	podPort := 8080
 	firstPod := podWithPort("foo", "", podPort)
 
-	stop := make(chan struct{})
-	defer close(stop)
-	cache := schedulercache.New(1*time.Second, stop)
 	// Create the scheduler config
 	algo := NewGenericScheduler(
-		cache,
 		map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
 		[]algorithm.PriorityConfig{},
 		[]algorithm.SchedulerExtender{},
+		modeler.PodLister(),
 		rand.New(rand.NewSource(time.Now().UnixNano())))
 
 	var gotBinding *api.Binding
 	c := &Config{
-		SchedulerCache: cache,
+		Modeler: modeler,
 		NodeLister: algorithm.FakeNodeLister(
 			api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
 		),
@@ -236,6 +243,10 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	if exists {
 		t.Errorf("Did not expect a queued pod, found %+v", pod)
 	}
+	pod, exists, _ = assumedPodsStore.GetByKey("foo")
+	if !exists {
+		t.Errorf("Assumed pod store should contain stale pod")
+	}
 
 	expectBind := &api.Binding{
 		ObjectMeta: api.ObjectMeta{Name: "foo"},
@@ -249,6 +260,10 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	events.Stop()
 
 	scheduledPodStore.Delete(pod)
+	_, exists, _ = assumedPodsStore.Get(pod)
+	if !exists {
+		t.Errorf("Expected pod %#v in assumed pod store", pod)
+	}
 
 	secondPod := podWithPort("bar", "", podPort)
 	queuedPodStore.Add(secondPod)
 	// queuedPodStore: [bar:8080]
 	// scheduledPodStore: []
 	// assumedPods: [foo:8080]
 
-	var wg sync.WaitGroup
-	wg.Add(1)
-	// waiting for the assumed pod to expire
-	go func() {
-		for {
-			pods, err := cache.List(labels.Everything())
-			if err != nil {
-				t.Fatalf("cache.List failed: %v", err)
-			}
-			if len(pods) == 0 {
-				wg.Done()
-				return
-			}
-			time.Sleep(1 * time.Second)
-		}
-	}()
-	wg.Wait()
-
 	// Second scheduling pass will fail to schedule if the store hasn't expired
 	// the deleted pod. This would normally happen with a timeout.
+	//expirationPolicy.NeverExpire = util.NewStringSet()
+	fakeClock.Step(ttl + 1)
 
 	called = make(chan struct{})
 	events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go
deleted file mode 100644
index fe48442c3e0..00000000000
--- a/plugin/pkg/scheduler/testing/fake_cache.go
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package schedulercache
-
-import (
-	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-)
-
-// FakeCache is used for testing
-type FakeCache struct {
-	AssumeFunc func(*api.Pod)
-}
-
-func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
-	if !bind() {
-		return nil
-	}
-	f.AssumeFunc(pod)
-	return nil
-}
-
-func (f *FakeCache) AddPod(pod *api.Pod) error { return nil }
-
-func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
-
-func (f *FakeCache) RemovePod(pod *api.Pod) error { return nil }
-
-func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
-	return nil, nil
-}
-
-func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil }
diff --git a/plugin/pkg/scheduler/testing/pods_to_cache.go b/plugin/pkg/scheduler/testing/pods_to_cache.go
deleted file mode 100644
index 602fe96d039..00000000000
--- a/plugin/pkg/scheduler/testing/pods_to_cache.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package schedulercache
-
-import (
-	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-)
-
-// PodsToCache is used for testing
-type PodsToCache []*api.Pod
-
-func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
-	if !bind() {
-		return nil
-	}
-	return nil
-}
-
-func (p PodsToCache) AddPod(pod *api.Pod) error { return nil }
-
-func (p PodsToCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
-
-func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil }
-
-func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
-	return schedulercache.CreateNodeNameToInfoMap(p), nil
-}
-
-func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {
-	for _, pod := range p {
-		if s.Matches(labels.Set(pod.Labels)) {
-			selected = append(selected, pod)
-		}
-	}
-	return selected, nil
-}
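Taken together, the restored design works like this: the scheduler binds and assumes under a single LockedAction so a concurrent delete cannot slip between the two steps, and the informer handlers later forget the assumption once the apiserver reports the real pod or its deletion. A condensed, self-contained sketch of that lifecycle follows; the types are simplified stand-ins for SimpleModeler and the Binder, not the scheduler's actual API:

package main

import (
	"fmt"
	"sync"
)

type pod struct{ name, nodeName string }

// modeler is a cut-down SimpleModeler: a locked map of assumed pods.
type modeler struct {
	mu      sync.Mutex
	assumed map[string]pod
}

// LockedAction serializes bind+assume against the forget handlers,
// mirroring actionLocker.LockedAction in modeler.go.
func (m *modeler) LockedAction(do func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	do()
}

func (m *modeler) AssumePod(p pod)         { m.assumed[p.name] = p }
func (m *modeler) ForgetPodByKey(k string) { delete(m.assumed, k) }

func main() {
	m := &modeler{assumed: map[string]pod{}}

	// Stand-in for Binder.Bind; the real binder posts a Binding to the apiserver.
	bind := func(p pod, dest string) error { return nil }

	p := pod{name: "foo"}
	dest := "machine1"
	m.LockedAction(func() {
		if err := bind(p, dest); err != nil {
			return // a failed bind assumes nothing, as in scheduleOne
		}
		assumed := p
		assumed.nodeName = dest
		m.AssumePod(assumed) // the model now anticipates the binding
	})
	fmt.Println(len(m.assumed)) // 1: pod is assumed until observed

	// Later, the scheduled-pod informer reports the real pod (or its
	// deletion) and the event handlers drop the assumption.
	m.LockedAction(func() { m.ForgetPodByKey("foo") })
	fmt.Println(len(m.assumed)) // 0
}

The trade-off the revert accepts is that the model is a best-effort overlay on two listers plus a TTL cache, rather than the single authoritative scheduler cache it replaces; assumptions that never materialize simply age out after the 30-second TTL.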