diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go index b6fb21cb2e1..9f94bcb0d43 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go @@ -44,19 +44,12 @@ func calculateScore(requested int64, capacity int64, node string) int { // Calculate the resource occupancy on a node. 'node' has information about the resources on the node. // 'pods' is a list of pods currently scheduled on the node. -func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority { - totalMilliCPU := int64(0) - totalMemory := int64(0) +func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority { + totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU + totalMemory := nodeInfo.NonZeroRequest().Memory capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue() capacityMemory := node.Status.Allocatable.Memory().Value() - for _, existingPod := range pods { - for _, container := range existingPod.Spec.Containers { - cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests) - totalMilliCPU += cpu - totalMemory += memory - } - } // Add the resources requested by the current pod being scheduled. // This also helps differentiate between differently sized, but empty, nodes. for _, container := range pod.Spec.Containers { @@ -93,7 +86,7 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods())) + list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name])) } return list, nil } @@ -227,22 +220,15 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods())) + list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name])) } return list, nil } -func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority { - totalMilliCPU := int64(0) - totalMemory := int64(0) +func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority { + totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU + totalMemory := nodeInfo.NonZeroRequest().Memory score := int(0) - for _, existingPod := range pods { - for _, container := range existingPod.Spec.Containers { - cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests) - totalMilliCPU += cpu - totalMemory += memory - } - } // Add the resources requested by the current pod being scheduled. // This also helps differentiate between differently sized, but empty, nodes. 
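// The hunks above replace the per-container request loop with the totals the cache
// already aggregates in NodeInfo.NonZeroRequest(). For orientation, a minimal
// standalone sketch of the scoring idea follows; the types, the 0-10 scale, and the
// averaging of CPU and memory are simplified assumptions for illustration, not the
// exact upstream code.

package main

import "fmt"

// resource mirrors the aggregated request totals a NodeInfo would carry.
type resource struct {
	MilliCPU int64
	Memory   int64
}

// leastRequestedScore follows the "unused fraction, scaled to 0-10" idea:
// the emptier the node after adding the pod, the higher the score.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return ((capacity - requested) * 10) / capacity
}

func main() {
	// Pretend the cache already aggregated the existing (non-zero) requests...
	cached := resource{MilliCPU: 1500, Memory: 2 << 30}
	// ...and the incoming pod asks for a bit more.
	incoming := resource{MilliCPU: 500, Memory: 1 << 30}

	capacityCPU, capacityMem := int64(4000), int64(8<<30)
	cpuScore := leastRequestedScore(cached.MilliCPU+incoming.MilliCPU, capacityCPU)
	memScore := leastRequestedScore(cached.Memory+incoming.Memory, capacityMem)

	// Average the two dimensions into a single host priority (illustrative).
	fmt.Printf("score=%d\n", (cpuScore+memScore)/2)
}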
for _, container := range pod.Spec.Containers { diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go index 4ea1122e62a..b3d8c320521 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go @@ -140,7 +140,6 @@ func TestZeroRequest(t *testing.T) { list, err := scheduler.PrioritizeNodes( test.pod, nodeNameToInfo, - algorithm.FakePodLister(test.pods), // This should match the configuration in defaultPriorities() in // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want // to test what's actually in production. diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go index ad869c9c473..2c04e6d8f0f 100644 --- a/plugin/pkg/scheduler/extender_test.go +++ b/plugin/pkg/scheduler/extender_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error) @@ -285,7 +286,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) } - scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random) + scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders, random) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { if err == nil { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 4f785fd7853..6a291836bc1 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -42,6 +42,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) const ( @@ -74,7 +75,7 @@ type ConfigFactory struct { StopEverything chan struct{} scheduledPodPopulator *framework.Controller - modeler scheduler.SystemModeler + schedulerCache schedulercache.Cache // SchedulerName of a scheduler is used to select which pods will be // processed by this scheduler, based on pods's annotation key: @@ -84,6 +85,9 @@ type ConfigFactory struct { // Initializes the factory. 
func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory { + stopEverything := make(chan struct{}) + schedulerCache := schedulercache.New(30*time.Second, stopEverything) + c := &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), @@ -95,12 +99,12 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, - StopEverything: make(chan struct{}), + schedulerCache: schedulerCache, + StopEverything: stopEverything, SchedulerName: schedulerName, } - modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister) - c.modeler = modeler - c.PodLister = modeler.PodLister() + + c.PodLister = schedulerCache // On add/delete to the scheduled pods, remove from the assumed pods. // We construct this here instead of in CreateFromKeys because @@ -112,21 +116,49 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor 0, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - if pod, ok := obj.(*api.Pod); ok { - c.modeler.LockedAction(func() { - c.modeler.ForgetPod(pod) - }) + pod, ok := obj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert to *api.Pod") + return + } + if err := schedulerCache.AddPod(pod); err != nil { + glog.Errorf("scheduler cache AddPod failed: %v", err) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldPod, ok := oldObj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert to *api.Pod") + return + } + newPod, ok := newObj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert to *api.Pod") + return + } + if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil { + glog.Errorf("scheduler cache UpdatePod failed: %v", err) } }, DeleteFunc: func(obj interface{}) { - c.modeler.LockedAction(func() { - switch t := obj.(type) { - case *api.Pod: - c.modeler.ForgetPod(t) - case cache.DeletedFinalStateUnknown: - c.modeler.ForgetPodByKey(t.Key) + var pod *api.Pod + switch t := obj.(type) { + case *api.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert to *api.Pod") + return } - }) + default: + glog.Errorf("cannot convert to *api.Pod") + return + } + if err := schedulerCache.RemovePod(pod); err != nil { + glog.Errorf("scheduler cache RemovePod failed: %v", err) + } }, }, ) @@ -241,7 +273,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r) + algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r) podBackoff := podBackoff{ perPodBackoff: map[types.NamespacedName]*backoffEntry{}, @@ -252,7 +284,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, } return &scheduler.Config{ - Modeler: f.modeler, + SchedulerCache: f.schedulerCache, // The scheduler only needs to consider schedulable nodes. 
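// The informer callbacks added above all follow the same shape: type-assert the
// object to *api.Pod (unwrapping cache.DeletedFinalStateUnknown tombstones on
// delete), then forward it to the scheduler cache and log failures. A condensed,
// self-contained sketch of that pattern; the pod, tombstone, and cache types here
// are stand-ins, not the real api or schedulercache packages.

package main

import (
	"fmt"
	"log"
)

type pod struct{ Name string }

// deletedFinalStateUnknown plays the role of the informer's tombstone object.
type deletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

type podCache interface {
	AddPod(p *pod) error
	RemovePod(p *pod) error
}

// onDelete mirrors the DeleteFunc in the hunk above: tombstones may wrap the last
// known object, so unwrap before updating the cache, and only log on failure.
func onDelete(c podCache, obj interface{}) {
	var p *pod
	switch t := obj.(type) {
	case *pod:
		p = t
	case deletedFinalStateUnknown:
		var ok bool
		p, ok = t.Obj.(*pod)
		if !ok {
			log.Printf("cannot convert tombstone object to *pod")
			return
		}
	default:
		log.Printf("cannot convert %T to *pod", t)
		return
	}
	if err := c.RemovePod(p); err != nil {
		log.Printf("cache RemovePod failed: %v", err)
	}
}

// mapCache is a toy cache backing for the sketch.
type mapCache map[string]*pod

func (m mapCache) AddPod(p *pod) error    { m[p.Name] = p; return nil }
func (m mapCache) RemovePod(p *pod) error { delete(m, p.Name); return nil }

func main() {
	c := mapCache{}
	p := &pod{Name: "web-1"}
	_ = c.AddPod(p)

	// A delete delivered as a tombstone still reaches the cache.
	onDelete(c, deletedFinalStateUnknown{Key: "default/web-1", Obj: p})
	fmt.Println("pods left in cache:", len(c))
}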
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), Algorithm: algo, diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index 3ff0bcb195e..bda82048435 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -26,7 +26,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" @@ -56,6 +55,7 @@ func (f *FitError) Error() string { } type genericScheduler struct { + cache schedulercache.Cache predicates map[string]algorithm.FitPredicate prioritizers []algorithm.PriorityConfig extenders []algorithm.SchedulerExtender @@ -77,13 +77,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe return "", ErrNoNodesAvailable } - // TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute. - // But at least we're now only doing it in one place - pods, err := g.pods.List(labels.Everything()) + // Used for all fit and priority funcs. + nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap() if err != nil { return "", err } - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders) if err != nil { return "", err @@ -96,7 +95,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } } - priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) + priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) if err != nil { return "", err } @@ -185,7 +184,6 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No func PrioritizeNodes( pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, - podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender, @@ -289,12 +287,12 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf return result, nil } -func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm { +func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, random *rand.Rand) algorithm.ScheduleAlgorithm { return &genericScheduler{ + cache: cache, predicates: predicates, prioritizers: prioritizers, extenders: extenders, - pods: pods, random: random, } } diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index de63abf8647..222f3fac1b5 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) func falsePredicate(pod *api.Pod, nodeName string, nodeInfo 
*schedulercache.NodeInfo) (bool, error) { @@ -256,7 +257,7 @@ func TestGenericScheduler(t *testing.T) { for _, test := range tests { random := rand.New(rand.NewSource(0)) - scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random) + scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { if err == nil { diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go deleted file mode 100644 index 3d91396c610..00000000000 --- a/plugin/pkg/scheduler/modeler.go +++ /dev/null @@ -1,197 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package scheduler - -import ( - "fmt" - "strings" - "sync" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" - - "github.com/golang/glog" -) - -var ( - _ = SystemModeler(&FakeModeler{}) - _ = SystemModeler(&SimpleModeler{}) -) - -// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's -// existence in addition to listing the pods. -type ExtendedPodLister interface { - algorithm.PodLister - Exists(pod *api.Pod) (bool, error) -} - -// actionLocker implements lockedAction (so the fake and SimpleModeler can both -// use it) -type actionLocker struct { - sync.Mutex -} - -// LockedAction serializes calls of whatever is passed as 'do'. -func (a *actionLocker) LockedAction(do func()) { - a.Lock() - defer a.Unlock() - do() -} - -// FakeModeler implements the SystemModeler interface. -type FakeModeler struct { - AssumePodFunc func(pod *api.Pod) - ForgetPodFunc func(pod *api.Pod) - ForgetPodByKeyFunc func(key string) - actionLocker -} - -// AssumePod calls the function variable if it is not nil. -func (f *FakeModeler) AssumePod(pod *api.Pod) { - if f.AssumePodFunc != nil { - f.AssumePodFunc(pod) - } -} - -// ForgetPod calls the function variable if it is not nil. -func (f *FakeModeler) ForgetPod(pod *api.Pod) { - if f.ForgetPodFunc != nil { - f.ForgetPodFunc(pod) - } -} - -// ForgetPodByKey calls the function variable if it is not nil. -func (f *FakeModeler) ForgetPodByKey(key string) { - if f.ForgetPodFunc != nil { - f.ForgetPodByKeyFunc(key) - } -} - -// SimpleModeler implements the SystemModeler interface with a timed pod cache. -type SimpleModeler struct { - queuedPods ExtendedPodLister - scheduledPods ExtendedPodLister - - // assumedPods holds the pods that we think we've scheduled, but that - // haven't yet shown up in the scheduledPods variable. - // TODO: periodically clear this. - assumedPods *cache.StoreToPodLister - - actionLocker -} - -// NewSimpleModeler returns a new SimpleModeler. -// queuedPods: a PodLister that will return pods that have not been scheduled yet. 
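// With the modeler on its way out, both the real cache (GetNodeNameToInfoMap) and
// the PodsToCache test helper hand the generic scheduler a ready-made
// nodeName -> NodeInfo snapshot instead of a flat pod list. A simplified sketch of
// building such a snapshot; the field names are illustrative, not the exact
// schedulercache.NodeInfo fields.

package main

import "fmt"

type podSpec struct {
	NodeName string
	MilliCPU int64
	Memory   int64
}

// nodeInfo aggregates per-node totals, roughly what schedulercache.NodeInfo tracks.
type nodeInfo struct {
	requestedMilliCPU int64
	requestedMemory   int64
	pods              []podSpec
}

// createNodeNameToInfoMap groups pods by the node they are bound to, so predicates
// and priorities can do per-node lookups without rescanning the whole pod list.
func createNodeNameToInfoMap(pods []podSpec) map[string]*nodeInfo {
	m := map[string]*nodeInfo{}
	for _, p := range pods {
		ni, ok := m[p.NodeName]
		if !ok {
			ni = &nodeInfo{}
			m[p.NodeName] = ni
		}
		ni.requestedMilliCPU += p.MilliCPU
		ni.requestedMemory += p.Memory
		ni.pods = append(ni.pods, p)
	}
	return m
}

func main() {
	pods := []podSpec{
		{NodeName: "node-a", MilliCPU: 100, Memory: 512 << 20},
		{NodeName: "node-a", MilliCPU: 200, Memory: 256 << 20},
		{NodeName: "node-b", MilliCPU: 50, Memory: 128 << 20},
	}
	for name, info := range createNodeNameToInfoMap(pods) {
		fmt.Printf("%s: cpu=%dm pods=%d\n", name, info.requestedMilliCPU, len(info.pods))
	}
}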
-// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled. -func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler { - return &SimpleModeler{ - queuedPods: queuedPods, - scheduledPods: scheduledPods, - assumedPods: &cache.StoreToPodLister{ - Store: cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second), - }, - } -} - -func (s *SimpleModeler) AssumePod(pod *api.Pod) { - s.assumedPods.Add(pod) -} - -func (s *SimpleModeler) ForgetPod(pod *api.Pod) { - s.assumedPods.Delete(pod) -} - -func (s *SimpleModeler) ForgetPodByKey(key string) { - s.assumedPods.Delete(cache.ExplicitKey(key)) -} - -// Extract names for readable logging. -func podNames(pods []*api.Pod) []string { - out := make([]string, len(pods)) - for i := range pods { - out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID) - } - return out -} - -func (s *SimpleModeler) listPods(selector labels.Selector) (pods []*api.Pod, err error) { - assumed, err := s.assumedPods.List(selector) - if err != nil { - return nil, err - } - // Since the assumed list will be short, just check every one. - // Goal here is to stop making assumptions about a pod once it shows - // up in one of these other lists. - for _, pod := range assumed { - qExist, err := s.queuedPods.Exists(pod) - if err != nil { - return nil, err - } - if qExist { - s.assumedPods.Store.Delete(pod) - continue - } - sExist, err := s.scheduledPods.Exists(pod) - if err != nil { - return nil, err - } - if sExist { - s.assumedPods.Store.Delete(pod) - continue - } - } - - scheduled, err := s.scheduledPods.List(selector) - if err != nil { - return nil, err - } - // Listing purges the ttl cache and re-gets, in case we deleted any entries. - assumed, err = s.assumedPods.List(selector) - if err != nil { - return nil, err - } - if len(assumed) == 0 { - return scheduled, nil - } - glog.V(2).Infof( - "listing pods: [%v] assumed to exist in addition to %v known pods.", - strings.Join(podNames(assumed), ","), - len(scheduled), - ) - return append(scheduled, assumed...), nil -} - -// PodLister returns a PodLister that will list pods that we think we have scheduled in -// addition to pods that we know have been scheduled. -func (s *SimpleModeler) PodLister() algorithm.PodLister { - return simpleModelerPods{s} -} - -// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister. -type simpleModelerPods struct { - simpleModeler *SimpleModeler -} - -// List returns pods known and assumed to exist. -func (s simpleModelerPods) List(selector labels.Selector) (pods []*api.Pod, err error) { - s.simpleModeler.LockedAction( - func() { pods, err = s.simpleModeler.listPods(selector) }) - return -} diff --git a/plugin/pkg/scheduler/modeler_test.go b/plugin/pkg/scheduler/modeler_test.go deleted file mode 100644 index ab9ff937681..00000000000 --- a/plugin/pkg/scheduler/modeler_test.go +++ /dev/null @@ -1,111 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package scheduler - -import ( - "testing" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/labels" -) - -type nn struct { - namespace, name string -} - -type names []nn - -func (ids names) list() []*api.Pod { - out := make([]*api.Pod, 0, len(ids)) - for _, id := range ids { - out = append(out, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Namespace: id.namespace, - Name: id.name, - }, - }) - } - return out -} - -func (ids names) has(pod *api.Pod) bool { - for _, id := range ids { - if pod.Namespace == id.namespace && pod.Name == id.name { - return true - } - } - return false -} - -func TestModeler(t *testing.T) { - table := []struct { - queuedPods []*api.Pod - scheduledPods []*api.Pod - assumedPods []*api.Pod - expectPods names - }{ - { - queuedPods: names{}.list(), - scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(), - assumedPods: names{{"default", "foo"}}.list(), - expectPods: names{{"default", "foo"}, {"custom", "foo"}}, - }, { - queuedPods: names{}.list(), - scheduledPods: names{{"default", "foo"}}.list(), - assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(), - expectPods: names{{"default", "foo"}, {"custom", "foo"}}, - }, { - queuedPods: names{{"custom", "foo"}}.list(), - scheduledPods: names{{"default", "foo"}}.list(), - assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(), - expectPods: names{{"default", "foo"}}, - }, - } - - for _, item := range table { - q := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} - for _, pod := range item.queuedPods { - q.Store.Add(pod) - } - s := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} - for _, pod := range item.scheduledPods { - s.Store.Add(pod) - } - m := NewSimpleModeler(q, s) - for _, pod := range item.assumedPods { - m.AssumePod(pod) - } - - list, err := m.PodLister().List(labels.Everything()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - found := 0 - for _, pod := range list { - if item.expectPods.has(pod) { - found++ - } else { - t.Errorf("found unexpected pod %#v", pod) - } - } - if e, a := item.expectPods, found; len(e) != a { - t.Errorf("Expected pods:\n%+v\nFound pods:\n%s\n", podNames(e.list()), podNames(list)) - } - } -} diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 124d8240b5e..b28833817a7 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "github.com/golang/glog" ) @@ -36,32 +37,6 @@ type Binder interface { Bind(binding *api.Binding) error } -// SystemModeler can help scheduler produce a model of the system that -// anticipates reality. For example, if scheduler has pods A and B both -// using hostPort 80, when it binds A to machine M it should not bind B -// to machine M in the time when it hasn't observed the binding of A -// take effect yet. -// -// Since the model is only an optimization, it's expected to handle -// any errors itself without sending them back to the scheduler. -type SystemModeler interface { - // AssumePod assumes that the given pod exists in the system. - // The assumtion should last until the system confirms the - // assumtion or disconfirms it. - AssumePod(pod *api.Pod) - // ForgetPod removes a pod assumtion. 
(It won't make the model - // show the absence of the given pod if the pod is in the scheduled - // pods list!) - ForgetPod(pod *api.Pod) - ForgetPodByKey(key string) - - // For serializing calls to Assume/ForgetPod: imagine you want to add - // a pod if and only if a bind succeeds, but also remove a pod if it is deleted. - // TODO: if SystemModeler begins modeling things other than pods, this - // should probably be parameterized or specialized for pods. - LockedAction(f func()) -} - // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -69,12 +44,12 @@ type Scheduler struct { } type Config struct { - // It is expected that changes made via modeler will be observed + // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. - Modeler SystemModeler - NodeLister algorithm.NodeLister - Algorithm algorithm.ScheduleAlgorithm - Binder Binder + SchedulerCache schedulercache.Cache + NodeLister algorithm.NodeLister + Algorithm algorithm.ScheduleAlgorithm + Binder Binder // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -129,24 +104,25 @@ func (s *Scheduler) scheduleOne() { }, } - // We want to add the pod to the model if and only if the bind succeeds, - // but we don't want to race with any deletions, which happen asynchronously. - s.config.Modeler.LockedAction(func() { + bindAction := func() bool { bindingStart := time.Now() err := s.config.Binder.Bind(b) if err != nil { glog.V(1).Infof("Failed to bind pod: %+v", err) s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err) s.config.Error(pod, err) - return + return false } metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest) - // tell the model to assume that this binding took effect. - assumed := *pod - assumed.Spec.NodeName = dest - s.config.Modeler.AssumePod(&assumed) - }) + return true + } + + assumed := *pod + assumed.Spec.NodeName = dest + // We want to assume the pod if and only if the bind succeeds, + // but we don't want to race with any deletions, which happen asynchronously. 
+ s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, bindAction) metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 3bd5ff41493..967eb87bfe0 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -29,9 +29,12 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) type fakeBinder struct { @@ -111,8 +114,8 @@ func TestScheduler(t *testing.T) { var gotAssumedPod *api.Pod var gotBinding *api.Binding c := &Config{ - Modeler: &FakeModeler{ - AssumePodFunc: func(pod *api.Pod) { + SchedulerCache: &schedulertesting.FakeCache{ + AssumeFunc: func(pod *api.Pod) { gotAssumedPod = pod }, }, @@ -189,42 +192,30 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { wg.Wait() }() - // Setup modeler so we control the contents of all 3 stores: assumed, - // scheduled and queued + // Setup stores to test pod's workflow: + // - queuedPodStore: pods queued before processing + // - scheduledPodStore: pods that has a scheduling decision scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore} - queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore} - - modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister) - - // Create a fake clock used to timestamp entries and calculate ttl. Nothing - // will expire till we flip to something older than the ttl, at which point - // all entries inserted with fakeTime will expire. 
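// scheduleOne above now wraps the API bind in a closure and hands it to
// AssumePodIfBindSucceed, so the pod is only assumed when the bind actually went
// through, with both steps serialized by the cache. A minimal sketch of that
// contract; assumeCache below is a stand-in, not the real schedulerCache.

package main

import (
	"fmt"
	"sync"
)

type assumeCache struct {
	mu      sync.Mutex
	assumed map[string]bool
}

// AssumePodIfBindSucceed runs bind and records the assumption only on success.
// Holding the lock across both steps keeps a concurrent delete from slipping in
// between the bind and the bookkeeping.
func (c *assumeCache) AssumePodIfBindSucceed(key string, bind func() bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !bind() {
		return nil
	}
	c.assumed[key] = true
	return nil
}

func main() {
	c := &assumeCache{assumed: map[string]bool{}}

	bindOK := func() bool { return true }      // stands in for a successful Binder.Bind
	bindFailed := func() bool { return false } // stands in for a rejected binding

	_ = c.AssumePodIfBindSucceed("default/web-1", bindOK)
	_ = c.AssumePodIfBindSucceed("default/web-2", bindFailed)

	fmt.Println(c.assumed) // only default/web-1 ends up assumed
}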
- ttl := 30 * time.Second - fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - fakeClock := util.NewFakeClock(fakeTime) - ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock} - assumedPodsStore := cache.NewFakeExpirationStore( - cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock) - modeler.assumedPods = &cache.StoreToPodLister{Store: assumedPodsStore} // Port is the easiest way to cause a fit predicate failure podPort := 8080 firstPod := podWithPort("foo", "", podPort) + stop := make(chan struct{}) + defer close(stop) + cache := schedulercache.New(1*time.Second, stop) // Create the scheduler config algo := NewGenericScheduler( + cache, map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}, []algorithm.PriorityConfig{}, []algorithm.SchedulerExtender{}, - modeler.PodLister(), rand.New(rand.NewSource(time.Now().UnixNano()))) var gotBinding *api.Binding c := &Config{ - Modeler: modeler, + SchedulerCache: cache, NodeLister: algorithm.FakeNodeLister( api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}}, ), @@ -271,10 +262,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { if exists { t.Errorf("Did not expect a queued pod, found %+v", pod) } - pod, exists, _ = assumedPodsStore.GetByKey("foo") - if !exists { - t.Errorf("Assumed pod store should contain stale pod") - } expectBind := &api.Binding{ ObjectMeta: api.ObjectMeta{Name: "foo"}, @@ -288,10 +275,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { events.Stop() scheduledPodStore.Delete(pod) - _, exists, _ = assumedPodsStore.Get(pod) - if !exists { - t.Errorf("Expected pod %#v in assumed pod store", pod) - } secondPod := podWithPort("bar", "", podPort) queuedPodStore.Add(secondPod) @@ -299,10 +282,26 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { // scheduledPodStore: [] // assumedPods: [foo:8080] + var waitUntilExpired sync.WaitGroup + waitUntilExpired.Add(1) + // waiting for the assumed pod to expire + go func() { + for { + pods, err := cache.List(labels.Everything()) + if err != nil { + t.Fatalf("cache.List failed: %v", err) + } + if len(pods) == 0 { + waitUntilExpired.Done() + return + } + time.Sleep(1 * time.Second) + } + }() + waitUntilExpired.Wait() + // Second scheduling pass will fail to schedule if the store hasn't expired // the deleted pod. This would normally happen with a timeout. - //expirationPolicy.NeverExpire = util.NewStringSet() - fakeClock.Step(ttl + 1) called = make(chan struct{}) events = eventBroadcaster.StartEventWatcher(func(e *api.Event) { diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 11f75988479..704fe42c95a 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -59,7 +59,7 @@ type schedulerCache struct { type podState struct { pod *api.Pod // Used by assumedPod to determinate expiration. 
- deadline time.Time + deadline *time.Time } func newSchedulerCache(ttl, period time.Duration, stop chan struct{}) *schedulerCache { @@ -120,9 +120,10 @@ func (cache *schedulerCache) assumePodIfBindSucceed(pod *api.Pod, bind func() bo } cache.addPod(pod) + dl := now.Add(cache.ttl) ps := &podState{ pod: pod, - deadline: now.Add(cache.ttl), + deadline: &dl, } cache.podStates[key] = ps cache.assumedPods[key] = true @@ -142,9 +143,14 @@ func (cache *schedulerCache) AddPod(pod *api.Pod) error { switch { case ok && cache.assumedPods[key]: delete(cache.assumedPods, key) + cache.podStates[key].deadline = nil case !ok: // Pod was expired. We should add it back. cache.addPod(pod) + ps := &podState{ + pod: pod, + } + cache.podStates[key] = ps default: return fmt.Errorf("pod was already in added state. Pod key: %v", key) } @@ -246,7 +252,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { if !ok { panic("Key found in assumed set but not in podStates. Potentially a logical error.") } - if now.After(ps.deadline) { + if now.After(*ps.deadline) { if err := cache.expirePod(key, ps); err != nil { glog.Errorf(" expirePod failed for %s: %v", key, err) } diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index 094f5352440..e5ceeb155b2 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -274,7 +274,71 @@ func TestUpdatePod(t *testing.T) { podsToUpdate []*api.Pod wNodeInfo []*NodeInfo - }{{ // Pod is assumed and added. Then it would be updated twice. + }{{ // add a pod and then update it twice + podsToAdd: []*api.Pod{testPods[0]}, + podsToUpdate: []*api.Pod{testPods[0], testPods[1], testPods[0]}, + wNodeInfo: []*NodeInfo{{ + requestedResource: &Resource{ + MilliCPU: 200, + Memory: 1024, + }, + nonzeroRequest: &Resource{ + MilliCPU: 200, + Memory: 1024, + }, + pods: []*api.Pod{testPods[1]}, + }, { + requestedResource: &Resource{ + MilliCPU: 100, + Memory: 500, + }, + nonzeroRequest: &Resource{ + MilliCPU: 100, + Memory: 500, + }, + pods: []*api.Pod{testPods[0]}, + }}, + }} + + for _, tt := range tests { + cache := newSchedulerCache(ttl, time.Second, nil) + for _, podToAdd := range tt.podsToAdd { + if err := cache.AddPod(podToAdd); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + } + + for i := range tt.podsToUpdate { + if i == 0 { + continue + } + if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil { + t.Fatalf("UpdatePod failed: %v", err) + } + // check after expiration. confirmed pods shouldn't be expired. + n := cache.nodes[nodeName] + if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) { + t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo) + } + } + } +} + +// TestExpireAddUpdatePod test the sequence that a pod is expired, added, then updated +func TestExpireAddUpdatePod(t *testing.T) { + nodeName := "node" + ttl := 10 * time.Second + testPods := []*api.Pod{ + makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}}), + makeBasePod(nodeName, "test", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}), + } + tests := []struct { + podsToAssume []*api.Pod + podsToAdd []*api.Pod + podsToUpdate []*api.Pod + + wNodeInfo []*NodeInfo + }{{ // Pod is assumed, expired, and added. Then it would be updated twice. 
podsToAssume: []*api.Pod{testPods[0]}, podsToAdd: []*api.Pod{testPods[0]}, podsToUpdate: []*api.Pod{testPods[0], testPods[1], testPods[0]}, @@ -309,6 +373,8 @@ func TestUpdatePod(t *testing.T) { t.Fatalf("assumePod failed: %v", err) } } + cache.cleanupAssumedPods(now.Add(2 * ttl)) + for _, podToAdd := range tt.podsToAdd { if err := cache.AddPod(podToAdd); err != nil { t.Fatalf("AddPod failed: %v", err) @@ -356,9 +422,6 @@ func TestRemovePod(t *testing.T) { for i, tt := range tests { cache := newSchedulerCache(time.Second, time.Second, nil) - if err := cache.AssumePodIfBindSucceed(tt.pod, alwaysTrue); err != nil { - t.Fatalf("assumePod failed: %v", err) - } if err := cache.AddPod(tt.pod); err != nil { t.Fatalf("AddPod failed: %v", err) } @@ -449,12 +512,7 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache { objName := fmt.Sprintf("%s-pod-%d", nodeName, j) pod := makeBasePod(nodeName, objName, "0", "0", nil) - err := cache.AssumePodIfBindSucceed(pod, alwaysTrue) - if err != nil { - b.Fatalf("AssumePodIfBindSucceed failed: %v", err) - } - err = cache.AddPod(pod) - if err != nil { + if err := cache.AddPod(pod); err != nil { b.Fatalf("AddPod failed: %v", err) } } diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index c7194f09dc2..8de0228854c 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -23,32 +23,35 @@ import ( // Cache collects pods' information and provides node-level aggregated information. // It's intended for generic scheduler to do efficient lookup. -// Cache's operations are pod centric. It incrementally updates itself based on pod events. +// Cache's operations are pod centric. It does incremental updates based on pod events. // Pod events are sent via network. We don't have guaranteed delivery of all events: // We use Reflector to list and watch from remote. // Reflector might be slow and do a relist, which would lead to missing events. // // State Machine of a pod's events in scheduler's cache: // -// +-------+ -// | | -// | | Update -// Assume Add + | -// Initial +--------> Assumed +------------+---> Added <--+ -// + | + -// | | | -// | Add | | Remove -// | | | -// | + | -// +-------------> Expired +----> Deleted +// +// +-------------------------------------------+ +----+ +// | Add | | | +// | | | | Update +// + Assume Add v v | +//Initial +--------> Assumed +------------+---> Added <--+ +// + | + +// | | | +// | Add | | Remove +// | | | +// | + | +// +-------------> Expired +----> Deleted // Expire // +// // Note that an assumed pod can expire, because if we haven't received Add event notifying us // for a while, there might be some problems and we shouldn't keep the pod in cache anymore. // // Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache. // Based on existing use cases, we are making the following assumptions: // - No pod would be assumed twice +// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event. // - If a pod wasn't added, it wouldn't be removed or updated. // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue, // a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache. 
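// The deadline field becomes a pointer so a pod confirmed by an Add event
// (Assumed -> Added in the state machine above) can have its expiry cleared by
// setting the deadline to nil, while pods that stay assumed keep a deadline for
// cleanupAssumedPods to compare against. A compressed sketch of that lifecycle
// with simplified stand-in types, not the real schedulerCache.

package main

import (
	"fmt"
	"time"
)

type podState struct {
	name     string
	deadline *time.Time // nil once the pod is confirmed by an Add event
}

type miniCache struct {
	ttl     time.Duration
	assumed map[string]*podState
}

// assume records the pod with an expiry deadline.
func (c *miniCache) assume(name string, now time.Time) {
	dl := now.Add(c.ttl)
	c.assumed[name] = &podState{name: name, deadline: &dl}
}

// confirm clears the deadline: a confirmed pod never expires.
func (c *miniCache) confirm(name string) {
	if ps, ok := c.assumed[name]; ok {
		ps.deadline = nil
	}
}

// cleanup drops assumed pods whose deadline has passed.
func (c *miniCache) cleanup(now time.Time) {
	for name, ps := range c.assumed {
		if ps.deadline != nil && now.After(*ps.deadline) {
			delete(c.assumed, name)
		}
	}
}

func main() {
	c := &miniCache{ttl: 30 * time.Second, assumed: map[string]*podState{}}
	now := time.Now()

	c.assume("default/confirmed", now)
	c.assume("default/forgotten", now)
	c.confirm("default/confirmed") // the Add event arrived for this one

	c.cleanup(now.Add(time.Minute))            // well past the TTL
	fmt.Println(len(c.assumed), "pod(s) left") // only the confirmed pod survives
}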
diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go new file mode 100644 index 00000000000..fe48442c3e0 --- /dev/null +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -0,0 +1,48 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulercache + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" +) + +// FakeCache is used for testing +type FakeCache struct { + AssumeFunc func(*api.Pod) +} + +func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error { + if !bind() { + return nil + } + f.AssumeFunc(pod) + return nil +} + +func (f *FakeCache) AddPod(pod *api.Pod) error { return nil } + +func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil } + +func (f *FakeCache) RemovePod(pod *api.Pod) error { return nil } + +func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) { + return nil, nil +} + +func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil } diff --git a/plugin/pkg/scheduler/testing/pods_to_cache.go b/plugin/pkg/scheduler/testing/pods_to_cache.go new file mode 100644 index 00000000000..602fe96d039 --- /dev/null +++ b/plugin/pkg/scheduler/testing/pods_to_cache.go @@ -0,0 +1,52 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulercache + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" +) + +// PodsToCache is used for testing +type PodsToCache []*api.Pod + +func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error { + if !bind() { + return nil + } + return nil +} + +func (p PodsToCache) AddPod(pod *api.Pod) error { return nil } + +func (p PodsToCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil } + +func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil } + +func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) { + return schedulercache.CreateNodeNameToInfoMap(p), nil +} + +func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) { + for _, pod := range p { + if s.Matches(labels.Set(pod.Labels)) { + selected = append(selected, pod) + } + } + return selected, nil +}
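// For orientation, a trimmed-down example of wiring the new test doubles into the
// generic scheduler. It only uses calls that appear in the hunks above
// (NewGenericScheduler, PodsToCache, FakeNodeLister, PodFitsHostPorts); the test
// name and the pod/node literals are illustrative, and the snippet is a sketch
// against this tree rather than a drop-in test.

package scheduler

import (
	"math/rand"
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)

func TestScheduleWithPodsToCache(t *testing.T) {
	// One pod already bound to machine1; PodsToCache turns this list into the
	// nodeName -> NodeInfo snapshot the scheduler expects.
	scheduled := []*api.Pod{
		{
			ObjectMeta: api.ObjectMeta{Name: "existing", Namespace: "default"},
			Spec:       api.PodSpec{NodeName: "machine1"},
		},
	}

	algo := NewGenericScheduler(
		schedulertesting.PodsToCache(scheduled),
		map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
		[]algorithm.PriorityConfig{},
		[]algorithm.SchedulerExtender{},
		rand.New(rand.NewSource(time.Now().UnixNano())))

	nodes := api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}}
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "incoming", Namespace: "default"}}

	dest, err := algo.Schedule(pod, algorithm.FakeNodeLister(nodes))
	if err != nil {
		t.Fatalf("Schedule failed: %v", err)
	}
	if dest != "machine1" {
		t.Errorf("expected machine1, got %q", dest)
	}
}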