Revert "Scheduler: replace system modeler with scheduler cache and do O(1) lookup for resource req"

Marek Grabowski 2016-03-02 15:38:56 +01:00
parent 512cc08929
commit eb2f5153b3
12 changed files with 433 additions and 219 deletions

View File

@@ -44,12 +44,19 @@ func calculateScore(requested int64, capacity int64, node string) int {
 // Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
-func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-    totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-    totalMemory := nodeInfo.NonZeroRequest().Memory
+func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
+    totalMilliCPU := int64(0)
+    totalMemory := int64(0)
     capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
     capacityMemory := node.Status.Allocatable.Memory().Value()
+    for _, existingPod := range pods {
+        for _, container := range existingPod.Spec.Containers {
+            cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
+            totalMilliCPU += cpu
+            totalMemory += memory
+        }
+    }
     // Add the resources requested by the current pod being scheduled.
     // This also helps differentiate between differently sized, but empty, nodes.
     for _, container := range pod.Spec.Containers {
@@ -86,7 +93,7 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca
     list := schedulerapi.HostPriorityList{}
     for _, node := range nodes.Items {
-        list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
+        list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods()))
     }
     return list, nil
 }
@@ -220,15 +227,22 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
     list := schedulerapi.HostPriorityList{}
     for _, node := range nodes.Items {
-        list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
+        list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods()))
     }
     return list, nil
 }

-func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-    totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-    totalMemory := nodeInfo.NonZeroRequest().Memory
+func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
+    totalMilliCPU := int64(0)
+    totalMemory := int64(0)
     score := int(0)
+    for _, existingPod := range pods {
+        for _, container := range existingPod.Spec.Containers {
+            cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
+            totalMilliCPU += cpu
+            totalMemory += memory
+        }
+    }
     // Add the resources requested by the current pod being scheduled.
     // This also helps differentiate between differently sized, but empty, nodes.
     for _, container := range pod.Spec.Containers {
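The change above swaps a cached per-node aggregate (nodeInfo.NonZeroRequest()) for an explicit walk over every container of every pod already on the node. A minimal, self-contained sketch of that accumulation pattern follows; the types and the default fallback values are simplified stand-ins for illustration, not the real api or priorityutil definitions.

package main

import "fmt"

// Simplified stand-ins for the real API types; the defaults below are
// assumptions, not the values used by priorityutil.GetNonzeroRequests.
type resourceRequest struct{ milliCPU, memory int64 }

type container struct{ request resourceRequest }

type pod struct{ containers []container }

const (
    assumedDefaultMilliCPU int64 = 100
    assumedDefaultMemory   int64 = 200 * 1024 * 1024
)

// nonzeroRequest mirrors the idea behind GetNonzeroRequests: a request that was
// left unspecified (zero) still counts for something, so empty pods are not free.
func nonzeroRequest(r resourceRequest) (int64, int64) {
    cpu, mem := r.milliCPU, r.memory
    if cpu == 0 {
        cpu = assumedDefaultMilliCPU
    }
    if mem == 0 {
        mem = assumedDefaultMemory
    }
    return cpu, mem
}

// occupancy re-walks every container of every pod on the node, as the restored
// calculateResourceOccupancy does, instead of reading a cached total.
func occupancy(pods []pod) (totalMilliCPU, totalMemory int64) {
    for _, p := range pods {
        for _, c := range p.containers {
            cpu, mem := nonzeroRequest(c.request)
            totalMilliCPU += cpu
            totalMemory += mem
        }
    }
    return
}

func main() {
    pods := []pod{
        {containers: []container{{request: resourceRequest{milliCPU: 250}}}},
        {containers: []container{{}}}, // no requests set: the assumed defaults apply
    }
    cpu, mem := occupancy(pods)
    fmt.Printf("totalMilliCPU=%d totalMemory=%d\n", cpu, mem)
}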

View File

@@ -140,6 +140,7 @@ func TestZeroRequest(t *testing.T) {
         list, err := scheduler.PrioritizeNodes(
             test.pod,
             nodeNameToInfo,
+            algorithm.FakePodLister(test.pods),
             // This should match the configuration in defaultPriorities() in
             // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
             // to test what's actually in production.

View File

@@ -25,7 +25,6 @@ import (
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-    schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )

 type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@@ -286,7 +285,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
     for ii := range test.extenders {
         extenders = append(extenders, &test.extenders[ii])
     }
-    scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders, random)
+    scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random)
     machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
     if test.expectsErr {
         if err == nil {

View File

@@ -42,7 +42,6 @@ import (
     "github.com/golang/glog"

     "k8s.io/kubernetes/pkg/apis/extensions"
-    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 const (
@@ -75,7 +74,7 @@ type ConfigFactory struct {
     StopEverything chan struct{}

     scheduledPodPopulator *framework.Controller
-    schedulerCache        schedulercache.Cache
+    modeler               scheduler.SystemModeler

     // SchedulerName of a scheduler is used to select which pods will be
     // processed by this scheduler, based on pods's annotation key:
@@ -85,9 +84,6 @@ type ConfigFactory struct {
 // Initializes the factory.
 func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
-    stopEverything := make(chan struct{})
-    schedulerCache := schedulercache.New(30*time.Second, stopEverything)
-
     c := &ConfigFactory{
         Client:   client,
         PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
@@ -99,12 +95,12 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
         ServiceLister:    &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
         ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
         ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
-        schedulerCache:   schedulerCache,
-        StopEverything:   stopEverything,
+        StopEverything:   make(chan struct{}),
         SchedulerName:    schedulerName,
     }
-    c.PodLister = schedulerCache
+    modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
+    c.modeler = modeler
+    c.PodLister = modeler.PodLister()

     // On add/delete to the scheduled pods, remove from the assumed pods.
     // We construct this here instead of in CreateFromKeys because
@@ -116,49 +112,21 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
         0,
         framework.ResourceEventHandlerFuncs{
             AddFunc: func(obj interface{}) {
-                pod, ok := obj.(*api.Pod)
-                if !ok {
-                    glog.Errorf("cannot convert to *api.Pod")
-                    return
-                }
-                if err := schedulerCache.AddPod(pod); err != nil {
-                    glog.Errorf("scheduler cache AddPod failed: %v", err)
-                }
-            },
-            UpdateFunc: func(oldObj, newObj interface{}) {
-                oldPod, ok := oldObj.(*api.Pod)
-                if !ok {
-                    glog.Errorf("cannot convert to *api.Pod")
-                    return
-                }
-                newPod, ok := newObj.(*api.Pod)
-                if !ok {
-                    glog.Errorf("cannot convert to *api.Pod")
-                    return
-                }
-                if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil {
-                    glog.Errorf("scheduler cache UpdatePod failed: %v", err)
+                if pod, ok := obj.(*api.Pod); ok {
+                    c.modeler.LockedAction(func() {
+                        c.modeler.ForgetPod(pod)
+                    })
                 }
             },
             DeleteFunc: func(obj interface{}) {
-                var pod *api.Pod
-                switch t := obj.(type) {
-                case *api.Pod:
-                    pod = t
-                case cache.DeletedFinalStateUnknown:
-                    var ok bool
-                    pod, ok = t.Obj.(*api.Pod)
-                    if !ok {
-                        glog.Errorf("cannot convert to *api.Pod")
-                        return
-                    }
-                default:
-                    glog.Errorf("cannot convert to *api.Pod")
-                    return
-                }
-                if err := schedulerCache.RemovePod(pod); err != nil {
-                    glog.Errorf("scheduler cache RemovePod failed: %v", err)
-                }
+                c.modeler.LockedAction(func() {
+                    switch t := obj.(type) {
+                    case *api.Pod:
+                        c.modeler.ForgetPod(t)
+                    case cache.DeletedFinalStateUnknown:
+                        c.modeler.ForgetPodByKey(t.Key)
+                    }
+                })
             },
         },
     )
@@ -273,7 +241,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
-    algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)
+    algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)

     podBackoff := podBackoff{
         perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -284,7 +252,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
     }

     return &scheduler.Config{
-        SchedulerCache: f.schedulerCache,
+        Modeler: f.modeler,
         // The scheduler only needs to consider schedulable nodes.
         NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
         Algorithm:  algo,

View File

@@ -25,6 +25,7 @@ import (
     "github.com/golang/glog"

     "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/pkg/util/errors"
     "k8s.io/kubernetes/pkg/util/sets"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -54,10 +55,10 @@ func (f *FitError) Error() string {
 }

 type genericScheduler struct {
-    cache        schedulercache.Cache
     predicates   map[string]algorithm.FitPredicate
     prioritizers []algorithm.PriorityConfig
     extenders    []algorithm.SchedulerExtender
+    pods         algorithm.PodLister
     random       *rand.Rand
     randomLock   sync.Mutex
 }
@@ -74,12 +75,13 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
         return "", ErrNoNodesAvailable
     }

-    // Used for all fit and priority funcs.
-    nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
+    // TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute.
+    // But at least we're now only doing it in one place
+    pods, err := g.pods.List(labels.Everything())
     if err != nil {
         return "", err
     }
+    nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
     filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
     if err != nil {
         return "", err
@@ -92,7 +94,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
         }
     }

-    priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
+    priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
     if err != nil {
         return "", err
     }
@@ -186,7 +188,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
 // Each priority function can also have its own weight
 // The node scores returned by the priority function are multiplied by the weights to get weighted scores
 // All scores are finally combined (added) to get the total weighted scores of all nodes
-func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
+func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
     result := schedulerapi.HostPriorityList{}

     // If no priority configs are provided, then the EqualPriority function is applied
@@ -286,12 +288,12 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf
     return result, nil
 }

-func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, random *rand.Rand) algorithm.ScheduleAlgorithm {
+func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {
     return &genericScheduler{
-        cache:        cache,
         predicates:   predicates,
         prioritizers: prioritizers,
         extenders:    extenders,
+        pods:         pods,
         random:       random,
     }
 }
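The net effect of this hunk: after the revert, every Schedule call lists all pods through the injected PodLister and rebuilds the node-name-to-NodeInfo map from scratch, which is exactly what the restored TODO comment flags as a cost. A simplified sketch of that per-pass grouping follows; the types are small local stand-ins rather than the real schedulercache package.

package main

import "fmt"

// Local stand-ins for api.Pod and schedulercache.NodeInfo, kept deliberately small.
type pod struct {
    name     string
    nodeName string // the node the pod is bound to ("" if unscheduled)
}

type nodeInfo struct{ pods []*pod }

// buildNodeNameToInfoMap mirrors the shape of the CreateNodeNameToInfoMap call
// used in the reverted Schedule(): group the freshly listed pods by node, on
// every scheduling pass.
func buildNodeNameToInfoMap(pods []*pod) map[string]*nodeInfo {
    m := map[string]*nodeInfo{}
    for _, p := range pods {
        info, ok := m[p.nodeName]
        if !ok {
            info = &nodeInfo{}
            m[p.nodeName] = info
        }
        info.pods = append(info.pods, p)
    }
    return m
}

func main() {
    listed := []*pod{
        {name: "a", nodeName: "node1"},
        {name: "b", nodeName: "node1"},
        {name: "c", nodeName: "node2"},
    }
    byNode := buildNodeNameToInfoMap(listed)
    for name, info := range byNode {
        fmt.Printf("%s hosts %d pods\n", name, len(info.pods))
    }
}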

View File

@@ -28,7 +28,6 @@ import (
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-    schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )

 func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@@ -257,7 +256,7 @@ func TestGenericScheduler(t *testing.T) {
     for _, test := range tests {
         random := rand.New(rand.NewSource(0))
-        scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random)
+        scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random)
         machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
         if test.expectsErr {
             if err == nil {

View File

@@ -0,0 +1,197 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "fmt"
    "strings"
    "sync"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client/cache"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"

    "github.com/golang/glog"
)

var (
    _ = SystemModeler(&FakeModeler{})
    _ = SystemModeler(&SimpleModeler{})
)

// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's
// existence in addition to listing the pods.
type ExtendedPodLister interface {
    algorithm.PodLister
    Exists(pod *api.Pod) (bool, error)
}

// actionLocker implements lockedAction (so the fake and SimpleModeler can both
// use it)
type actionLocker struct {
    sync.Mutex
}

// LockedAction serializes calls of whatever is passed as 'do'.
func (a *actionLocker) LockedAction(do func()) {
    a.Lock()
    defer a.Unlock()
    do()
}

// FakeModeler implements the SystemModeler interface.
type FakeModeler struct {
    AssumePodFunc      func(pod *api.Pod)
    ForgetPodFunc      func(pod *api.Pod)
    ForgetPodByKeyFunc func(key string)
    actionLocker
}

// AssumePod calls the function variable if it is not nil.
func (f *FakeModeler) AssumePod(pod *api.Pod) {
    if f.AssumePodFunc != nil {
        f.AssumePodFunc(pod)
    }
}

// ForgetPod calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPod(pod *api.Pod) {
    if f.ForgetPodFunc != nil {
        f.ForgetPodFunc(pod)
    }
}

// ForgetPodByKey calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPodByKey(key string) {
    if f.ForgetPodByKeyFunc != nil {
        f.ForgetPodByKeyFunc(key)
    }
}

// SimpleModeler implements the SystemModeler interface with a timed pod cache.
type SimpleModeler struct {
    queuedPods    ExtendedPodLister
    scheduledPods ExtendedPodLister

    // assumedPods holds the pods that we think we've scheduled, but that
    // haven't yet shown up in the scheduledPods variable.
    // TODO: periodically clear this.
    assumedPods *cache.StoreToPodLister

    actionLocker
}

// NewSimpleModeler returns a new SimpleModeler.
// queuedPods: a PodLister that will return pods that have not been scheduled yet.
// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled.
func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler {
    return &SimpleModeler{
        queuedPods:    queuedPods,
        scheduledPods: scheduledPods,
        assumedPods: &cache.StoreToPodLister{
            Store: cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second),
        },
    }
}

func (s *SimpleModeler) AssumePod(pod *api.Pod) {
    s.assumedPods.Add(pod)
}

func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
    s.assumedPods.Delete(pod)
}

func (s *SimpleModeler) ForgetPodByKey(key string) {
    s.assumedPods.Delete(cache.ExplicitKey(key))
}

// Extract names for readable logging.
func podNames(pods []*api.Pod) []string {
    out := make([]string, len(pods))
    for i := range pods {
        out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID)
    }
    return out
}

func (s *SimpleModeler) listPods(selector labels.Selector) (pods []*api.Pod, err error) {
    assumed, err := s.assumedPods.List(selector)
    if err != nil {
        return nil, err
    }
    // Since the assumed list will be short, just check every one.
    // Goal here is to stop making assumptions about a pod once it shows
    // up in one of these other lists.
    for _, pod := range assumed {
        qExist, err := s.queuedPods.Exists(pod)
        if err != nil {
            return nil, err
        }
        if qExist {
            s.assumedPods.Store.Delete(pod)
            continue
        }
        sExist, err := s.scheduledPods.Exists(pod)
        if err != nil {
            return nil, err
        }
        if sExist {
            s.assumedPods.Store.Delete(pod)
            continue
        }
    }

    scheduled, err := s.scheduledPods.List(selector)
    if err != nil {
        return nil, err
    }
    // Listing purges the ttl cache and re-gets, in case we deleted any entries.
    assumed, err = s.assumedPods.List(selector)
    if err != nil {
        return nil, err
    }
    if len(assumed) == 0 {
        return scheduled, nil
    }
    glog.V(2).Infof(
        "listing pods: [%v] assumed to exist in addition to %v known pods.",
        strings.Join(podNames(assumed), ","),
        len(scheduled),
    )
    return append(scheduled, assumed...), nil
}

// PodLister returns a PodLister that will list pods that we think we have scheduled in
// addition to pods that we know have been scheduled.
func (s *SimpleModeler) PodLister() algorithm.PodLister {
    return simpleModelerPods{s}
}

// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister.
type simpleModelerPods struct {
    simpleModeler *SimpleModeler
}

// List returns pods known and assumed to exist.
func (s simpleModelerPods) List(selector labels.Selector) (pods []*api.Pod, err error) {
    s.simpleModeler.LockedAction(
        func() { pods, err = s.simpleModeler.listPods(selector) })
    return
}

View File

@@ -0,0 +1,111 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "testing"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client/cache"
    "k8s.io/kubernetes/pkg/labels"
)

type nn struct {
    namespace, name string
}

type names []nn

func (ids names) list() []*api.Pod {
    out := make([]*api.Pod, 0, len(ids))
    for _, id := range ids {
        out = append(out, &api.Pod{
            ObjectMeta: api.ObjectMeta{
                Namespace: id.namespace,
                Name:      id.name,
            },
        })
    }
    return out
}

func (ids names) has(pod *api.Pod) bool {
    for _, id := range ids {
        if pod.Namespace == id.namespace && pod.Name == id.name {
            return true
        }
    }
    return false
}

func TestModeler(t *testing.T) {
    table := []struct {
        queuedPods    []*api.Pod
        scheduledPods []*api.Pod
        assumedPods   []*api.Pod
        expectPods    names
    }{
        {
            queuedPods:    names{}.list(),
            scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
            assumedPods:   names{{"default", "foo"}}.list(),
            expectPods:    names{{"default", "foo"}, {"custom", "foo"}},
        }, {
            queuedPods:    names{}.list(),
            scheduledPods: names{{"default", "foo"}}.list(),
            assumedPods:   names{{"default", "foo"}, {"custom", "foo"}}.list(),
            expectPods:    names{{"default", "foo"}, {"custom", "foo"}},
        }, {
            queuedPods:    names{{"custom", "foo"}}.list(),
            scheduledPods: names{{"default", "foo"}}.list(),
            assumedPods:   names{{"default", "foo"}, {"custom", "foo"}}.list(),
            expectPods:    names{{"default", "foo"}},
        },
    }

    for _, item := range table {
        q := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
        for _, pod := range item.queuedPods {
            q.Store.Add(pod)
        }
        s := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
        for _, pod := range item.scheduledPods {
            s.Store.Add(pod)
        }
        m := NewSimpleModeler(q, s)
        for _, pod := range item.assumedPods {
            m.AssumePod(pod)
        }
        list, err := m.PodLister().List(labels.Everything())
        if err != nil {
            t.Errorf("unexpected error: %v", err)
        }
        found := 0
        for _, pod := range list {
            if item.expectPods.has(pod) {
                found++
            } else {
                t.Errorf("found unexpected pod %#v", pod)
            }
        }
        if e, a := item.expectPods, found; len(e) != a {
            t.Errorf("Expected pods:\n%+v\nFound pods:\n%s\n", podNames(e.list()), podNames(list))
        }
    }
}

View File

@@ -27,7 +27,6 @@ import (
     "k8s.io/kubernetes/pkg/util/wait"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     "k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
-    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

     "github.com/golang/glog"
 )
@@ -37,6 +36,32 @@ type Binder interface {
     Bind(binding *api.Binding) error
 }

+// SystemModeler can help scheduler produce a model of the system that
+// anticipates reality. For example, if scheduler has pods A and B both
+// using hostPort 80, when it binds A to machine M it should not bind B
+// to machine M in the time when it hasn't observed the binding of A
+// take effect yet.
+//
+// Since the model is only an optimization, it's expected to handle
+// any errors itself without sending them back to the scheduler.
+type SystemModeler interface {
+    // AssumePod assumes that the given pod exists in the system.
+    // The assumption should last until the system confirms the
+    // assumption or disconfirms it.
+    AssumePod(pod *api.Pod)
+    // ForgetPod removes a pod assumption. (It won't make the model
+    // show the absence of the given pod if the pod is in the scheduled
+    // pods list!)
+    ForgetPod(pod *api.Pod)
+    ForgetPodByKey(key string)
+
+    // For serializing calls to Assume/ForgetPod: imagine you want to add
+    // a pod if and only if a bind succeeds, but also remove a pod if it is deleted.
+    // TODO: if SystemModeler begins modeling things other than pods, this
+    // should probably be parameterized or specialized for pods.
+    LockedAction(f func())
+}
+
 // Scheduler watches for new unscheduled pods. It attempts to find
 // nodes that they fit on and writes bindings back to the api server.
 type Scheduler struct {
@@ -44,12 +69,12 @@ type Scheduler struct {
 }

 type Config struct {
-    // It is expected that changes made via SchedulerCache will be observed
+    // It is expected that changes made via modeler will be observed
     // by NodeLister and Algorithm.
-    SchedulerCache schedulercache.Cache
+    Modeler    SystemModeler
     NodeLister algorithm.NodeLister
     Algorithm  algorithm.ScheduleAlgorithm
     Binder     Binder

     // NextPod should be a function that blocks until the next pod
     // is available. We don't use a channel for this, because scheduling
@@ -104,25 +129,24 @@ func (s *Scheduler) scheduleOne() {
         },
     }

-    bindAction := func() bool {
+    // We want to add the pod to the model if and only if the bind succeeds,
+    // but we don't want to race with any deletions, which happen asynchronously.
+    s.config.Modeler.LockedAction(func() {
         bindingStart := time.Now()
         err := s.config.Binder.Bind(b)
         if err != nil {
             glog.V(1).Infof("Failed to bind pod: %+v", err)
             s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err)
             s.config.Error(pod, err)
-            return false
+            return
         }
         metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
         s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
-        return true
-    }
-
-    assumed := *pod
-    assumed.Spec.NodeName = dest
-    // We want to assume the pod if and only if the bind succeeds,
-    // but we don't want to race with any deletions, which happen asynchronously.
-    s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, bindAction)
+        // tell the model to assume that this binding took effect.
+        assumed := *pod
+        assumed.Spec.NodeName = dest
+        s.config.Modeler.AssumePod(&assumed)
+    })
     metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 }
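The rewritten scheduleOne above relies on the modeler's LockedAction to make "bind, then assume" atomic with respect to the deletion handlers, which forget pods under the same lock. A minimal sketch of that pattern with a plain sync.Mutex follows; the bind function and pod keys here are placeholders, not the real scheduler API.

package main

import (
    "fmt"
    "sync"
)

// actionLocker mirrors the LockedAction idea: one mutex serializes every
// critical section that touches the assumed-pod state.
type actionLocker struct{ sync.Mutex }

func (a *actionLocker) LockedAction(do func()) {
    a.Lock()
    defer a.Unlock()
    do()
}

func main() {
    var lock actionLocker
    assumed := map[string]string{} // pod key -> assumed node

    bind := func(pod, node string) error { return nil } // placeholder for the API call

    // Scheduler side: bind, and only on success record the assumption.
    lock.LockedAction(func() {
        if err := bind("default/foo", "machine1"); err != nil {
            return // bind failed: nothing is assumed
        }
        assumed["default/foo"] = "machine1"
    })

    // Deletion side: forgetting runs under the same lock, so it can never
    // interleave between a successful bind and the assumption above.
    lock.LockedAction(func() {
        delete(assumed, "default/foo")
    })

    fmt.Println(assumed) // empty map after the delete
}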

View File

@@ -23,18 +23,13 @@ import (
     "testing"
     "time"

-    "sync"
-
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/testapi"
     "k8s.io/kubernetes/pkg/client/cache"
     "k8s.io/kubernetes/pkg/client/record"
-    "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/pkg/util"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
-    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
-    schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )

 type fakeBinder struct {
@@ -114,8 +109,8 @@ func TestScheduler(t *testing.T) {
     var gotAssumedPod *api.Pod
     var gotBinding *api.Binding
     c := &Config{
-        SchedulerCache: &schedulertesting.FakeCache{
-            AssumeFunc: func(pod *api.Pod) {
+        Modeler: &FakeModeler{
+            AssumePodFunc: func(pod *api.Pod) {
                 gotAssumedPod = pod
             },
         },
@@ -166,30 +161,42 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
     eventBroadcaster := record.NewBroadcaster()
     defer eventBroadcaster.StartLogging(t.Logf).Stop()

-    // Setup stores to test pod's workflow:
-    // - queuedPodStore: pods queued before processing
-    // - scheduledPodStore: pods that has a scheduling decision
+    // Setup modeler so we control the contents of all 3 stores: assumed,
+    // scheduled and queued
     scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
+    scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore}
+
     queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
+    queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore}
+
+    modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
+
+    // Create a fake clock used to timestamp entries and calculate ttl. Nothing
+    // will expire till we flip to something older than the ttl, at which point
+    // all entries inserted with fakeTime will expire.
+    ttl := 30 * time.Second
+    fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
+    fakeClock := util.NewFakeClock(fakeTime)
+    ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
+    assumedPodsStore := cache.NewFakeExpirationStore(
+        cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
+    modeler.assumedPods = &cache.StoreToPodLister{Store: assumedPodsStore}

     // Port is the easiest way to cause a fit predicate failure
     podPort := 8080
     firstPod := podWithPort("foo", "", podPort)

-    stop := make(chan struct{})
-    defer close(stop)
-    cache := schedulercache.New(1*time.Second, stop)
     // Create the scheduler config
     algo := NewGenericScheduler(
-        cache,
         map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
         []algorithm.PriorityConfig{},
         []algorithm.SchedulerExtender{},
+        modeler.PodLister(),
         rand.New(rand.NewSource(time.Now().UnixNano())))

     var gotBinding *api.Binding
     c := &Config{
-        SchedulerCache: cache,
+        Modeler: modeler,
         NodeLister: algorithm.FakeNodeLister(
             api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
         ),
@@ -236,6 +243,10 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
     if exists {
         t.Errorf("Did not expect a queued pod, found %+v", pod)
     }
+    pod, exists, _ = assumedPodsStore.GetByKey("foo")
+    if !exists {
+        t.Errorf("Assumed pod store should contain stale pod")
+    }

     expectBind := &api.Binding{
         ObjectMeta: api.ObjectMeta{Name: "foo"},
@@ -249,6 +260,10 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
     events.Stop()

     scheduledPodStore.Delete(pod)
+    _, exists, _ = assumedPodsStore.Get(pod)
+    if !exists {
+        t.Errorf("Expected pod %#v in assumed pod store", pod)
+    }

     secondPod := podWithPort("bar", "", podPort)
     queuedPodStore.Add(secondPod)
@@ -256,26 +271,10 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
     // scheduledPodStore: []
     // assumedPods: [foo:8080]

-    var wg sync.WaitGroup
-    wg.Add(1)
-    // waiting for the assumed pod to expire
-    go func() {
-        for {
-            pods, err := cache.List(labels.Everything())
-            if err != nil {
-                t.Fatalf("cache.List failed: %v", err)
-            }
-            if len(pods) == 0 {
-                wg.Done()
-                return
-            }
-            time.Sleep(1 * time.Second)
-        }
-    }()
-    wg.Wait()
-
     // Second scheduling pass will fail to schedule if the store hasn't expired
     // the deleted pod. This would normally happen with a timeout.
+    //expirationPolicy.NeverExpire = util.NewStringSet()
+    fakeClock.Step(ttl + 1)

     called = make(chan struct{})
     events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
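The restored test replaces the old "poll until the cache empties" goroutine with a fake clock: entries stamped with fakeTime stay alive until fakeClock.Step pushes "now" past the TTL, so expiry is deterministic and the test never sleeps. A self-contained sketch of the same technique with an injectable now() function follows; it is not the real util.FakeClock or expiration-store API.

package main

import (
    "fmt"
    "time"
)

// ttlStore is a toy TTL cache with an injectable clock, so a test can advance
// time instead of sleeping.
type ttlStore struct {
    ttl     time.Duration
    now     func() time.Time
    entries map[string]time.Time
}

func (s *ttlStore) add(key string) { s.entries[key] = s.now() }

// has lazily expires entries whose timestamp is older than the TTL.
func (s *ttlStore) has(key string) bool {
    stamped, ok := s.entries[key]
    if !ok {
        return false
    }
    if s.now().Sub(stamped) > s.ttl {
        delete(s.entries, key)
        return false
    }
    return true
}

func main() {
    fakeNow := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
    store := &ttlStore{
        ttl:     30 * time.Second,
        now:     func() time.Time { return fakeNow }, // the closure reads the fake clock
        entries: map[string]time.Time{},
    }

    store.add("foo")
    fmt.Println(store.has("foo")) // true: nothing expires until the clock moves

    fakeNow = fakeNow.Add(31 * time.Second) // step the fake clock past the TTL
    fmt.Println(store.has("foo"))           // false: the stale entry has expired
}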

View File

@ -1,48 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// FakeCache is used for testing
type FakeCache struct {
AssumeFunc func(*api.Pod)
}
func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
f.AssumeFunc(pod)
return nil
}
func (f *FakeCache) AddPod(pod *api.Pod) error { return nil }
func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
func (f *FakeCache) RemovePod(pod *api.Pod) error { return nil }
func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
return nil, nil
}
func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil }

View File

@ -1,52 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// PodsToCache is used for testing
type PodsToCache []*api.Pod
func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
return nil
}
func (p PodsToCache) AddPod(pod *api.Pod) error { return nil }
func (p PodsToCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil }
func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
return schedulercache.CreateNodeNameToInfoMap(p), nil
}
func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {
for _, pod := range p {
if s.Matches(labels.Set(pod.Labels)) {
selected = append(selected, pod)
}
}
return selected, nil
}